{-# LANGUAGE OverloadedStrings #-}
module GHC.Eventlog.Live.Machine.Core (
Tick (..),
batchByTick,
batchToTick,
batchListToTick,
batchByTickList,
liftTick,
dropTick,
onlyTick,
liftBatch,
counterBy,
counterByTick,
liftRouter,
sortByBatch,
sortByBatchTick,
between,
delimit,
validateInput,
validateOrder,
) where
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO (..))
import Data.DList qualified as D
import Data.Foldable (for_)
import Data.Function (on)
import Data.Functor ((<&>))
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as M
import Data.Hashable (Hashable (..))
import Data.List qualified as L
import Data.Machine (Is (..), MachineT (..), Moore (..), PlanT, Process, ProcessT, Step (..), await, construct, encased, mapping, repeatedly, starve, stopped, yield, (~>))
import Data.Maybe (fromMaybe)
import Data.Semigroup (Max (..))
import Data.Text (Text)
import Data.Text qualified as T
import GHC.Eventlog.Live.Internal.Logger (logDebug, logError, logWarning)
import GHC.Eventlog.Live.Verbosity (Verbosity, verbosityDebug, verbosityError, verbosityWarning)
import GHC.RTS.Events (Event (..), Timestamp)
import GHC.RTS.Events qualified as E
import Text.Printf (printf)
data Tick a = Item !a | Tick
deriving (Tick a -> Tick a -> Bool
(Tick a -> Tick a -> Bool)
-> (Tick a -> Tick a -> Bool) -> Eq (Tick a)
forall a. Eq a => Tick a -> Tick a -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: forall a. Eq a => Tick a -> Tick a -> Bool
== :: Tick a -> Tick a -> Bool
$c/= :: forall a. Eq a => Tick a -> Tick a -> Bool
/= :: Tick a -> Tick a -> Bool
Eq, (forall a b. (a -> b) -> Tick a -> Tick b)
-> (forall a b. a -> Tick b -> Tick a) -> Functor Tick
forall a b. a -> Tick b -> Tick a
forall a b. (a -> b) -> Tick a -> Tick b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall a b. (a -> b) -> Tick a -> Tick b
fmap :: forall a b. (a -> b) -> Tick a -> Tick b
$c<$ :: forall a b. a -> Tick b -> Tick a
<$ :: forall a b. a -> Tick b -> Tick a
Functor, (forall m. Monoid m => Tick m -> m)
-> (forall m a. Monoid m => (a -> m) -> Tick a -> m)
-> (forall m a. Monoid m => (a -> m) -> Tick a -> m)
-> (forall a b. (a -> b -> b) -> b -> Tick a -> b)
-> (forall a b. (a -> b -> b) -> b -> Tick a -> b)
-> (forall b a. (b -> a -> b) -> b -> Tick a -> b)
-> (forall b a. (b -> a -> b) -> b -> Tick a -> b)
-> (forall a. (a -> a -> a) -> Tick a -> a)
-> (forall a. (a -> a -> a) -> Tick a -> a)
-> (forall a. Tick a -> [a])
-> (forall a. Tick a -> Bool)
-> (forall a. Tick a -> Int)
-> (forall a. Eq a => a -> Tick a -> Bool)
-> (forall a. Ord a => Tick a -> a)
-> (forall a. Ord a => Tick a -> a)
-> (forall a. Num a => Tick a -> a)
-> (forall a. Num a => Tick a -> a)
-> Foldable Tick
forall a. Eq a => a -> Tick a -> Bool
forall a. Num a => Tick a -> a
forall a. Ord a => Tick a -> a
forall m. Monoid m => Tick m -> m
forall a. Tick a -> Bool
forall a. Tick a -> Int
forall a. Tick a -> [a]
forall a. (a -> a -> a) -> Tick a -> a
forall m a. Monoid m => (a -> m) -> Tick a -> m
forall b a. (b -> a -> b) -> b -> Tick a -> b
forall a b. (a -> b -> b) -> b -> Tick a -> b
forall (t :: * -> *).
(forall m. Monoid m => t m -> m)
-> (forall m a. Monoid m => (a -> m) -> t a -> m)
-> (forall m a. Monoid m => (a -> m) -> t a -> m)
-> (forall a b. (a -> b -> b) -> b -> t a -> b)
-> (forall a b. (a -> b -> b) -> b -> t a -> b)
-> (forall b a. (b -> a -> b) -> b -> t a -> b)
-> (forall b a. (b -> a -> b) -> b -> t a -> b)
-> (forall a. (a -> a -> a) -> t a -> a)
-> (forall a. (a -> a -> a) -> t a -> a)
-> (forall a. t a -> [a])
-> (forall a. t a -> Bool)
-> (forall a. t a -> Int)
-> (forall a. Eq a => a -> t a -> Bool)
-> (forall a. Ord a => t a -> a)
-> (forall a. Ord a => t a -> a)
-> (forall a. Num a => t a -> a)
-> (forall a. Num a => t a -> a)
-> Foldable t
$cfold :: forall m. Monoid m => Tick m -> m
fold :: forall m. Monoid m => Tick m -> m
$cfoldMap :: forall m a. Monoid m => (a -> m) -> Tick a -> m
foldMap :: forall m a. Monoid m => (a -> m) -> Tick a -> m
$cfoldMap' :: forall m a. Monoid m => (a -> m) -> Tick a -> m
foldMap' :: forall m a. Monoid m => (a -> m) -> Tick a -> m
$cfoldr :: forall a b. (a -> b -> b) -> b -> Tick a -> b
foldr :: forall a b. (a -> b -> b) -> b -> Tick a -> b
$cfoldr' :: forall a b. (a -> b -> b) -> b -> Tick a -> b
foldr' :: forall a b. (a -> b -> b) -> b -> Tick a -> b
$cfoldl :: forall b a. (b -> a -> b) -> b -> Tick a -> b
foldl :: forall b a. (b -> a -> b) -> b -> Tick a -> b
$cfoldl' :: forall b a. (b -> a -> b) -> b -> Tick a -> b
foldl' :: forall b a. (b -> a -> b) -> b -> Tick a -> b
$cfoldr1 :: forall a. (a -> a -> a) -> Tick a -> a
foldr1 :: forall a. (a -> a -> a) -> Tick a -> a
$cfoldl1 :: forall a. (a -> a -> a) -> Tick a -> a
foldl1 :: forall a. (a -> a -> a) -> Tick a -> a
$ctoList :: forall a. Tick a -> [a]
toList :: forall a. Tick a -> [a]
$cnull :: forall a. Tick a -> Bool
null :: forall a. Tick a -> Bool
$clength :: forall a. Tick a -> Int
length :: forall a. Tick a -> Int
$celem :: forall a. Eq a => a -> Tick a -> Bool
elem :: forall a. Eq a => a -> Tick a -> Bool
$cmaximum :: forall a. Ord a => Tick a -> a
maximum :: forall a. Ord a => Tick a -> a
$cminimum :: forall a. Ord a => Tick a -> a
minimum :: forall a. Ord a => Tick a -> a
$csum :: forall a. Num a => Tick a -> a
sum :: forall a. Num a => Tick a -> a
$cproduct :: forall a. Num a => Tick a -> a
product :: forall a. Num a => Tick a -> a
Foldable, Functor Tick
Foldable Tick
(Functor Tick, Foldable Tick) =>
(forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Tick a -> f (Tick b))
-> (forall (f :: * -> *) a.
Applicative f =>
Tick (f a) -> f (Tick a))
-> (forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Tick a -> m (Tick b))
-> (forall (m :: * -> *) a. Monad m => Tick (m a) -> m (Tick a))
-> Traversable Tick
forall (t :: * -> *).
(Functor t, Foldable t) =>
(forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> t a -> f (t b))
-> (forall (f :: * -> *) a. Applicative f => t (f a) -> f (t a))
-> (forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> t a -> m (t b))
-> (forall (m :: * -> *) a. Monad m => t (m a) -> m (t a))
-> Traversable t
forall (m :: * -> *) a. Monad m => Tick (m a) -> m (Tick a)
forall (f :: * -> *) a. Applicative f => Tick (f a) -> f (Tick a)
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Tick a -> m (Tick b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Tick a -> f (Tick b)
$ctraverse :: forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Tick a -> f (Tick b)
traverse :: forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Tick a -> f (Tick b)
$csequenceA :: forall (f :: * -> *) a. Applicative f => Tick (f a) -> f (Tick a)
sequenceA :: forall (f :: * -> *) a. Applicative f => Tick (f a) -> f (Tick a)
$cmapM :: forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Tick a -> m (Tick b)
mapM :: forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Tick a -> m (Tick b)
$csequence :: forall (m :: * -> *) a. Monad m => Tick (m a) -> m (Tick a)
sequence :: forall (m :: * -> *) a. Monad m => Tick (m a) -> m (Tick a)
Traversable, Int -> Tick a -> ShowS
[Tick a] -> ShowS
Tick a -> String
(Int -> Tick a -> ShowS)
-> (Tick a -> String) -> ([Tick a] -> ShowS) -> Show (Tick a)
forall a. Show a => Int -> Tick a -> ShowS
forall a. Show a => [Tick a] -> ShowS
forall a. Show a => Tick a -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall a. Show a => Int -> Tick a -> ShowS
showsPrec :: Int -> Tick a -> ShowS
$cshow :: forall a. Show a => Tick a -> String
show :: Tick a -> String
$cshowList :: forall a. Show a => [Tick a] -> ShowS
showList :: [Tick a] -> ShowS
Show)
batchByTickList :: Process (Tick a) [a]
batchByTickList :: forall a (m :: * -> *). Monad m => MachineT m (Is (Tick a)) [a]
batchByTickList =
(Tick a -> Tick (DList a))
-> Machine (Is (Tick a)) (Tick (DList a))
forall (k :: * -> * -> *) a b.
Category k =>
(a -> b) -> Machine (k a) b
mapping ((a -> DList a) -> Tick a -> Tick (DList a)
forall a b. (a -> b) -> Tick a -> Tick b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> DList a
forall a. a -> DList a
D.singleton)
MachineT m (Is (Tick a)) (Tick (DList a))
-> ProcessT m (Tick (DList a)) (DList a)
-> MachineT m (Is (Tick a)) (DList a)
forall (m :: * -> *) (k :: * -> *) b c.
Monad m =>
MachineT m k b -> ProcessT m b c -> MachineT m k c
~> ProcessT m (Tick (DList a)) (DList a)
forall (m :: * -> *) a.
(Monad m, Monoid a) =>
ProcessT m (Tick a) a
batchByTick
MachineT m (Is (Tick a)) (DList a)
-> ProcessT m (DList a) [a] -> MachineT m (Is (Tick a)) [a]
forall (m :: * -> *) (k :: * -> *) b c.
Monad m =>
MachineT m k b -> ProcessT m b c -> MachineT m k c
~> (DList a -> [a]) -> Machine (Is (DList a)) [a]
forall (k :: * -> * -> *) a b.
Category k =>
(a -> b) -> Machine (k a) b
mapping DList a -> [a]
forall a. DList a -> [a]
D.toList
batchByTick ::
forall m a.
(Monad m, Monoid a) =>
ProcessT m (Tick a) a
batchByTick :: forall (m :: * -> *) a.
(Monad m, Monoid a) =>
ProcessT m (Tick a) a
batchByTick = a -> MachineT m (Is (Tick a)) a
batchByTickWith a
forall a. Monoid a => a
mempty
where
batchByTickWith :: a -> MachineT m (Is (Tick a)) a
batchByTickWith :: a -> MachineT m (Is (Tick a)) a
batchByTickWith a
acc = m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a
forall a b. (a -> b) -> a -> b
$ Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)))
-> Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
forall a b. (a -> b) -> a -> b
$ (Tick a -> MachineT m (Is (Tick a)) a)
-> Is (Tick a) (Tick a)
-> MachineT m (Is (Tick a)) a
-> Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
forall (k :: * -> *) o r t. (t -> r) -> k t -> r -> Step k o r
Await Tick a -> MachineT m (Is (Tick a)) a
onNext Is (Tick a) (Tick a)
forall a. Is a a
Refl MachineT m (Is (Tick a)) a
onStop
where
onNext :: Tick a -> MachineT m (Is (Tick a)) a
onNext :: Tick a -> MachineT m (Is (Tick a)) a
onNext = \case
Item a
a -> a -> MachineT m (Is (Tick a)) a
batchByTickWith (a
a a -> a -> a
forall a. Semigroup a => a -> a -> a
<> a
acc)
Tick a
Tick -> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a
forall a b. (a -> b) -> a -> b
$ Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)))
-> Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
forall a b. (a -> b) -> a -> b
$ a
-> MachineT m (Is (Tick a)) a
-> Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield a
acc MachineT m (Is (Tick a)) a
forall (m :: * -> *) a.
(Monad m, Monoid a) =>
ProcessT m (Tick a) a
batchByTick
onStop :: MachineT m (Is (Tick a)) a
onStop :: MachineT m (Is (Tick a)) a
onStop = m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a
forall a b. (a -> b) -> a -> b
$ Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)))
-> Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
forall a b. (a -> b) -> a -> b
$ a
-> MachineT m (Is (Tick a)) a
-> Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield a
acc MachineT m (Is (Tick a)) a
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped
batchListToTick :: Process [a] (Tick a)
batchListToTick :: forall a (m :: * -> *). Monad m => MachineT m (Is [a]) (Tick a)
batchListToTick = MachineT m (Is [a]) (Tick a)
Process [a] (Tick a)
forall (f :: * -> *) a. Foldable f => Process (f a) (Tick a)
batchToTick
batchToTick :: (Foldable f) => Process (f a) (Tick a)
batchToTick :: forall (f :: * -> *) a. Foldable f => Process (f a) (Tick a)
batchToTick = PlanT (Is (f a)) (Tick a) m () -> MachineT m (Is (f a)) (Tick a)
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
repeatedly PlanT (Is (f a)) (Tick a) m ()
forall {a} {m :: * -> *}. PlanT (Is (f a)) (Tick a) m ()
go
where
go :: PlanT (Is (f a)) (Tick a) m ()
go = PlanT (Is (f a)) (Tick a) m (f a)
Plan (Is (f a)) (Tick a) (f a)
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await PlanT (Is (f a)) (Tick a) m (f a)
-> (f a -> PlanT (Is (f a)) (Tick a) m ())
-> PlanT (Is (f a)) (Tick a) m ()
forall a b.
PlanT (Is (f a)) (Tick a) m a
-> (a -> PlanT (Is (f a)) (Tick a) m b)
-> PlanT (Is (f a)) (Tick a) m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \f a
xs -> f a
-> (a -> PlanT (Is (f a)) (Tick a) m ())
-> PlanT (Is (f a)) (Tick a) m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ f a
xs (Tick a -> PlanT (Is (f a)) (Tick a) m ()
Tick a -> forall {m :: * -> *}. PlanT (Is (f a)) (Tick a) m ()
forall o (k :: * -> *). o -> Plan k o ()
yield (Tick a -> PlanT (Is (f a)) (Tick a) m ())
-> (a -> Tick a) -> a -> PlanT (Is (f a)) (Tick a) m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Tick a
forall a. a -> Tick a
Item) PlanT (Is (f a)) (Tick a) m ()
-> PlanT (Is (f a)) (Tick a) m () -> PlanT (Is (f a)) (Tick a) m ()
forall a b.
PlanT (Is (f a)) (Tick a) m a
-> PlanT (Is (f a)) (Tick a) m b -> PlanT (Is (f a)) (Tick a) m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Tick a -> forall {m :: * -> *}. PlanT (Is (f a)) (Tick a) m ()
forall o (k :: * -> *). o -> Plan k o ()
yield Tick a
forall a. Tick a
Tick
dropTick :: Process (Tick a) a
dropTick :: forall a (m :: * -> *). Monad m => MachineT m (Is (Tick a)) a
dropTick =
PlanT (Is (Tick a)) a m () -> MachineT m (Is (Tick a)) a
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
repeatedly (PlanT (Is (Tick a)) a m () -> MachineT m (Is (Tick a)) a)
-> PlanT (Is (Tick a)) a m () -> MachineT m (Is (Tick a)) a
forall a b. (a -> b) -> a -> b
$
PlanT (Is (Tick a)) a m (Tick a)
Plan (Is (Tick a)) a (Tick a)
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await PlanT (Is (Tick a)) a m (Tick a)
-> (Tick a -> PlanT (Is (Tick a)) a m ())
-> PlanT (Is (Tick a)) a m ()
forall a b.
PlanT (Is (Tick a)) a m a
-> (a -> PlanT (Is (Tick a)) a m b) -> PlanT (Is (Tick a)) a m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Item a
a -> a -> Plan (Is (Tick a)) a ()
forall o (k :: * -> *). o -> Plan k o ()
yield a
a
Tick a
Tick -> () -> PlanT (Is (Tick a)) a m ()
forall a. a -> PlanT (Is (Tick a)) a m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
onlyTick :: Process (Tick a) ()
onlyTick :: forall a (m :: * -> *). Monad m => MachineT m (Is (Tick a)) ()
onlyTick =
PlanT (Is (Tick a)) () m () -> MachineT m (Is (Tick a)) ()
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
repeatedly (PlanT (Is (Tick a)) () m () -> MachineT m (Is (Tick a)) ())
-> PlanT (Is (Tick a)) () m () -> MachineT m (Is (Tick a)) ()
forall a b. (a -> b) -> a -> b
$
PlanT (Is (Tick a)) () m (Tick a)
Plan (Is (Tick a)) () (Tick a)
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await PlanT (Is (Tick a)) () m (Tick a)
-> (Tick a -> PlanT (Is (Tick a)) () m ())
-> PlanT (Is (Tick a)) () m ()
forall a b.
PlanT (Is (Tick a)) () m a
-> (a -> PlanT (Is (Tick a)) () m b) -> PlanT (Is (Tick a)) () m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Tick a
Tick -> () -> Plan (Is (Tick a)) () ()
forall o (k :: * -> *). o -> Plan k o ()
yield ()
Item{} -> () -> PlanT (Is (Tick a)) () m ()
forall a. a -> PlanT (Is (Tick a)) () m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
counterBy ::
forall m a x.
(MonadIO m) =>
Verbosity ->
Text ->
(a -> Word) ->
ProcessT m a x
counterBy :: forall (m :: * -> *) a x.
MonadIO m =>
Verbosity -> Text -> (a -> Word) -> ProcessT m a x
counterBy Verbosity
verbosity Text
label a -> Word
count
| Verbosity
verbosityDebug Verbosity -> Verbosity -> Bool
forall a. Ord a => a -> a -> Bool
>= Verbosity
verbosity = PlanT (Is a) x m () -> MachineT m (Is a) x
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
repeatedly PlanT (Is a) x m ()
go
| Bool
otherwise = MachineT m (Is a) x
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped
where
go :: PlanT (Is a) x m ()
go :: PlanT (Is a) x m ()
go =
PlanT (Is a) x m a
Plan (Is a) x a
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await PlanT (Is a) x m a
-> (a -> PlanT (Is a) x m ()) -> PlanT (Is a) x m ()
forall a b.
PlanT (Is a) x m a
-> (a -> PlanT (Is a) x m b) -> PlanT (Is a) x m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \a
a ->
Verbosity -> Text -> Text -> PlanT (Is a) x m ()
forall (m :: * -> *).
MonadIO m =>
Verbosity -> Text -> Text -> m ()
logDebug Verbosity
verbosity Text
"counterBy" (Text
"saw " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Word -> String
forall a. Show a => a -> String
show (a -> Word
count a
a)) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
label)
counterByTick ::
forall m a x.
(MonadIO m) =>
Verbosity ->
Text ->
ProcessT m (Tick a) x
counterByTick :: forall (m :: * -> *) a x.
MonadIO m =>
Verbosity -> Text -> ProcessT m (Tick a) x
counterByTick Verbosity
verbosity Text
label
| Verbosity
verbosityDebug Verbosity -> Verbosity -> Bool
forall a. Ord a => a -> a -> Bool
>= Verbosity
verbosity = PlanT (Is (Tick a)) x m () -> MachineT m (Is (Tick a)) x
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
construct (PlanT (Is (Tick a)) x m () -> MachineT m (Is (Tick a)) x)
-> PlanT (Is (Tick a)) x m () -> MachineT m (Is (Tick a)) x
forall a b. (a -> b) -> a -> b
$ Word -> PlanT (Is (Tick a)) x m ()
go Word
0
| Bool
otherwise = MachineT m (Is (Tick a)) x
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped
where
go :: Word -> PlanT (Is (Tick a)) x m ()
go :: Word -> PlanT (Is (Tick a)) x m ()
go Word
count =
PlanT (Is (Tick a)) x m (Tick a)
Plan (Is (Tick a)) x (Tick a)
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await PlanT (Is (Tick a)) x m (Tick a)
-> (Tick a -> PlanT (Is (Tick a)) x m ())
-> PlanT (Is (Tick a)) x m ()
forall a b.
PlanT (Is (Tick a)) x m a
-> (a -> PlanT (Is (Tick a)) x m b) -> PlanT (Is (Tick a)) x m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Item a
_ -> Word -> PlanT (Is (Tick a)) x m ()
go (Word
count Word -> Word -> Word
forall a. Num a => a -> a -> a
+ Word
1)
Tick a
Tick -> Verbosity -> Text -> Text -> PlanT (Is (Tick a)) x m ()
forall (m :: * -> *).
MonadIO m =>
Verbosity -> Text -> Text -> m ()
logDebug Verbosity
verbosity Text
"counterByTick" (Text
"saw " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Word -> String
forall a. Show a => a -> String
show Word
count) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
label) PlanT (Is (Tick a)) x m ()
-> PlanT (Is (Tick a)) x m () -> PlanT (Is (Tick a)) x m ()
forall a b.
PlanT (Is (Tick a)) x m a
-> PlanT (Is (Tick a)) x m b -> PlanT (Is (Tick a)) x m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Word -> PlanT (Is (Tick a)) x m ()
go Word
0
liftTick ::
(Monad m) =>
ProcessT m a b ->
ProcessT m (Tick a) (Tick b)
liftTick :: forall (m :: * -> *) a b.
Monad m =>
ProcessT m a b -> ProcessT m (Tick a) (Tick b)
liftTick ProcessT m a b
m =
m (Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> MachineT m (Is (Tick a)) (Tick b)
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step
(Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> MachineT m (Is (Tick a)) (Tick b))
-> m (Step
(Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> MachineT m (Is (Tick a)) (Tick b)
forall a b. (a -> b) -> a -> b
$
ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT ProcessT m a b
m m (Step (Is a) b (ProcessT m a b))
-> (Step (Is a) b (ProcessT m a b)
-> Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> m (Step
(Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
Step (Is a) b (ProcessT m a b)
Stop ->
Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
forall (k :: * -> *) o r. Step k o r
Stop
Yield b
o ProcessT m a b
k ->
Tick b
-> MachineT m (Is (Tick a)) (Tick b)
-> Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield (b -> Tick b
forall a. a -> Tick a
Item b
o) (ProcessT m a b -> MachineT m (Is (Tick a)) (Tick b)
forall (m :: * -> *) a b.
Monad m =>
ProcessT m a b -> ProcessT m (Tick a) (Tick b)
liftTick ProcessT m a b
k)
Await (t -> ProcessT m a b
onNext :: t -> ProcessT m a b) Is a t
Refl ProcessT m a b
onStop ->
Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
await'
where
await' :: Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
await' = (Tick a -> MachineT m (Is (Tick a)) (Tick b))
-> Is (Tick a) (Tick a)
-> MachineT m (Is (Tick a)) (Tick b)
-> Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
forall (k :: * -> *) o r t. (t -> r) -> k t -> r -> Step k o r
Await Tick a -> MachineT m (Is (Tick a)) (Tick b)
onNext' Is (Tick a) (Tick a)
forall a. Is a a
Refl MachineT m (Is (Tick a)) (Tick b)
onStop'
where
onNext' :: Tick a -> ProcessT m (Tick a) (Tick b)
onNext' :: Tick a -> MachineT m (Is (Tick a)) (Tick b)
onNext' = \case
Tick a
Tick ->
m (Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> MachineT m (Is (Tick a)) (Tick b)
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step
(Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> MachineT m (Is (Tick a)) (Tick b))
-> (MachineT m (Is (Tick a)) (Tick b)
-> m (Step
(Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))))
-> MachineT m (Is (Tick a)) (Tick b)
-> MachineT m (Is (Tick a)) (Tick b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
-> m (Step
(Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
-> m (Step
(Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))))
-> (MachineT m (Is (Tick a)) (Tick b)
-> Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> MachineT m (Is (Tick a)) (Tick b)
-> m (Step
(Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Tick b
-> MachineT m (Is (Tick a)) (Tick b)
-> Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield Tick b
forall a. Tick a
Tick (MachineT m (Is (Tick a)) (Tick b)
-> MachineT m (Is (Tick a)) (Tick b))
-> MachineT m (Is (Tick a)) (Tick b)
-> MachineT m (Is (Tick a)) (Tick b)
forall a b. (a -> b) -> a -> b
$
m (Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> MachineT m (Is (Tick a)) (Tick b)
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step
(Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> MachineT m (Is (Tick a)) (Tick b))
-> (Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
-> m (Step
(Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))))
-> Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
-> MachineT m (Is (Tick a)) (Tick b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
-> m (Step
(Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
-> MachineT m (Is (Tick a)) (Tick b))
-> Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
-> MachineT m (Is (Tick a)) (Tick b)
forall a b. (a -> b) -> a -> b
$
Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
await'
Item a
a -> ProcessT m a b -> MachineT m (Is (Tick a)) (Tick b)
forall (m :: * -> *) a b.
Monad m =>
ProcessT m a b -> ProcessT m (Tick a) (Tick b)
liftTick (t -> ProcessT m a b
onNext a
t
a)
onStop' :: ProcessT m (Tick a) (Tick b)
onStop' :: MachineT m (Is (Tick a)) (Tick b)
onStop' = ProcessT m a b -> MachineT m (Is (Tick a)) (Tick b)
forall (m :: * -> *) a b.
Monad m =>
ProcessT m a b -> ProcessT m (Tick a) (Tick b)
liftTick ProcessT m a b
onStop
liftBatch ::
forall m a b.
(Monad m) =>
ProcessT m a b ->
ProcessT m [a] [b]
liftBatch :: forall (m :: * -> *) a b.
Monad m =>
ProcessT m a b -> ProcessT m [a] [b]
liftBatch = m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b])
-> (ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b])))
-> ProcessT m a b
-> MachineT m (Is [a]) [b]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [a]
-> [b]
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
running [] []
where
running :: [a] -> [b] -> ProcessT m a b -> m (Step (Is [a]) [b] (ProcessT m [a] [b]))
running :: [a]
-> [b]
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
running [a]
as [b]
bs ProcessT m a b
m =
ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT ProcessT m a b
m m (Step (Is a) b (ProcessT m a b))
-> (Step (Is a) b (ProcessT m a b)
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b])))
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Step (Is a) b (ProcessT m a b)
Stop ->
Step (Is [a]) [b] (MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Step (Is [a]) [b] (MachineT m (Is [a]) [b])
forall (k :: * -> *) o r. Step k o r
Stop
Yield b
b ProcessT m a b
k ->
[a]
-> [b]
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
running [a]
as (b
b b -> [b] -> [b]
forall a. a -> [a] -> [a]
: [b]
bs) ProcessT m a b
k
Await (t -> ProcessT m a b
onNext :: t -> ProcessT m a b) Is a t
Refl ProcessT m a b
onStop ->
Step (Is [a]) [b] (MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is [a]) [b] (MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b])))
-> Step (Is [a]) [b] (MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
forall a b. (a -> b) -> a -> b
$ [b]
-> MachineT m (Is [a]) [b]
-> Step (Is [a]) [b] (MachineT m (Is [a]) [b])
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield ([b] -> [b]
forall a. [a] -> [a]
reverse [b]
bs) (MachineT m (Is [a]) [b]
-> Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b]
-> Step (Is [a]) [b] (MachineT m (Is [a]) [b])
forall a b. (a -> b) -> a -> b
$ m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b]
forall a b. (a -> b) -> a -> b
$ [a]
-> (a -> ProcessT m a b)
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
awaiting [a]
as a -> ProcessT m a b
t -> ProcessT m a b
onNext ProcessT m a b
onStop
awaiting :: [a] -> (a -> ProcessT m a b) -> ProcessT m a b -> m (Step (Is [a]) [b] (ProcessT m [a] [b]))
awaiting :: [a]
-> (a -> ProcessT m a b)
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
awaiting (a
a : [a]
as) a -> ProcessT m a b
onNext ProcessT m a b
_onStop = [a]
-> [b]
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
running [a]
as [] (ProcessT m a b -> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b])))
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
forall a b. (a -> b) -> a -> b
$ a -> ProcessT m a b
onNext a
a
awaiting [] a -> ProcessT m a b
onNext ProcessT m a b
onStop = Step (Is [a]) [b] (MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is [a]) [b] (MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b])))
-> Step (Is [a]) [b] (MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
forall a b. (a -> b) -> a -> b
$ ([a] -> MachineT m (Is [a]) [b])
-> Is [a] [a]
-> MachineT m (Is [a]) [b]
-> Step (Is [a]) [b] (MachineT m (Is [a]) [b])
forall (k :: * -> *) o r t. (t -> r) -> k t -> r -> Step k o r
Await [a] -> MachineT m (Is [a]) [b]
onNext' Is [a] [a]
forall a. Is a a
Refl MachineT m (Is [a]) [b]
onStop'
where
onNext' :: [a] -> ProcessT m [a] [b]
onNext' :: [a] -> MachineT m (Is [a]) [b]
onNext' [a]
as = m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b]
forall a b. (a -> b) -> a -> b
$ [a]
-> (a -> ProcessT m a b)
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
awaiting [a]
as a -> ProcessT m a b
onNext ProcessT m a b
onStop
onStop' :: ProcessT m [a] [b]
onStop' :: MachineT m (Is [a]) [b]
onStop' = ProcessT m a b -> MachineT m (Is [a]) [b]
forall x. ProcessT m a b -> ProcessT m x [b]
exhausting ProcessT m a b
onStop
exhausting :: ProcessT m a b -> ProcessT m x [b]
exhausting :: forall x. ProcessT m a b -> ProcessT m x [b]
exhausting = m (Step (Is x) [b] (MachineT m (Is x) [b]))
-> MachineT m (Is x) [b]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is x) [b] (MachineT m (Is x) [b]))
-> MachineT m (Is x) [b])
-> (ProcessT m a b -> m (Step (Is x) [b] (MachineT m (Is x) [b])))
-> ProcessT m a b
-> MachineT m (Is x) [b]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [b]
-> ProcessT m a b -> m (Step (Is x) [b] (MachineT m (Is x) [b]))
forall x.
[b] -> ProcessT m a b -> m (Step (Is x) [b] (ProcessT m x [b]))
go []
where
go :: [b] -> ProcessT m a b -> m (Step (Is x) [b] (ProcessT m x [b]))
go :: forall x.
[b] -> ProcessT m a b -> m (Step (Is x) [b] (ProcessT m x [b]))
go [b]
bs ProcessT m a b
m =
ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT ProcessT m a b
m m (Step (Is a) b (ProcessT m a b))
-> (Step (Is a) b (ProcessT m a b)
-> m (Step (Is x) [b] (ProcessT m x [b])))
-> m (Step (Is x) [b] (ProcessT m x [b]))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Step (Is a) b (ProcessT m a b)
Stop ->
Step (Is x) [b] (ProcessT m x [b])
-> m (Step (Is x) [b] (ProcessT m x [b]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Step (Is x) [b] (ProcessT m x [b])
forall (k :: * -> *) o r. Step k o r
Stop
Yield b
b ProcessT m a b
k ->
[b] -> ProcessT m a b -> m (Step (Is x) [b] (ProcessT m x [b]))
forall x.
[b] -> ProcessT m a b -> m (Step (Is x) [b] (ProcessT m x [b]))
go (b
b b -> [b] -> [b]
forall a. a -> [a] -> [a]
: [b]
bs) ProcessT m a b
k
Await t -> ProcessT m a b
_onNext Is a t
_Refl ProcessT m a b
onStop ->
Step (Is x) [b] (ProcessT m x [b])
-> m (Step (Is x) [b] (ProcessT m x [b]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is x) [b] (ProcessT m x [b])
-> m (Step (Is x) [b] (ProcessT m x [b])))
-> Step (Is x) [b] (ProcessT m x [b])
-> m (Step (Is x) [b] (ProcessT m x [b]))
forall a b. (a -> b) -> a -> b
$ [b] -> ProcessT m x [b] -> Step (Is x) [b] (ProcessT m x [b])
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield ([b] -> [b]
forall a. [a] -> [a]
reverse [b]
bs) (ProcessT m x [b] -> Step (Is x) [b] (ProcessT m x [b]))
-> ProcessT m x [b] -> Step (Is x) [b] (ProcessT m x [b])
forall a b. (a -> b) -> a -> b
$ m (Step (Is x) [b] (ProcessT m x [b])) -> ProcessT m x [b]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is x) [b] (ProcessT m x [b])) -> ProcessT m x [b])
-> m (Step (Is x) [b] (ProcessT m x [b])) -> ProcessT m x [b]
forall a b. (a -> b) -> a -> b
$ [b] -> ProcessT m a b -> m (Step (Is x) [b] (ProcessT m x [b]))
forall x.
[b] -> ProcessT m a b -> m (Step (Is x) [b] (ProcessT m x [b]))
go [] ProcessT m a b
onStop
liftRouter ::
forall m k a b.
(MonadIO m, Hashable k) =>
(a -> Maybe k) ->
(k -> ProcessT m a b) ->
ProcessT m a b
liftRouter :: forall (m :: * -> *) k a b.
(MonadIO m, Hashable k) =>
(a -> Maybe k) -> (k -> ProcessT m a b) -> ProcessT m a b
liftRouter a -> Maybe k
measure k -> ProcessT m a b
spawn = HashMap k (ProcessT m a b) -> ProcessT m a b
awaiting HashMap k (ProcessT m a b)
forall k v. HashMap k v
M.empty
where
awaiting :: HashMap k (ProcessT m a b) -> ProcessT m a b
awaiting :: HashMap k (ProcessT m a b) -> ProcessT m a b
awaiting HashMap k (ProcessT m a b)
st = m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b)
-> (Step (Is a) b (ProcessT m a b)
-> m (Step (Is a) b (ProcessT m a b)))
-> Step (Is a) b (ProcessT m a b)
-> ProcessT m a b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Step (Is a) b (ProcessT m a b)
-> m (Step (Is a) b (ProcessT m a b))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is a) b (ProcessT m a b) -> ProcessT m a b)
-> Step (Is a) b (ProcessT m a b) -> ProcessT m a b
forall a b. (a -> b) -> a -> b
$ (a -> ProcessT m a b)
-> Is a a -> ProcessT m a b -> Step (Is a) b (ProcessT m a b)
forall (k :: * -> *) o r t. (t -> r) -> k t -> r -> Step k o r
Await a -> ProcessT m a b
onNext Is a a
forall a. Is a a
Refl ProcessT m a b
onStop
where
onNext :: a -> MachineT m (Is a) b
onNext :: a -> ProcessT m a b
onNext a
a = case a -> Maybe k
measure a
a of
Maybe k
Nothing -> HashMap k (ProcessT m a b) -> ProcessT m a b
awaiting HashMap k (ProcessT m a b)
st
Just k
k -> a
-> ProcessT m a b
-> (ProcessT m a b -> ProcessT m a b)
-> ProcessT m a b
provideThen a
a ProcessT m a b
m ((ProcessT m a b -> ProcessT m a b) -> ProcessT m a b)
-> (ProcessT m a b -> ProcessT m a b) -> ProcessT m a b
forall a b. (a -> b) -> a -> b
$ \ProcessT m a b
m' -> HashMap k (ProcessT m a b) -> ProcessT m a b
awaiting (k
-> ProcessT m a b
-> HashMap k (ProcessT m a b)
-> HashMap k (ProcessT m a b)
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
M.insert k
k ProcessT m a b
m' HashMap k (ProcessT m a b)
st)
where
m :: ProcessT m a b
m = ProcessT m a b -> Maybe (ProcessT m a b) -> ProcessT m a b
forall a. a -> Maybe a -> a
fromMaybe (k -> ProcessT m a b
spawn k
k) (k -> HashMap k (ProcessT m a b) -> Maybe (ProcessT m a b)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
M.lookup k
k HashMap k (ProcessT m a b)
st)
onStop :: MachineT m (Is a) b
onStop :: ProcessT m a b
onStop = (ProcessT m a b -> ProcessT m a b -> ProcessT m a b)
-> ProcessT m a b -> [ProcessT m a b] -> ProcessT m a b
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr ProcessT m a b -> ProcessT m a b -> ProcessT m a b
forall (m :: * -> *) (k0 :: * -> *) b (k :: * -> *).
Monad m =>
MachineT m k0 b -> MachineT m k b -> MachineT m k b
starve ProcessT m a b
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped (HashMap k (ProcessT m a b) -> [ProcessT m a b]
forall k v. HashMap k v -> [v]
M.elems HashMap k (ProcessT m a b)
st)
provideThen :: a -> ProcessT m a b -> (ProcessT m a b -> ProcessT m a b) -> ProcessT m a b
provideThen :: a
-> ProcessT m a b
-> (ProcessT m a b -> ProcessT m a b)
-> ProcessT m a b
provideThen a
a ProcessT m a b
m ProcessT m a b -> ProcessT m a b
k =
m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b)
-> m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b
forall a b. (a -> b) -> a -> b
$
ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT ProcessT m a b
m m (Step (Is a) b (ProcessT m a b))
-> (Step (Is a) b (ProcessT m a b)
-> m (Step (Is a) b (ProcessT m a b)))
-> m (Step (Is a) b (ProcessT m a b))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Step (Is a) b (ProcessT m a b)
Stop -> ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT (ProcessT m a b -> ProcessT m a b
k ProcessT m a b
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped)
Yield b
o ProcessT m a b
m' -> Step (Is a) b (ProcessT m a b)
-> m (Step (Is a) b (ProcessT m a b))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (b -> ProcessT m a b -> Step (Is a) b (ProcessT m a b)
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield b
o (a
-> ProcessT m a b
-> (ProcessT m a b -> ProcessT m a b)
-> ProcessT m a b
provideThen a
a ProcessT m a b
m' ProcessT m a b -> ProcessT m a b
k))
Await t -> ProcessT m a b
onNext Is a t
Refl ProcessT m a b
_onStop -> ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT (ProcessT m a b
-> (ProcessT m a b -> ProcessT m a b) -> ProcessT m a b
exhaustThen (t -> ProcessT m a b
onNext a
t
a) ProcessT m a b -> ProcessT m a b
k)
exhaustThen :: ProcessT m a b -> (ProcessT m a b -> ProcessT m a b) -> ProcessT m a b
exhaustThen :: ProcessT m a b
-> (ProcessT m a b -> ProcessT m a b) -> ProcessT m a b
exhaustThen ProcessT m a b
m ProcessT m a b -> ProcessT m a b
k =
m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b)
-> m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b
forall a b. (a -> b) -> a -> b
$
ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT ProcessT m a b
m m (Step (Is a) b (ProcessT m a b))
-> (Step (Is a) b (ProcessT m a b)
-> m (Step (Is a) b (ProcessT m a b)))
-> m (Step (Is a) b (ProcessT m a b))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Yield b
o ProcessT m a b
m' -> Step (Is a) b (ProcessT m a b)
-> m (Step (Is a) b (ProcessT m a b))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (b -> ProcessT m a b -> Step (Is a) b (ProcessT m a b)
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield b
o (ProcessT m a b -> ProcessT m a b
k ProcessT m a b
m'))
Step (Is a) b (ProcessT m a b)
m' -> ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT (ProcessT m a b -> ProcessT m a b
k (Step (Is a) b (ProcessT m a b) -> ProcessT m a b
forall (m :: * -> *) (k :: * -> *) o.
Monad m =>
Step k o (MachineT m k o) -> MachineT m k o
encased Step (Is a) b (ProcessT m a b)
m'))
sortByBatch ::
forall m a.
(Monad m) =>
(a -> Timestamp) ->
ProcessT m [a] [a]
sortByBatch :: forall (m :: * -> *) a.
Monad m =>
(a -> Timestamp) -> ProcessT m [a] [a]
sortByBatch a -> Timestamp
timestamp = Maybe [a] -> ProcessT m [a] [a]
sortByBatchWith Maybe [a]
forall a. Maybe a
Nothing
where
sortByBatchWith :: Maybe [a] -> ProcessT m [a] [a]
sortByBatchWith :: Maybe [a] -> ProcessT m [a] [a]
sortByBatchWith = \case
Maybe [a]
Nothing -> m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall a b. (a -> b) -> a -> b
$ Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a])))
-> Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a b. (a -> b) -> a -> b
$ ([a] -> ProcessT m [a] [a])
-> Is [a] [a]
-> ProcessT m [a] [a]
-> Step (Is [a]) [a] (ProcessT m [a] [a])
forall (k :: * -> *) o r t. (t -> r) -> k t -> r -> Step k o r
Await [a] -> ProcessT m [a] [a]
onNext Is [a] [a]
forall a. Is a a
Refl ProcessT m [a] [a]
onStop
where
onNext :: [a] -> ProcessT m [a] [a]
onNext :: [a] -> ProcessT m [a] [a]
onNext [a]
new = Maybe [a] -> ProcessT m [a] [a]
sortByBatchWith ([a] -> Maybe [a]
forall a. a -> Maybe a
Just [a]
sortedNew)
where
sortedNew :: [a]
sortedNew = [a] -> [a]
sortByTime [a]
new
onStop :: ProcessT m [a] [a]
onStop :: ProcessT m [a] [a]
onStop = ProcessT m [a] [a]
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped
Just [a]
sortedOld -> m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall a b. (a -> b) -> a -> b
$ Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a])))
-> Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a b. (a -> b) -> a -> b
$ ([a] -> ProcessT m [a] [a])
-> Is [a] [a]
-> ProcessT m [a] [a]
-> Step (Is [a]) [a] (ProcessT m [a] [a])
forall (k :: * -> *) o r t. (t -> r) -> k t -> r -> Step k o r
Await [a] -> ProcessT m [a] [a]
onNext Is [a] [a]
forall a. Is a a
Refl ProcessT m [a] [a]
onStop
where
onNext :: [a] -> ProcessT m [a] [a]
onNext :: [a] -> ProcessT m [a] [a]
onNext [a]
new
| [a] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [a]
sortedOld = Maybe [a] -> ProcessT m [a] [a]
sortByBatchWith (Maybe [a] -> ProcessT m [a] [a])
-> Maybe [a] -> ProcessT m [a] [a]
forall a b. (a -> b) -> a -> b
$ [a] -> Maybe [a]
forall a. a -> Maybe a
Just [a]
sortedNew
| Bool
otherwise = m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall a b. (a -> b) -> a -> b
$ Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a])))
-> Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a b. (a -> b) -> a -> b
$ [a] -> ProcessT m [a] [a] -> Step (Is [a]) [a] (ProcessT m [a] [a])
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield [a]
sortedBeforeCutoff (ProcessT m [a] [a] -> Step (Is [a]) [a] (ProcessT m [a] [a]))
-> ProcessT m [a] [a] -> Step (Is [a]) [a] (ProcessT m [a] [a])
forall a b. (a -> b) -> a -> b
$ Maybe [a] -> ProcessT m [a] [a]
sortByBatchWith (Maybe [a] -> ProcessT m [a] [a])
-> Maybe [a] -> ProcessT m [a] [a]
forall a b. (a -> b) -> a -> b
$ [a] -> Maybe [a]
forall a. a -> Maybe a
Just [a]
sortedAfterCutoff
where
cutoff :: Timestamp
cutoff = Max Timestamp -> Timestamp
forall a. Max a -> a
getMax ((a -> Max Timestamp) -> [a] -> Max Timestamp
forall m a. Monoid m => (a -> m) -> [a] -> m
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap (Timestamp -> Max Timestamp
forall a. a -> Max a
Max (Timestamp -> Max Timestamp)
-> (a -> Timestamp) -> a -> Max Timestamp
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Timestamp
timestamp) [a]
sortedOld)
sortedNew :: [a]
sortedNew = [a] -> [a]
sortByTime [a]
new
sorted :: [a]
sorted = [a] -> [a] -> [a]
joinByTime [a]
sortedOld [a]
sortedNew
([a]
sortedBeforeCutoff, [a]
sortedAfterCutoff) = (a -> Bool) -> [a] -> ([a], [a])
forall a. (a -> Bool) -> [a] -> ([a], [a])
L.partition ((Timestamp -> Timestamp -> Bool
forall a. Ord a => a -> a -> Bool
<= Timestamp
cutoff) (Timestamp -> Bool) -> (a -> Timestamp) -> a -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Timestamp
timestamp) [a]
sorted
onStop :: ProcessT m [a] [a]
onStop :: ProcessT m [a] [a]
onStop = m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall a b. (a -> b) -> a -> b
$ Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a])))
-> Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a b. (a -> b) -> a -> b
$ [a] -> ProcessT m [a] [a] -> Step (Is [a]) [a] (ProcessT m [a] [a])
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield [a]
sortedOld (ProcessT m [a] [a] -> Step (Is [a]) [a] (ProcessT m [a] [a]))
-> ProcessT m [a] [a] -> Step (Is [a]) [a] (ProcessT m [a] [a])
forall a b. (a -> b) -> a -> b
$ ProcessT m [a] [a]
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped
compByTime :: a -> a -> Ordering
compByTime = Timestamp -> Timestamp -> Ordering
forall a. Ord a => a -> a -> Ordering
compare (Timestamp -> Timestamp -> Ordering)
-> (a -> Timestamp) -> a -> a -> Ordering
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` a -> Timestamp
timestamp
sortByTime :: [a] -> [a]
sortByTime = (a -> a -> Ordering) -> [a] -> [a]
forall a. (a -> a -> Ordering) -> [a] -> [a]
L.sortBy a -> a -> Ordering
compByTime
joinByTime :: [a] -> [a] -> [a]
joinByTime [] [a]
ys = [a]
ys
joinByTime [a]
xs [] = [a]
xs
joinByTime (a
x : [a]
xs) (a
y : [a]
ys) = case a -> a -> Ordering
compByTime a
x a
y of
Ordering
LT -> a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a] -> [a] -> [a]
joinByTime [a]
xs (a
y a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
ys)
Ordering
_ -> a
y a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a] -> [a] -> [a]
joinByTime (a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
xs) [a]
ys
sortByBatchTick :: (a -> Timestamp) -> Process (Tick a) (Tick a)
sortByBatchTick :: forall a. (a -> Timestamp) -> Process (Tick a) (Tick a)
sortByBatchTick a -> Timestamp
timestamp =
(Tick a -> Tick [a]) -> Machine (Is (Tick a)) (Tick [a])
forall (k :: * -> * -> *) a b.
Category k =>
(a -> b) -> Machine (k a) b
mapping ((a -> [a]) -> Tick a -> Tick [a]
forall a b. (a -> b) -> Tick a -> Tick b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [])) MachineT m (Is (Tick a)) (Tick [a])
-> ProcessT m (Tick [a]) [a] -> MachineT m (Is (Tick a)) [a]
forall (m :: * -> *) (k :: * -> *) b c.
Monad m =>
MachineT m k b -> ProcessT m b c -> MachineT m k c
~> ProcessT m (Tick [a]) [a]
forall (m :: * -> *) a.
(Monad m, Monoid a) =>
ProcessT m (Tick a) a
batchByTick MachineT m (Is (Tick a)) [a]
-> ProcessT m [a] [a] -> MachineT m (Is (Tick a)) [a]
forall (m :: * -> *) (k :: * -> *) b c.
Monad m =>
MachineT m k b -> ProcessT m b c -> MachineT m k c
~> (a -> Timestamp) -> ProcessT m [a] [a]
forall (m :: * -> *) a.
Monad m =>
(a -> Timestamp) -> ProcessT m [a] [a]
sortByBatch a -> Timestamp
timestamp MachineT m (Is (Tick a)) [a]
-> ProcessT m [a] (Tick a) -> MachineT m (Is (Tick a)) (Tick a)
forall (m :: * -> *) (k :: * -> *) b c.
Monad m =>
MachineT m k b -> ProcessT m b c -> MachineT m k c
~> ProcessT m [a] (Tick a)
forall a (m :: * -> *). Monad m => MachineT m (Is [a]) (Tick a)
batchListToTick
between :: Text -> Text -> Moore Text Bool
between :: Text -> Text -> Moore Text Bool
between Text
x Text
y = Moore Text Bool
open
where
open :: Moore Text Bool
open = Bool -> (Text -> Moore Text Bool) -> Moore Text Bool
forall a b. b -> (a -> Moore a b) -> Moore a b
Moore Bool
False Text -> Moore Text Bool
open' where open' :: Text -> Moore Text Bool
open' Text
x' = if Text
x Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== Text
x' then Moore Text Bool
close else Moore Text Bool
open
close :: Moore Text Bool
close = Bool -> (Text -> Moore Text Bool) -> Moore Text Bool
forall a b. b -> (a -> Moore a b) -> Moore a b
Moore Bool
True Text -> Moore Text Bool
close' where close' :: Text -> Moore Text Bool
close' Text
y' = if Text
y Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== Text
y' then Moore Text Bool
forall {a}. Moore a Bool
end else Moore Text Bool
close
end :: Moore a Bool
end = Bool -> (a -> Moore a Bool) -> Moore a Bool
forall a b. b -> (a -> Moore a b) -> Moore a b
Moore Bool
False (Moore a Bool -> a -> Moore a Bool
forall a b. a -> b -> a
const Moore a Bool
end)
delimit :: (Monad m) => Moore Text Bool -> ProcessT m Event Event
delimit :: forall (m :: * -> *).
Monad m =>
Moore Text Bool -> ProcessT m Event Event
delimit = PlanT (Is Event) Event m () -> MachineT m (Is Event) Event
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
construct (PlanT (Is Event) Event m () -> MachineT m (Is Event) Event)
-> (Moore Text Bool -> PlanT (Is Event) Event m ())
-> Moore Text Bool
-> MachineT m (Is Event) Event
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Moore Text Bool -> PlanT (Is Event) Event m ()
forall (m :: * -> *).
Monad m =>
Moore Text Bool -> PlanT (Is Event) Event m ()
go
where
go :: (Monad m) => Moore Text Bool -> PlanT (Is Event) Event m ()
go :: forall (m :: * -> *).
Monad m =>
Moore Text Bool -> PlanT (Is Event) Event m ()
go mm :: Moore Text Bool
mm@(Moore Bool
s Text -> Moore Text Bool
next) = do
Event
e <- PlanT (Is Event) Event m Event
Plan (Is Event) Event Event
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await
case Event -> EventInfo
evSpec Event
e of
E.UserMarker Text
m -> do
let mm' :: Moore Text Bool
mm'@(Moore Bool
s' Text -> Moore Text Bool
_) = Text -> Moore Text Bool
next Text
m
Bool -> PlanT (Is Event) Event m () -> PlanT (Is Event) Event m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool
s Bool -> Bool -> Bool
|| Bool
s') (PlanT (Is Event) Event m () -> PlanT (Is Event) Event m ())
-> PlanT (Is Event) Event m () -> PlanT (Is Event) Event m ()
forall a b. (a -> b) -> a -> b
$ Event -> Plan (Is Event) Event ()
forall o (k :: * -> *). o -> Plan k o ()
yield Event
e
Moore Text Bool -> PlanT (Is Event) Event m ()
forall (m :: * -> *).
Monad m =>
Moore Text Bool -> PlanT (Is Event) Event m ()
go Moore Text Bool
mm'
EventInfo
_ -> do
Bool -> PlanT (Is Event) Event m () -> PlanT (Is Event) Event m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
s (PlanT (Is Event) Event m () -> PlanT (Is Event) Event m ())
-> PlanT (Is Event) Event m () -> PlanT (Is Event) Event m ()
forall a b. (a -> b) -> a -> b
$ Event -> Plan (Is Event) Event ()
forall o (k :: * -> *). o -> Plan k o ()
yield Event
e
Moore Text Bool -> PlanT (Is Event) Event m ()
forall (m :: * -> *).
Monad m =>
Moore Text Bool -> PlanT (Is Event) Event m ()
go Moore Text Bool
mm
validateInput ::
(MonadIO m) =>
Verbosity ->
Int ->
ProcessT m (Tick a) x
validateInput :: forall (m :: * -> *) a x.
MonadIO m =>
Verbosity -> Int -> ProcessT m (Tick a) x
validateInput Verbosity
verbosity Int
ticks
| Verbosity
verbosityWarning Verbosity -> Verbosity -> Bool
forall a. Ord a => a -> a -> Bool
>= Verbosity
verbosity = PlanT (Is (Tick a)) x m () -> MachineT m (Is (Tick a)) x
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
construct (PlanT (Is (Tick a)) x m () -> MachineT m (Is (Tick a)) x)
-> PlanT (Is (Tick a)) x m () -> MachineT m (Is (Tick a)) x
forall a b. (a -> b) -> a -> b
$ Int -> PlanT (Is (Tick a)) x m ()
start Int
ticks
| Bool
otherwise = MachineT m (Is (Tick a)) x
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped
where
start :: Int -> PlanT (Is (Tick a)) x m ()
start Int
remaining
| Int
remaining Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 = IO () -> PlanT (Is (Tick a)) x m ()
forall a. IO a -> PlanT (Is (Tick a)) x m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> PlanT (Is (Tick a)) x m ())
-> IO () -> PlanT (Is (Tick a)) x m ()
forall a b. (a -> b) -> a -> b
$ do
Verbosity -> Text -> Text -> IO ()
forall (m :: * -> *).
MonadIO m =>
Verbosity -> Text -> Text -> m ()
logWarning Verbosity
verbosity Text
"validateInput" (Text -> IO ()) -> (String -> Text) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$
String -> Int -> String
forall r. PrintfType r => String -> r
printf
String
"No input after %d ticks. Did you pass -l to the GHC RTS?"
Int
ticks
| Bool
otherwise = do
Verbosity -> Text -> Text -> PlanT (Is (Tick a)) x m ()
forall (m :: * -> *).
MonadIO m =>
Verbosity -> Text -> Text -> m ()
logDebug Verbosity
verbosity Text
"validateInput" (Text -> PlanT (Is (Tick a)) x m ())
-> Text -> PlanT (Is (Tick a)) x m ()
forall a b. (a -> b) -> a -> b
$
String -> Text
T.pack (Int -> String
forall a. Show a => a -> String
show Int
remaining) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ticks remaining."
PlanT (Is (Tick a)) x m (Tick a)
Plan (Is (Tick a)) x (Tick a)
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await PlanT (Is (Tick a)) x m (Tick a)
-> (Tick a -> PlanT (Is (Tick a)) x m ())
-> PlanT (Is (Tick a)) x m ()
forall a b.
PlanT (Is (Tick a)) x m a
-> (a -> PlanT (Is (Tick a)) x m b) -> PlanT (Is (Tick a)) x m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Item{} -> do
Verbosity -> Text -> Text -> PlanT (Is (Tick a)) x m ()
forall (m :: * -> *).
MonadIO m =>
Verbosity -> Text -> Text -> m ()
logDebug Verbosity
verbosity Text
"validateInput" Text
"Received item."
Tick a
Tick -> do
Verbosity -> Text -> Text -> PlanT (Is (Tick a)) x m ()
forall (m :: * -> *).
MonadIO m =>
Verbosity -> Text -> Text -> m ()
logDebug Verbosity
verbosity Text
"validateInput" Text
"Received tick."
Int -> PlanT (Is (Tick a)) x m ()
start (Int -> Int
forall a. Enum a => a -> a
pred Int
remaining)
validateOrder ::
(MonadIO m, Show a) =>
Verbosity ->
(a -> Timestamp) ->
ProcessT m a x
validateOrder :: forall (m :: * -> *) a x.
(MonadIO m, Show a) =>
Verbosity -> (a -> Timestamp) -> ProcessT m a x
validateOrder Verbosity
verbosity a -> Timestamp
timestamp
| Verbosity
verbosityError Verbosity -> Verbosity -> Bool
forall a. Ord a => a -> a -> Bool
>= Verbosity
verbosity = PlanT (Is a) x m () -> MachineT m (Is a) x
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
construct (PlanT (Is a) x m () -> MachineT m (Is a) x)
-> PlanT (Is a) x m () -> MachineT m (Is a) x
forall a b. (a -> b) -> a -> b
$ Maybe a -> PlanT (Is a) x m ()
start Maybe a
forall a. Maybe a
Nothing
| Bool
otherwise = MachineT m (Is a) x
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped
where
start :: Maybe a -> PlanT (Is a) x m ()
start Maybe a
maybeOld =
PlanT (Is a) x m a
Plan (Is a) x a
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await PlanT (Is a) x m a
-> (a -> PlanT (Is a) x m ()) -> PlanT (Is a) x m ()
forall a b.
PlanT (Is a) x m a
-> (a -> PlanT (Is a) x m b) -> PlanT (Is a) x m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \a
new ->
case Maybe a
maybeOld of
Just a
old
| a -> Timestamp
timestamp a
new Timestamp -> Timestamp -> Bool
forall a. Ord a => a -> a -> Bool
< a -> Timestamp
timestamp a
old -> do
Verbosity -> Text -> Text -> PlanT (Is a) x m ()
forall (m :: * -> *).
MonadIO m =>
Verbosity -> Text -> Text -> m ()
logError Verbosity
verbosity Text
"validateOrder" (Text -> PlanT (Is a) x m ())
-> (String -> Text) -> String -> PlanT (Is a) x m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> PlanT (Is a) x m ()) -> String -> PlanT (Is a) x m ()
forall a b. (a -> b) -> a -> b
$
String
"Encountered two out-of-order inputs.\n\
\Did you pass --eventlog-flush-interval to the GHC RTS?\n\
\Did you set --batch-interval to be at least as big as the value of --eventlog-flush-interval?"
Verbosity -> Text -> Text -> PlanT (Is a) x m ()
forall (m :: * -> *).
MonadIO m =>
Verbosity -> Text -> Text -> m ()
logDebug Verbosity
verbosity Text
"validateOrder" (Text -> PlanT (Is a) x m ())
-> (String -> Text) -> String -> PlanT (Is a) x m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> PlanT (Is a) x m ()) -> String -> PlanT (Is a) x m ()
forall a b. (a -> b) -> a -> b
$
String -> String -> ShowS
forall r. PrintfType r => String -> r
printf
String
"Out-of-order inputs:\n\
\- %s\n\
\- %s"
(a -> String
forall a. Show a => a -> String
show a
old)
(a -> String
forall a. Show a => a -> String
show a
new)
() -> PlanT (Is a) x m ()
forall a. a -> PlanT (Is a) x m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Maybe a
_otherwise -> do
Maybe a -> PlanT (Is a) x m ()
start (a -> Maybe a
forall a. a -> Maybe a
Just a
new)