{-# LANGUAGE OverloadedStrings #-}

{- |
Module      : GHC.Eventlog.Live.Machine.Core
Description : Core machines for processing data in batches.
Stability   : experimental
Portability : portable
-}
module GHC.Eventlog.Live.Machine.Core (
  -- * Ticks
  Tick (..),
  batchByTick,
  batchToTick,
  batchListToTick,
  batchByTickList,
  liftTick,
  dropTick,
  onlyTick,
  liftBatch,

  -- * Debug
  counterBy,
  counterByTick,

  -- * Routers
  liftRouter,

  -- * Event sorting
  sortByBatch,
  sortByBatchTick,

  -- * Delimiting
  between,
  delimit,

  -- * Validation
  validateInput,
  validateOrder,
) where

import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO (..))
import Data.DList qualified as D
import Data.Foldable (for_)
import Data.Function (on)
import Data.Functor ((<&>))
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as M
import Data.Hashable (Hashable (..))
import Data.List qualified as L
import Data.Machine (Is (..), MachineT (..), Moore (..), PlanT, Process, ProcessT, Step (..), await, construct, encased, mapping, repeatedly, starve, stopped, yield, (~>))
import Data.Maybe (fromMaybe)
import Data.Semigroup (Max (..))
import Data.Text (Text)
import Data.Text qualified as T
import GHC.Eventlog.Live.Logger (logDebug, logError, logWarning)
import GHC.Eventlog.Live.Verbosity (Verbosity, verbosityDebug, verbosityError, verbosityWarning)
import GHC.RTS.Events (Event (..), Timestamp)
import GHC.RTS.Events qualified as E
import Text.Printf (printf)

-------------------------------------------------------------------------------
-- Ticks
-------------------------------------------------------------------------------

{- |
The type of data on a stream of items and ticks.

The t`Tick` type is isomorphic to `Maybe` modulo strictness,
but with the caveat that v`Tick` does not represent failure.
-}
data Tick a = Item !a | Tick
  deriving (Tick a -> Tick a -> Bool
(Tick a -> Tick a -> Bool)
-> (Tick a -> Tick a -> Bool) -> Eq (Tick a)
forall a. Eq a => Tick a -> Tick a -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: forall a. Eq a => Tick a -> Tick a -> Bool
== :: Tick a -> Tick a -> Bool
$c/= :: forall a. Eq a => Tick a -> Tick a -> Bool
/= :: Tick a -> Tick a -> Bool
Eq, (forall a b. (a -> b) -> Tick a -> Tick b)
-> (forall a b. a -> Tick b -> Tick a) -> Functor Tick
forall a b. a -> Tick b -> Tick a
forall a b. (a -> b) -> Tick a -> Tick b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall a b. (a -> b) -> Tick a -> Tick b
fmap :: forall a b. (a -> b) -> Tick a -> Tick b
$c<$ :: forall a b. a -> Tick b -> Tick a
<$ :: forall a b. a -> Tick b -> Tick a
Functor, (forall m. Monoid m => Tick m -> m)
-> (forall m a. Monoid m => (a -> m) -> Tick a -> m)
-> (forall m a. Monoid m => (a -> m) -> Tick a -> m)
-> (forall a b. (a -> b -> b) -> b -> Tick a -> b)
-> (forall a b. (a -> b -> b) -> b -> Tick a -> b)
-> (forall b a. (b -> a -> b) -> b -> Tick a -> b)
-> (forall b a. (b -> a -> b) -> b -> Tick a -> b)
-> (forall a. (a -> a -> a) -> Tick a -> a)
-> (forall a. (a -> a -> a) -> Tick a -> a)
-> (forall a. Tick a -> [a])
-> (forall a. Tick a -> Bool)
-> (forall a. Tick a -> Int)
-> (forall a. Eq a => a -> Tick a -> Bool)
-> (forall a. Ord a => Tick a -> a)
-> (forall a. Ord a => Tick a -> a)
-> (forall a. Num a => Tick a -> a)
-> (forall a. Num a => Tick a -> a)
-> Foldable Tick
forall a. Eq a => a -> Tick a -> Bool
forall a. Num a => Tick a -> a
forall a. Ord a => Tick a -> a
forall m. Monoid m => Tick m -> m
forall a. Tick a -> Bool
forall a. Tick a -> Int
forall a. Tick a -> [a]
forall a. (a -> a -> a) -> Tick a -> a
forall m a. Monoid m => (a -> m) -> Tick a -> m
forall b a. (b -> a -> b) -> b -> Tick a -> b
forall a b. (a -> b -> b) -> b -> Tick a -> b
forall (t :: * -> *).
(forall m. Monoid m => t m -> m)
-> (forall m a. Monoid m => (a -> m) -> t a -> m)
-> (forall m a. Monoid m => (a -> m) -> t a -> m)
-> (forall a b. (a -> b -> b) -> b -> t a -> b)
-> (forall a b. (a -> b -> b) -> b -> t a -> b)
-> (forall b a. (b -> a -> b) -> b -> t a -> b)
-> (forall b a. (b -> a -> b) -> b -> t a -> b)
-> (forall a. (a -> a -> a) -> t a -> a)
-> (forall a. (a -> a -> a) -> t a -> a)
-> (forall a. t a -> [a])
-> (forall a. t a -> Bool)
-> (forall a. t a -> Int)
-> (forall a. Eq a => a -> t a -> Bool)
-> (forall a. Ord a => t a -> a)
-> (forall a. Ord a => t a -> a)
-> (forall a. Num a => t a -> a)
-> (forall a. Num a => t a -> a)
-> Foldable t
$cfold :: forall m. Monoid m => Tick m -> m
fold :: forall m. Monoid m => Tick m -> m
$cfoldMap :: forall m a. Monoid m => (a -> m) -> Tick a -> m
foldMap :: forall m a. Monoid m => (a -> m) -> Tick a -> m
$cfoldMap' :: forall m a. Monoid m => (a -> m) -> Tick a -> m
foldMap' :: forall m a. Monoid m => (a -> m) -> Tick a -> m
$cfoldr :: forall a b. (a -> b -> b) -> b -> Tick a -> b
foldr :: forall a b. (a -> b -> b) -> b -> Tick a -> b
$cfoldr' :: forall a b. (a -> b -> b) -> b -> Tick a -> b
foldr' :: forall a b. (a -> b -> b) -> b -> Tick a -> b
$cfoldl :: forall b a. (b -> a -> b) -> b -> Tick a -> b
foldl :: forall b a. (b -> a -> b) -> b -> Tick a -> b
$cfoldl' :: forall b a. (b -> a -> b) -> b -> Tick a -> b
foldl' :: forall b a. (b -> a -> b) -> b -> Tick a -> b
$cfoldr1 :: forall a. (a -> a -> a) -> Tick a -> a
foldr1 :: forall a. (a -> a -> a) -> Tick a -> a
$cfoldl1 :: forall a. (a -> a -> a) -> Tick a -> a
foldl1 :: forall a. (a -> a -> a) -> Tick a -> a
$ctoList :: forall a. Tick a -> [a]
toList :: forall a. Tick a -> [a]
$cnull :: forall a. Tick a -> Bool
null :: forall a. Tick a -> Bool
$clength :: forall a. Tick a -> Int
length :: forall a. Tick a -> Int
$celem :: forall a. Eq a => a -> Tick a -> Bool
elem :: forall a. Eq a => a -> Tick a -> Bool
$cmaximum :: forall a. Ord a => Tick a -> a
maximum :: forall a. Ord a => Tick a -> a
$cminimum :: forall a. Ord a => Tick a -> a
minimum :: forall a. Ord a => Tick a -> a
$csum :: forall a. Num a => Tick a -> a
sum :: forall a. Num a => Tick a -> a
$cproduct :: forall a. Num a => Tick a -> a
product :: forall a. Num a => Tick a -> a
Foldable, Functor Tick
Foldable Tick
(Functor Tick, Foldable Tick) =>
(forall (f :: * -> *) a b.
 Applicative f =>
 (a -> f b) -> Tick a -> f (Tick b))
-> (forall (f :: * -> *) a.
    Applicative f =>
    Tick (f a) -> f (Tick a))
-> (forall (m :: * -> *) a b.
    Monad m =>
    (a -> m b) -> Tick a -> m (Tick b))
-> (forall (m :: * -> *) a. Monad m => Tick (m a) -> m (Tick a))
-> Traversable Tick
forall (t :: * -> *).
(Functor t, Foldable t) =>
(forall (f :: * -> *) a b.
 Applicative f =>
 (a -> f b) -> t a -> f (t b))
-> (forall (f :: * -> *) a. Applicative f => t (f a) -> f (t a))
-> (forall (m :: * -> *) a b.
    Monad m =>
    (a -> m b) -> t a -> m (t b))
-> (forall (m :: * -> *) a. Monad m => t (m a) -> m (t a))
-> Traversable t
forall (m :: * -> *) a. Monad m => Tick (m a) -> m (Tick a)
forall (f :: * -> *) a. Applicative f => Tick (f a) -> f (Tick a)
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Tick a -> m (Tick b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Tick a -> f (Tick b)
$ctraverse :: forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Tick a -> f (Tick b)
traverse :: forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Tick a -> f (Tick b)
$csequenceA :: forall (f :: * -> *) a. Applicative f => Tick (f a) -> f (Tick a)
sequenceA :: forall (f :: * -> *) a. Applicative f => Tick (f a) -> f (Tick a)
$cmapM :: forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Tick a -> m (Tick b)
mapM :: forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Tick a -> m (Tick b)
$csequence :: forall (m :: * -> *) a. Monad m => Tick (m a) -> m (Tick a)
sequence :: forall (m :: * -> *) a. Monad m => Tick (m a) -> m (Tick a)
Traversable, Int -> Tick a -> ShowS
[Tick a] -> ShowS
Tick a -> String
(Int -> Tick a -> ShowS)
-> (Tick a -> String) -> ([Tick a] -> ShowS) -> Show (Tick a)
forall a. Show a => Int -> Tick a -> ShowS
forall a. Show a => [Tick a] -> ShowS
forall a. Show a => Tick a -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall a. Show a => Int -> Tick a -> ShowS
showsPrec :: Int -> Tick a -> ShowS
$cshow :: forall a. Show a => Tick a -> String
show :: Tick a -> String
$cshowList :: forall a. Show a => [Tick a] -> ShowS
showList :: [Tick a] -> ShowS
Show)

{- |
This machine batches all items between two ticks into a list.
-}
batchByTickList :: Process (Tick a) [a]
batchByTickList :: forall a (m :: * -> *). Monad m => MachineT m (Is (Tick a)) [a]
batchByTickList =
  (Tick a -> Tick (DList a))
-> Machine (Is (Tick a)) (Tick (DList a))
forall (k :: * -> * -> *) a b.
Category k =>
(a -> b) -> Machine (k a) b
mapping ((a -> DList a) -> Tick a -> Tick (DList a)
forall a b. (a -> b) -> Tick a -> Tick b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> DList a
forall a. a -> DList a
D.singleton)
    MachineT m (Is (Tick a)) (Tick (DList a))
-> ProcessT m (Tick (DList a)) (DList a)
-> MachineT m (Is (Tick a)) (DList a)
forall (m :: * -> *) (k :: * -> *) b c.
Monad m =>
MachineT m k b -> ProcessT m b c -> MachineT m k c
~> ProcessT m (Tick (DList a)) (DList a)
forall (m :: * -> *) a.
(Monad m, Monoid a) =>
ProcessT m (Tick a) a
batchByTick
    MachineT m (Is (Tick a)) (DList a)
-> ProcessT m (DList a) [a] -> MachineT m (Is (Tick a)) [a]
forall (m :: * -> *) (k :: * -> *) b c.
Monad m =>
MachineT m k b -> ProcessT m b c -> MachineT m k c
~> (DList a -> [a]) -> Machine (Is (DList a)) [a]
forall (k :: * -> * -> *) a b.
Category k =>
(a -> b) -> Machine (k a) b
mapping DList a -> [a]
forall a. DList a -> [a]
D.toList

{- |
Generalised version of `batchByTickList`.
-}
batchByTick ::
  forall m a.
  (Monad m, Monoid a) =>
  ProcessT m (Tick a) a
batchByTick :: forall (m :: * -> *) a.
(Monad m, Monoid a) =>
ProcessT m (Tick a) a
batchByTick = a -> MachineT m (Is (Tick a)) a
batchByTickWith a
forall a. Monoid a => a
mempty
 where
  batchByTickWith :: a -> MachineT m (Is (Tick a)) a
  batchByTickWith :: a -> MachineT m (Is (Tick a)) a
batchByTickWith a
acc = m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
 -> MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a
forall a b. (a -> b) -> a -> b
$ Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
 -> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)))
-> Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
forall a b. (a -> b) -> a -> b
$ (Tick a -> MachineT m (Is (Tick a)) a)
-> Is (Tick a) (Tick a)
-> MachineT m (Is (Tick a)) a
-> Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
forall (k :: * -> *) o r t. (t -> r) -> k t -> r -> Step k o r
Await Tick a -> MachineT m (Is (Tick a)) a
onNext Is (Tick a) (Tick a)
forall a. Is a a
Refl MachineT m (Is (Tick a)) a
onStop
   where
    onNext :: Tick a -> MachineT m (Is (Tick a)) a
    onNext :: Tick a -> MachineT m (Is (Tick a)) a
onNext = \case
      Item a
a -> a -> MachineT m (Is (Tick a)) a
batchByTickWith (a
a a -> a -> a
forall a. Semigroup a => a -> a -> a
<> a
acc)
      Tick a
Tick -> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
 -> MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a
forall a b. (a -> b) -> a -> b
$ Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
 -> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)))
-> Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
forall a b. (a -> b) -> a -> b
$ a
-> MachineT m (Is (Tick a)) a
-> Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield a
acc MachineT m (Is (Tick a)) a
forall (m :: * -> *) a.
(Monad m, Monoid a) =>
ProcessT m (Tick a) a
batchByTick
    onStop :: MachineT m (Is (Tick a)) a
    onStop :: MachineT m (Is (Tick a)) a
onStop = m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
 -> MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
-> MachineT m (Is (Tick a)) a
forall a b. (a -> b) -> a -> b
$ Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
 -> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)))
-> Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
-> m (Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a))
forall a b. (a -> b) -> a -> b
$ a
-> MachineT m (Is (Tick a)) a
-> Step (Is (Tick a)) a (MachineT m (Is (Tick a)) a)
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield a
acc MachineT m (Is (Tick a)) a
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped

{- |
This machine streams a list of items into a series of items
separated by ticks.
-}
batchListToTick :: Process [a] (Tick a)
batchListToTick :: forall a (m :: * -> *). Monad m => MachineT m (Is [a]) (Tick a)
batchListToTick = MachineT m (Is [a]) (Tick a)
Process [a] (Tick a)
forall (f :: * -> *) a. Foldable f => Process (f a) (Tick a)
batchToTick

{- |
Generalised version of `batchListToTick`.
-}
batchToTick :: (Foldable f) => Process (f a) (Tick a)
batchToTick :: forall (f :: * -> *) a. Foldable f => Process (f a) (Tick a)
batchToTick = PlanT (Is (f a)) (Tick a) m () -> MachineT m (Is (f a)) (Tick a)
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
repeatedly PlanT (Is (f a)) (Tick a) m ()
forall {a} {m :: * -> *}. PlanT (Is (f a)) (Tick a) m ()
go
 where
  go :: PlanT (Is (f a)) (Tick a) m ()
go = PlanT (Is (f a)) (Tick a) m (f a)
Plan (Is (f a)) (Tick a) (f a)
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await PlanT (Is (f a)) (Tick a) m (f a)
-> (f a -> PlanT (Is (f a)) (Tick a) m ())
-> PlanT (Is (f a)) (Tick a) m ()
forall a b.
PlanT (Is (f a)) (Tick a) m a
-> (a -> PlanT (Is (f a)) (Tick a) m b)
-> PlanT (Is (f a)) (Tick a) m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \f a
xs -> f a
-> (a -> PlanT (Is (f a)) (Tick a) m ())
-> PlanT (Is (f a)) (Tick a) m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ f a
xs (Tick a -> PlanT (Is (f a)) (Tick a) m ()
Tick a -> forall {m :: * -> *}. PlanT (Is (f a)) (Tick a) m ()
forall o (k :: * -> *). o -> Plan k o ()
yield (Tick a -> PlanT (Is (f a)) (Tick a) m ())
-> (a -> Tick a) -> a -> PlanT (Is (f a)) (Tick a) m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Tick a
forall a. a -> Tick a
Item) PlanT (Is (f a)) (Tick a) m ()
-> PlanT (Is (f a)) (Tick a) m () -> PlanT (Is (f a)) (Tick a) m ()
forall a b.
PlanT (Is (f a)) (Tick a) m a
-> PlanT (Is (f a)) (Tick a) m b -> PlanT (Is (f a)) (Tick a) m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Tick a -> forall {m :: * -> *}. PlanT (Is (f a)) (Tick a) m ()
forall o (k :: * -> *). o -> Plan k o ()
yield Tick a
forall a. Tick a
Tick

{- |
This machine drops all ticks.
-}
dropTick :: Process (Tick a) a
dropTick :: forall a (m :: * -> *). Monad m => MachineT m (Is (Tick a)) a
dropTick =
  PlanT (Is (Tick a)) a m () -> MachineT m (Is (Tick a)) a
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
repeatedly (PlanT (Is (Tick a)) a m () -> MachineT m (Is (Tick a)) a)
-> PlanT (Is (Tick a)) a m () -> MachineT m (Is (Tick a)) a
forall a b. (a -> b) -> a -> b
$
    PlanT (Is (Tick a)) a m (Tick a)
Plan (Is (Tick a)) a (Tick a)
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await PlanT (Is (Tick a)) a m (Tick a)
-> (Tick a -> PlanT (Is (Tick a)) a m ())
-> PlanT (Is (Tick a)) a m ()
forall a b.
PlanT (Is (Tick a)) a m a
-> (a -> PlanT (Is (Tick a)) a m b) -> PlanT (Is (Tick a)) a m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Item a
a -> a -> Plan (Is (Tick a)) a ()
forall o (k :: * -> *). o -> Plan k o ()
yield a
a
      Tick a
Tick -> () -> PlanT (Is (Tick a)) a m ()
forall a. a -> PlanT (Is (Tick a)) a m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

{- |
This machine drops all items.
-}
onlyTick :: Process (Tick a) ()
onlyTick :: forall a (m :: * -> *). Monad m => MachineT m (Is (Tick a)) ()
onlyTick =
  PlanT (Is (Tick a)) () m () -> MachineT m (Is (Tick a)) ()
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
repeatedly (PlanT (Is (Tick a)) () m () -> MachineT m (Is (Tick a)) ())
-> PlanT (Is (Tick a)) () m () -> MachineT m (Is (Tick a)) ()
forall a b. (a -> b) -> a -> b
$
    PlanT (Is (Tick a)) () m (Tick a)
Plan (Is (Tick a)) () (Tick a)
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await PlanT (Is (Tick a)) () m (Tick a)
-> (Tick a -> PlanT (Is (Tick a)) () m ())
-> PlanT (Is (Tick a)) () m ()
forall a b.
PlanT (Is (Tick a)) () m a
-> (a -> PlanT (Is (Tick a)) () m b) -> PlanT (Is (Tick a)) () m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Tick a
Tick -> () -> Plan (Is (Tick a)) () ()
forall o (k :: * -> *). o -> Plan k o ()
yield ()
      Item{} -> () -> PlanT (Is (Tick a)) () m ()
forall a. a -> PlanT (Is (Tick a)) () m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

-------------------------------------------------------------------------------
-- Machine combinators
-------------------------------------------------------------------------------

{- |
This machine counts the number of inputs it received,
using the given function, and logs this value.
-}
counterBy ::
  forall m a x.
  (MonadIO m) =>
  Verbosity ->
  Text ->
  (a -> Word) ->
  ProcessT m a x
counterBy :: forall (m :: * -> *) a x.
MonadIO m =>
Verbosity -> Text -> (a -> Word) -> ProcessT m a x
counterBy Verbosity
verbosity Text
label a -> Word
count
  | Verbosity
verbosityDebug Verbosity -> Verbosity -> Bool
forall a. Ord a => a -> a -> Bool
>= Verbosity
verbosity = PlanT (Is a) x m () -> MachineT m (Is a) x
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
repeatedly PlanT (Is a) x m ()
go
  | Bool
otherwise = MachineT m (Is a) x
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped
 where
  go :: PlanT (Is a) x m ()
  go :: PlanT (Is a) x m ()
go =
    PlanT (Is a) x m a
Plan (Is a) x a
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await PlanT (Is a) x m a
-> (a -> PlanT (Is a) x m ()) -> PlanT (Is a) x m ()
forall a b.
PlanT (Is a) x m a
-> (a -> PlanT (Is a) x m b) -> PlanT (Is a) x m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \a
a ->
      Verbosity -> Text -> PlanT (Is a) x m ()
forall (m :: * -> *).
(HasCallStack, MonadIO m) =>
Verbosity -> Text -> m ()
logDebug Verbosity
verbosity (Text
"saw " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Word -> String
forall a. Show a => a -> String
show (a -> Word
count a
a)) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
label)

{- |
This machine counts the number of inputs it received,
and logs this value on every tick.
-}
counterByTick ::
  forall m a x.
  (MonadIO m) =>
  Verbosity ->
  Text ->
  ProcessT m (Tick a) x
counterByTick :: forall (m :: * -> *) a x.
MonadIO m =>
Verbosity -> Text -> ProcessT m (Tick a) x
counterByTick Verbosity
verbosity Text
label
  | Verbosity
verbosityDebug Verbosity -> Verbosity -> Bool
forall a. Ord a => a -> a -> Bool
>= Verbosity
verbosity = PlanT (Is (Tick a)) x m () -> MachineT m (Is (Tick a)) x
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
construct (PlanT (Is (Tick a)) x m () -> MachineT m (Is (Tick a)) x)
-> PlanT (Is (Tick a)) x m () -> MachineT m (Is (Tick a)) x
forall a b. (a -> b) -> a -> b
$ Word -> PlanT (Is (Tick a)) x m ()
go Word
0
  | Bool
otherwise = MachineT m (Is (Tick a)) x
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped
 where
  go :: Word -> PlanT (Is (Tick a)) x m ()
  go :: Word -> PlanT (Is (Tick a)) x m ()
go Word
count =
    PlanT (Is (Tick a)) x m (Tick a)
Plan (Is (Tick a)) x (Tick a)
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await PlanT (Is (Tick a)) x m (Tick a)
-> (Tick a -> PlanT (Is (Tick a)) x m ())
-> PlanT (Is (Tick a)) x m ()
forall a b.
PlanT (Is (Tick a)) x m a
-> (a -> PlanT (Is (Tick a)) x m b) -> PlanT (Is (Tick a)) x m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Item a
_ -> Word -> PlanT (Is (Tick a)) x m ()
go (Word
count Word -> Word -> Word
forall a. Num a => a -> a -> a
+ Word
1)
      Tick a
Tick -> Verbosity -> Text -> PlanT (Is (Tick a)) x m ()
forall (m :: * -> *).
(HasCallStack, MonadIO m) =>
Verbosity -> Text -> m ()
logDebug Verbosity
verbosity (Text
"saw " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Word -> String
forall a. Show a => a -> String
show Word
count) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
label) PlanT (Is (Tick a)) x m ()
-> PlanT (Is (Tick a)) x m () -> PlanT (Is (Tick a)) x m ()
forall a b.
PlanT (Is (Tick a)) x m a
-> PlanT (Is (Tick a)) x m b -> PlanT (Is (Tick a)) x m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Word -> PlanT (Is (Tick a)) x m ()
go Word
0

-------------------------------------------------------------------------------
-- Machine combinators
-------------------------------------------------------------------------------

--------------------------------------------------------------------------------
-- Lift a machine to a machine that passes on ticks unchanged

{- |
Lift a machine to a machine that passes on ticks unchanged.

Constructs the following machine:

@
           ┌─(if Tick)────────────────────┐
  [ Tick a ]                              [ Tick b ]
           └─(if Item)─( ProcessT m a b )─┘
@
-}
liftTick ::
  (Monad m) =>
  ProcessT m a b ->
  ProcessT m (Tick a) (Tick b)
liftTick :: forall (m :: * -> *) a b.
Monad m =>
ProcessT m a b -> ProcessT m (Tick a) (Tick b)
liftTick ProcessT m a b
m =
  m (Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> MachineT m (Is (Tick a)) (Tick b)
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step
      (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
 -> MachineT m (Is (Tick a)) (Tick b))
-> m (Step
        (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> MachineT m (Is (Tick a)) (Tick b)
forall a b. (a -> b) -> a -> b
$
    ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT ProcessT m a b
m m (Step (Is a) b (ProcessT m a b))
-> (Step (Is a) b (ProcessT m a b)
    -> Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> m (Step
        (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
      Step (Is a) b (ProcessT m a b)
Stop ->
        Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
forall (k :: * -> *) o r. Step k o r
Stop
      Yield b
o ProcessT m a b
k ->
        Tick b
-> MachineT m (Is (Tick a)) (Tick b)
-> Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield (b -> Tick b
forall a. a -> Tick a
Item b
o) (ProcessT m a b -> MachineT m (Is (Tick a)) (Tick b)
forall (m :: * -> *) a b.
Monad m =>
ProcessT m a b -> ProcessT m (Tick a) (Tick b)
liftTick ProcessT m a b
k)
      Await (t -> ProcessT m a b
onNext :: t -> ProcessT m a b) Is a t
Refl ProcessT m a b
onStop ->
        Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
await'
       where
        await' :: Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
await' = (Tick a -> MachineT m (Is (Tick a)) (Tick b))
-> Is (Tick a) (Tick a)
-> MachineT m (Is (Tick a)) (Tick b)
-> Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
forall (k :: * -> *) o r t. (t -> r) -> k t -> r -> Step k o r
Await Tick a -> MachineT m (Is (Tick a)) (Tick b)
onNext' Is (Tick a) (Tick a)
forall a. Is a a
Refl MachineT m (Is (Tick a)) (Tick b)
onStop'
         where
          onNext' :: Tick a -> ProcessT m (Tick a) (Tick b)
          onNext' :: Tick a -> MachineT m (Is (Tick a)) (Tick b)
onNext' = \case
            Tick a
Tick ->
              m (Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> MachineT m (Is (Tick a)) (Tick b)
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step
      (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
 -> MachineT m (Is (Tick a)) (Tick b))
-> (MachineT m (Is (Tick a)) (Tick b)
    -> m (Step
            (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))))
-> MachineT m (Is (Tick a)) (Tick b)
-> MachineT m (Is (Tick a)) (Tick b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
-> m (Step
        (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
 -> m (Step
         (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))))
-> (MachineT m (Is (Tick a)) (Tick b)
    -> Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> MachineT m (Is (Tick a)) (Tick b)
-> m (Step
        (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Tick b
-> MachineT m (Is (Tick a)) (Tick b)
-> Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield Tick b
forall a. Tick a
Tick (MachineT m (Is (Tick a)) (Tick b)
 -> MachineT m (Is (Tick a)) (Tick b))
-> MachineT m (Is (Tick a)) (Tick b)
-> MachineT m (Is (Tick a)) (Tick b)
forall a b. (a -> b) -> a -> b
$
                m (Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
-> MachineT m (Is (Tick a)) (Tick b)
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step
      (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
 -> MachineT m (Is (Tick a)) (Tick b))
-> (Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
    -> m (Step
            (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))))
-> Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
-> MachineT m (Is (Tick a)) (Tick b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
-> m (Step
        (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b)))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
 -> MachineT m (Is (Tick a)) (Tick b))
-> Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
-> MachineT m (Is (Tick a)) (Tick b)
forall a b. (a -> b) -> a -> b
$
                  Step (Is (Tick a)) (Tick b) (MachineT m (Is (Tick a)) (Tick b))
await'
            Item a
a -> ProcessT m a b -> MachineT m (Is (Tick a)) (Tick b)
forall (m :: * -> *) a b.
Monad m =>
ProcessT m a b -> ProcessT m (Tick a) (Tick b)
liftTick (t -> ProcessT m a b
onNext a
t
a)
          onStop' :: ProcessT m (Tick a) (Tick b)
          onStop' :: MachineT m (Is (Tick a)) (Tick b)
onStop' = ProcessT m a b -> MachineT m (Is (Tick a)) (Tick b)
forall (m :: * -> *) a b.
Monad m =>
ProcessT m a b -> ProcessT m (Tick a) (Tick b)
liftTick ProcessT m a b
onStop

--------------------------------------------------------------------------------
-- Lift a machine to a machine that operates on batches

{- |
Lift a machine that processes @a@s into @b@s to a machine that processes
batches of @a@s into batches of @b@s.
-}
liftBatch ::
  forall m a b.
  (Monad m) =>
  ProcessT m a b ->
  ProcessT m [a] [b]
liftBatch :: forall (m :: * -> *) a b.
Monad m =>
ProcessT m a b -> ProcessT m [a] [b]
liftBatch = m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
 -> MachineT m (Is [a]) [b])
-> (ProcessT m a b
    -> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b])))
-> ProcessT m a b
-> MachineT m (Is [a]) [b]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [a]
-> [b]
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
running [] []
 where
  -- The parent machine is running the child machine with the current batch.
  running :: [a] -> [b] -> ProcessT m a b -> m (Step (Is [a]) [b] (ProcessT m [a] [b]))
  running :: [a]
-> [b]
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
running [a]
as [b]
bs ProcessT m a b
m =
    ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT ProcessT m a b
m m (Step (Is a) b (ProcessT m a b))
-> (Step (Is a) b (ProcessT m a b)
    -> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b])))
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Step (Is a) b (ProcessT m a b)
Stop ->
        Step (Is [a]) [b] (MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Step (Is [a]) [b] (MachineT m (Is [a]) [b])
forall (k :: * -> *) o r. Step k o r
Stop
      Yield b
b ProcessT m a b
k ->
        [a]
-> [b]
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
running [a]
as (b
b b -> [b] -> [b]
forall a. a -> [a] -> [a]
: [b]
bs) ProcessT m a b
k
      Await (t -> ProcessT m a b
onNext :: t -> ProcessT m a b) Is a t
Refl ProcessT m a b
onStop ->
        Step (Is [a]) [b] (MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is [a]) [b] (MachineT m (Is [a]) [b])
 -> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b])))
-> Step (Is [a]) [b] (MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
forall a b. (a -> b) -> a -> b
$ [b]
-> MachineT m (Is [a]) [b]
-> Step (Is [a]) [b] (MachineT m (Is [a]) [b])
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield ([b] -> [b]
forall a. [a] -> [a]
reverse [b]
bs) (MachineT m (Is [a]) [b]
 -> Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b]
-> Step (Is [a]) [b] (MachineT m (Is [a]) [b])
forall a b. (a -> b) -> a -> b
$ m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
 -> MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b]
forall a b. (a -> b) -> a -> b
$ [a]
-> (a -> ProcessT m a b)
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
awaiting [a]
as a -> ProcessT m a b
t -> ProcessT m a b
onNext ProcessT m a b
onStop

  -- The parent machine is awaiting new input.
  awaiting :: [a] -> (a -> ProcessT m a b) -> ProcessT m a b -> m (Step (Is [a]) [b] (ProcessT m [a] [b]))
  awaiting :: [a]
-> (a -> ProcessT m a b)
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
awaiting (a
a : [a]
as) a -> ProcessT m a b
onNext ProcessT m a b
_onStop = [a]
-> [b]
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
running [a]
as [] (ProcessT m a b -> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b])))
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
forall a b. (a -> b) -> a -> b
$ a -> ProcessT m a b
onNext a
a
  awaiting [] a -> ProcessT m a b
onNext ProcessT m a b
onStop = Step (Is [a]) [b] (MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is [a]) [b] (MachineT m (Is [a]) [b])
 -> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b])))
-> Step (Is [a]) [b] (MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
forall a b. (a -> b) -> a -> b
$ ([a] -> MachineT m (Is [a]) [b])
-> Is [a] [a]
-> MachineT m (Is [a]) [b]
-> Step (Is [a]) [b] (MachineT m (Is [a]) [b])
forall (k :: * -> *) o r t. (t -> r) -> k t -> r -> Step k o r
Await [a] -> MachineT m (Is [a]) [b]
onNext' Is [a] [a]
forall a. Is a a
Refl MachineT m (Is [a]) [b]
onStop'
   where
    onNext' :: [a] -> ProcessT m [a] [b]
    onNext' :: [a] -> MachineT m (Is [a]) [b]
onNext' [a]
as = m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
 -> MachineT m (Is [a]) [b])
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
-> MachineT m (Is [a]) [b]
forall a b. (a -> b) -> a -> b
$ [a]
-> (a -> ProcessT m a b)
-> ProcessT m a b
-> m (Step (Is [a]) [b] (MachineT m (Is [a]) [b]))
awaiting [a]
as a -> ProcessT m a b
onNext ProcessT m a b
onStop
    onStop' :: ProcessT m [a] [b]
    onStop' :: MachineT m (Is [a]) [b]
onStop' = ProcessT m a b -> MachineT m (Is [a]) [b]
forall x. ProcessT m a b -> ProcessT m x [b]
exhausting ProcessT m a b
onStop

  -- The parent machine is exhausting the child machine to gather its output.
  exhausting :: ProcessT m a b -> ProcessT m x [b]
  exhausting :: forall x. ProcessT m a b -> ProcessT m x [b]
exhausting = m (Step (Is x) [b] (MachineT m (Is x) [b]))
-> MachineT m (Is x) [b]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is x) [b] (MachineT m (Is x) [b]))
 -> MachineT m (Is x) [b])
-> (ProcessT m a b -> m (Step (Is x) [b] (MachineT m (Is x) [b])))
-> ProcessT m a b
-> MachineT m (Is x) [b]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [b]
-> ProcessT m a b -> m (Step (Is x) [b] (MachineT m (Is x) [b]))
forall x.
[b] -> ProcessT m a b -> m (Step (Is x) [b] (ProcessT m x [b]))
go []
   where
    go :: [b] -> ProcessT m a b -> m (Step (Is x) [b] (ProcessT m x [b]))
    go :: forall x.
[b] -> ProcessT m a b -> m (Step (Is x) [b] (ProcessT m x [b]))
go [b]
bs ProcessT m a b
m =
      ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT ProcessT m a b
m m (Step (Is a) b (ProcessT m a b))
-> (Step (Is a) b (ProcessT m a b)
    -> m (Step (Is x) [b] (ProcessT m x [b])))
-> m (Step (Is x) [b] (ProcessT m x [b]))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Step (Is a) b (ProcessT m a b)
Stop ->
          Step (Is x) [b] (ProcessT m x [b])
-> m (Step (Is x) [b] (ProcessT m x [b]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Step (Is x) [b] (ProcessT m x [b])
forall (k :: * -> *) o r. Step k o r
Stop
        Yield b
b ProcessT m a b
k ->
          [b] -> ProcessT m a b -> m (Step (Is x) [b] (ProcessT m x [b]))
forall x.
[b] -> ProcessT m a b -> m (Step (Is x) [b] (ProcessT m x [b]))
go (b
b b -> [b] -> [b]
forall a. a -> [a] -> [a]
: [b]
bs) ProcessT m a b
k
        Await t -> ProcessT m a b
_onNext Is a t
_Refl ProcessT m a b
onStop ->
          Step (Is x) [b] (ProcessT m x [b])
-> m (Step (Is x) [b] (ProcessT m x [b]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is x) [b] (ProcessT m x [b])
 -> m (Step (Is x) [b] (ProcessT m x [b])))
-> Step (Is x) [b] (ProcessT m x [b])
-> m (Step (Is x) [b] (ProcessT m x [b]))
forall a b. (a -> b) -> a -> b
$ [b] -> ProcessT m x [b] -> Step (Is x) [b] (ProcessT m x [b])
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield ([b] -> [b]
forall a. [a] -> [a]
reverse [b]
bs) (ProcessT m x [b] -> Step (Is x) [b] (ProcessT m x [b]))
-> ProcessT m x [b] -> Step (Is x) [b] (ProcessT m x [b])
forall a b. (a -> b) -> a -> b
$ m (Step (Is x) [b] (ProcessT m x [b])) -> ProcessT m x [b]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is x) [b] (ProcessT m x [b])) -> ProcessT m x [b])
-> m (Step (Is x) [b] (ProcessT m x [b])) -> ProcessT m x [b]
forall a b. (a -> b) -> a -> b
$ [b] -> ProcessT m a b -> m (Step (Is x) [b] (ProcessT m x [b]))
forall x.
[b] -> ProcessT m a b -> m (Step (Is x) [b] (ProcessT m x [b]))
go [] ProcessT m a b
onStop

--------------------------------------------------------------------------------
-- Construct a processor that spawns a separate child processor for each measure

{- |
Spawn a copy of a machine for each "measure".

Constructs the following machine:

@
    ┌─────(if measure == k0)─( ProcessT m a b )────┐
  [ a ] ──(if measure == ..)─( ProcessT m a b )─ [ b ]
    └─────(if measure == kN)─( ProcessT m a b )────┘
@

__Warning:__ The router does not currently garbage-collect terminated child processors.
-}
liftRouter ::
  forall m k a b.
  (MonadIO m, Hashable k) =>
  -- | Function to measure.
  (a -> Maybe k) ->
  -- | Function to spawn child processors.
  (k -> ProcessT m a b) ->
  ProcessT m a b
liftRouter :: forall (m :: * -> *) k a b.
(MonadIO m, Hashable k) =>
(a -> Maybe k) -> (k -> ProcessT m a b) -> ProcessT m a b
liftRouter a -> Maybe k
measure k -> ProcessT m a b
spawn = HashMap k (ProcessT m a b) -> ProcessT m a b
awaiting HashMap k (ProcessT m a b)
forall k v. HashMap k v
M.empty
 where
  awaiting :: HashMap k (ProcessT m a b) -> ProcessT m a b
  awaiting :: HashMap k (ProcessT m a b) -> ProcessT m a b
awaiting HashMap k (ProcessT m a b)
st = m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b)
-> (Step (Is a) b (ProcessT m a b)
    -> m (Step (Is a) b (ProcessT m a b)))
-> Step (Is a) b (ProcessT m a b)
-> ProcessT m a b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Step (Is a) b (ProcessT m a b)
-> m (Step (Is a) b (ProcessT m a b))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is a) b (ProcessT m a b) -> ProcessT m a b)
-> Step (Is a) b (ProcessT m a b) -> ProcessT m a b
forall a b. (a -> b) -> a -> b
$ (a -> ProcessT m a b)
-> Is a a -> ProcessT m a b -> Step (Is a) b (ProcessT m a b)
forall (k :: * -> *) o r t. (t -> r) -> k t -> r -> Step k o r
Await a -> ProcessT m a b
onNext Is a a
forall a. Is a a
Refl ProcessT m a b
onStop
   where
    onNext :: a -> MachineT m (Is a) b
    onNext :: a -> ProcessT m a b
onNext a
a = case a -> Maybe k
measure a
a of
      Maybe k
Nothing -> HashMap k (ProcessT m a b) -> ProcessT m a b
awaiting HashMap k (ProcessT m a b)
st
      Just k
k -> a
-> ProcessT m a b
-> (ProcessT m a b -> ProcessT m a b)
-> ProcessT m a b
provideThen a
a ProcessT m a b
m ((ProcessT m a b -> ProcessT m a b) -> ProcessT m a b)
-> (ProcessT m a b -> ProcessT m a b) -> ProcessT m a b
forall a b. (a -> b) -> a -> b
$ \ProcessT m a b
m' -> HashMap k (ProcessT m a b) -> ProcessT m a b
awaiting (k
-> ProcessT m a b
-> HashMap k (ProcessT m a b)
-> HashMap k (ProcessT m a b)
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
M.insert k
k ProcessT m a b
m' HashMap k (ProcessT m a b)
st)
       where
        m :: ProcessT m a b
m = ProcessT m a b -> Maybe (ProcessT m a b) -> ProcessT m a b
forall a. a -> Maybe a -> a
fromMaybe (k -> ProcessT m a b
spawn k
k) (k -> HashMap k (ProcessT m a b) -> Maybe (ProcessT m a b)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
M.lookup k
k HashMap k (ProcessT m a b)
st)
    onStop :: MachineT m (Is a) b
    onStop :: ProcessT m a b
onStop = (ProcessT m a b -> ProcessT m a b -> ProcessT m a b)
-> ProcessT m a b -> [ProcessT m a b] -> ProcessT m a b
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr ProcessT m a b -> ProcessT m a b -> ProcessT m a b
forall (m :: * -> *) (k0 :: * -> *) b (k :: * -> *).
Monad m =>
MachineT m k0 b -> MachineT m k b -> MachineT m k b
starve ProcessT m a b
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped (HashMap k (ProcessT m a b) -> [ProcessT m a b]
forall k v. HashMap k v -> [v]
M.elems HashMap k (ProcessT m a b)
st)

  provideThen :: a -> ProcessT m a b -> (ProcessT m a b -> ProcessT m a b) -> ProcessT m a b
  provideThen :: a
-> ProcessT m a b
-> (ProcessT m a b -> ProcessT m a b)
-> ProcessT m a b
provideThen a
a ProcessT m a b
m ProcessT m a b -> ProcessT m a b
k =
    m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b)
-> m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b
forall a b. (a -> b) -> a -> b
$
      ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT ProcessT m a b
m m (Step (Is a) b (ProcessT m a b))
-> (Step (Is a) b (ProcessT m a b)
    -> m (Step (Is a) b (ProcessT m a b)))
-> m (Step (Is a) b (ProcessT m a b))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Step (Is a) b (ProcessT m a b)
Stop -> ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT (ProcessT m a b -> ProcessT m a b
k ProcessT m a b
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped)
        Yield b
o ProcessT m a b
m' -> Step (Is a) b (ProcessT m a b)
-> m (Step (Is a) b (ProcessT m a b))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (b -> ProcessT m a b -> Step (Is a) b (ProcessT m a b)
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield b
o (a
-> ProcessT m a b
-> (ProcessT m a b -> ProcessT m a b)
-> ProcessT m a b
provideThen a
a ProcessT m a b
m' ProcessT m a b -> ProcessT m a b
k))
        Await t -> ProcessT m a b
onNext Is a t
Refl ProcessT m a b
_onStop -> ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT (ProcessT m a b
-> (ProcessT m a b -> ProcessT m a b) -> ProcessT m a b
exhaustThen (t -> ProcessT m a b
onNext a
t
a) ProcessT m a b -> ProcessT m a b
k)

  exhaustThen :: ProcessT m a b -> (ProcessT m a b -> ProcessT m a b) -> ProcessT m a b
  exhaustThen :: ProcessT m a b
-> (ProcessT m a b -> ProcessT m a b) -> ProcessT m a b
exhaustThen ProcessT m a b
m ProcessT m a b -> ProcessT m a b
k =
    m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b)
-> m (Step (Is a) b (ProcessT m a b)) -> ProcessT m a b
forall a b. (a -> b) -> a -> b
$
      ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT ProcessT m a b
m m (Step (Is a) b (ProcessT m a b))
-> (Step (Is a) b (ProcessT m a b)
    -> m (Step (Is a) b (ProcessT m a b)))
-> m (Step (Is a) b (ProcessT m a b))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Yield b
o ProcessT m a b
m' -> Step (Is a) b (ProcessT m a b)
-> m (Step (Is a) b (ProcessT m a b))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (b -> ProcessT m a b -> Step (Is a) b (ProcessT m a b)
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield b
o (ProcessT m a b -> ProcessT m a b
k ProcessT m a b
m'))
        Step (Is a) b (ProcessT m a b)
m' -> ProcessT m a b -> m (Step (Is a) b (ProcessT m a b))
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT (ProcessT m a b -> ProcessT m a b
k (Step (Is a) b (ProcessT m a b) -> ProcessT m a b
forall (m :: * -> *) (k :: * -> *) o.
Monad m =>
Step k o (MachineT m k o) -> MachineT m k o
encased Step (Is a) b (ProcessT m a b)
m'))

-------------------------------------------------------------------------------
-- Event stream sorting
-------------------------------------------------------------------------------

{- |
Reorder events respecting ticks.

This machine caches two batches worth of events, sorts them together,
and then yields only those events whose timestamp is less than or equal
to the maximum of the first batch.
-}
sortByBatch ::
  forall m a.
  (Monad m) =>
  (a -> Timestamp) ->
  ProcessT m [a] [a]
sortByBatch :: forall (m :: * -> *) a.
Monad m =>
(a -> Timestamp) -> ProcessT m [a] [a]
sortByBatch a -> Timestamp
timestamp = Maybe [a] -> ProcessT m [a] [a]
sortByBatchWith Maybe [a]
forall a. Maybe a
Nothing
 where
  sortByBatchWith :: Maybe [a] -> ProcessT m [a] [a]
  sortByBatchWith :: Maybe [a] -> ProcessT m [a] [a]
sortByBatchWith = \case
    Maybe [a]
Nothing -> m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall a b. (a -> b) -> a -> b
$ Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is [a]) [a] (ProcessT m [a] [a])
 -> m (Step (Is [a]) [a] (ProcessT m [a] [a])))
-> Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a b. (a -> b) -> a -> b
$ ([a] -> ProcessT m [a] [a])
-> Is [a] [a]
-> ProcessT m [a] [a]
-> Step (Is [a]) [a] (ProcessT m [a] [a])
forall (k :: * -> *) o r t. (t -> r) -> k t -> r -> Step k o r
Await [a] -> ProcessT m [a] [a]
onNext Is [a] [a]
forall a. Is a a
Refl ProcessT m [a] [a]
onStop
     where
      onNext :: [a] -> ProcessT m [a] [a]
      onNext :: [a] -> ProcessT m [a] [a]
onNext [a]
new = Maybe [a] -> ProcessT m [a] [a]
sortByBatchWith ([a] -> Maybe [a]
forall a. a -> Maybe a
Just [a]
sortedNew)
       where
        sortedNew :: [a]
sortedNew = [a] -> [a]
sortByTime [a]
new
      onStop :: ProcessT m [a] [a]
      onStop :: ProcessT m [a] [a]
onStop = ProcessT m [a] [a]
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped
    Just [a]
sortedOld -> m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall a b. (a -> b) -> a -> b
$ Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is [a]) [a] (ProcessT m [a] [a])
 -> m (Step (Is [a]) [a] (ProcessT m [a] [a])))
-> Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a b. (a -> b) -> a -> b
$ ([a] -> ProcessT m [a] [a])
-> Is [a] [a]
-> ProcessT m [a] [a]
-> Step (Is [a]) [a] (ProcessT m [a] [a])
forall (k :: * -> *) o r t. (t -> r) -> k t -> r -> Step k o r
Await [a] -> ProcessT m [a] [a]
onNext Is [a] [a]
forall a. Is a a
Refl ProcessT m [a] [a]
onStop
     where
      onNext :: [a] -> ProcessT m [a] [a]
      onNext :: [a] -> ProcessT m [a] [a]
onNext [a]
new
        | [a] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [a]
sortedOld = Maybe [a] -> ProcessT m [a] [a]
sortByBatchWith (Maybe [a] -> ProcessT m [a] [a])
-> Maybe [a] -> ProcessT m [a] [a]
forall a b. (a -> b) -> a -> b
$ [a] -> Maybe [a]
forall a. a -> Maybe a
Just [a]
sortedNew
        | Bool
otherwise = m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall a b. (a -> b) -> a -> b
$ Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is [a]) [a] (ProcessT m [a] [a])
 -> m (Step (Is [a]) [a] (ProcessT m [a] [a])))
-> Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a b. (a -> b) -> a -> b
$ [a] -> ProcessT m [a] [a] -> Step (Is [a]) [a] (ProcessT m [a] [a])
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield [a]
sortedBeforeCutoff (ProcessT m [a] [a] -> Step (Is [a]) [a] (ProcessT m [a] [a]))
-> ProcessT m [a] [a] -> Step (Is [a]) [a] (ProcessT m [a] [a])
forall a b. (a -> b) -> a -> b
$ Maybe [a] -> ProcessT m [a] [a]
sortByBatchWith (Maybe [a] -> ProcessT m [a] [a])
-> Maybe [a] -> ProcessT m [a] [a]
forall a b. (a -> b) -> a -> b
$ [a] -> Maybe [a]
forall a. a -> Maybe a
Just [a]
sortedAfterCutoff
       where
        -- NOTE: use of partial @maximum@ is guarded by the check @null old@.
        cutoff :: Timestamp
cutoff = Max Timestamp -> Timestamp
forall a. Max a -> a
getMax ((a -> Max Timestamp) -> [a] -> Max Timestamp
forall m a. Monoid m => (a -> m) -> [a] -> m
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap (Timestamp -> Max Timestamp
forall a. a -> Max a
Max (Timestamp -> Max Timestamp)
-> (a -> Timestamp) -> a -> Max Timestamp
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Timestamp
timestamp) [a]
sortedOld)
        sortedNew :: [a]
sortedNew = [a] -> [a]
sortByTime [a]
new
        sorted :: [a]
sorted = [a] -> [a] -> [a]
joinByTime [a]
sortedOld [a]
sortedNew
        ([a]
sortedBeforeCutoff, [a]
sortedAfterCutoff) = (a -> Bool) -> [a] -> ([a], [a])
forall a. (a -> Bool) -> [a] -> ([a], [a])
L.partition ((Timestamp -> Timestamp -> Bool
forall a. Ord a => a -> a -> Bool
<= Timestamp
cutoff) (Timestamp -> Bool) -> (a -> Timestamp) -> a -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Timestamp
timestamp) [a]
sorted
      onStop :: ProcessT m [a] [a]
      onStop :: ProcessT m [a] [a]
onStop = m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a])) -> ProcessT m [a] [a]
forall a b. (a -> b) -> a -> b
$ Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Is [a]) [a] (ProcessT m [a] [a])
 -> m (Step (Is [a]) [a] (ProcessT m [a] [a])))
-> Step (Is [a]) [a] (ProcessT m [a] [a])
-> m (Step (Is [a]) [a] (ProcessT m [a] [a]))
forall a b. (a -> b) -> a -> b
$ [a] -> ProcessT m [a] [a] -> Step (Is [a]) [a] (ProcessT m [a] [a])
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield [a]
sortedOld (ProcessT m [a] [a] -> Step (Is [a]) [a] (ProcessT m [a] [a]))
-> ProcessT m [a] [a] -> Step (Is [a]) [a] (ProcessT m [a] [a])
forall a b. (a -> b) -> a -> b
$ ProcessT m [a] [a]
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped

  -- compByTime :: a -> a -> Ordering
  compByTime :: a -> a -> Ordering
compByTime = Timestamp -> Timestamp -> Ordering
forall a. Ord a => a -> a -> Ordering
compare (Timestamp -> Timestamp -> Ordering)
-> (a -> Timestamp) -> a -> a -> Ordering
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` a -> Timestamp
timestamp

  -- sortByTime :: [a] -> [a]
  sortByTime :: [a] -> [a]
sortByTime = (a -> a -> Ordering) -> [a] -> [a]
forall a. (a -> a -> Ordering) -> [a] -> [a]
L.sortBy a -> a -> Ordering
compByTime

  -- joinByTime :: [a] -> [a] -> [a]
  joinByTime :: [a] -> [a] -> [a]
joinByTime [] [a]
ys = [a]
ys
  joinByTime [a]
xs [] = [a]
xs
  joinByTime (a
x : [a]
xs) (a
y : [a]
ys) = case a -> a -> Ordering
compByTime a
x a
y of
    Ordering
LT -> a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a] -> [a] -> [a]
joinByTime [a]
xs (a
y a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
ys)
    Ordering
_ -> a
y a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a] -> [a] -> [a]
joinByTime (a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
xs) [a]
ys

{- |
Variant of `sortByBatch` that operates on streams of items and ticks.
-}
sortByBatchTick :: (a -> Timestamp) -> Process (Tick a) (Tick a)
sortByBatchTick :: forall a. (a -> Timestamp) -> Process (Tick a) (Tick a)
sortByBatchTick a -> Timestamp
timestamp =
  (Tick a -> Tick [a]) -> Machine (Is (Tick a)) (Tick [a])
forall (k :: * -> * -> *) a b.
Category k =>
(a -> b) -> Machine (k a) b
mapping ((a -> [a]) -> Tick a -> Tick [a]
forall a b. (a -> b) -> Tick a -> Tick b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [])) MachineT m (Is (Tick a)) (Tick [a])
-> ProcessT m (Tick [a]) [a] -> MachineT m (Is (Tick a)) [a]
forall (m :: * -> *) (k :: * -> *) b c.
Monad m =>
MachineT m k b -> ProcessT m b c -> MachineT m k c
~> ProcessT m (Tick [a]) [a]
forall (m :: * -> *) a.
(Monad m, Monoid a) =>
ProcessT m (Tick a) a
batchByTick MachineT m (Is (Tick a)) [a]
-> ProcessT m [a] [a] -> MachineT m (Is (Tick a)) [a]
forall (m :: * -> *) (k :: * -> *) b c.
Monad m =>
MachineT m k b -> ProcessT m b c -> MachineT m k c
~> (a -> Timestamp) -> ProcessT m [a] [a]
forall (m :: * -> *) a.
Monad m =>
(a -> Timestamp) -> ProcessT m [a] [a]
sortByBatch a -> Timestamp
timestamp MachineT m (Is (Tick a)) [a]
-> ProcessT m [a] (Tick a) -> MachineT m (Is (Tick a)) (Tick a)
forall (m :: * -> *) (k :: * -> *) b c.
Monad m =>
MachineT m k b -> ProcessT m b c -> MachineT m k c
~> ProcessT m [a] (Tick a)
forall a (m :: * -> *). Monad m => MachineT m (Is [a]) (Tick a)
batchListToTick

-------------------------------------------------------------------------------
-- Filtering semaphores
-------------------------------------------------------------------------------

{- | A simple delimiting t'Moore' machine,
which is opened by one constant marker and closed by the other one.
-}
between :: Text -> Text -> Moore Text Bool
between :: Text -> Text -> Moore Text Bool
between Text
x Text
y = Moore Text Bool
open
 where
  open :: Moore Text Bool
open = Bool -> (Text -> Moore Text Bool) -> Moore Text Bool
forall a b. b -> (a -> Moore a b) -> Moore a b
Moore Bool
False Text -> Moore Text Bool
open' where open' :: Text -> Moore Text Bool
open' Text
x' = if Text
x Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== Text
x' then Moore Text Bool
close else Moore Text Bool
open
  close :: Moore Text Bool
close = Bool -> (Text -> Moore Text Bool) -> Moore Text Bool
forall a b. b -> (a -> Moore a b) -> Moore a b
Moore Bool
True Text -> Moore Text Bool
close' where close' :: Text -> Moore Text Bool
close' Text
y' = if Text
y Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== Text
y' then Moore Text Bool
forall {a}. Moore a Bool
end else Moore Text Bool
close
  end :: Moore a Bool
end = Bool -> (a -> Moore a Bool) -> Moore a Bool
forall a b. b -> (a -> Moore a b) -> Moore a b
Moore Bool
False (Moore a Bool -> a -> Moore a Bool
forall a b. a -> b -> a
const Moore a Bool
end)

-- | Delimit the event process.
delimit :: (Monad m) => Moore Text Bool -> ProcessT m Event Event
delimit :: forall (m :: * -> *).
Monad m =>
Moore Text Bool -> ProcessT m Event Event
delimit = PlanT (Is Event) Event m () -> MachineT m (Is Event) Event
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
construct (PlanT (Is Event) Event m () -> MachineT m (Is Event) Event)
-> (Moore Text Bool -> PlanT (Is Event) Event m ())
-> Moore Text Bool
-> MachineT m (Is Event) Event
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Moore Text Bool -> PlanT (Is Event) Event m ()
forall (m :: * -> *).
Monad m =>
Moore Text Bool -> PlanT (Is Event) Event m ()
go
 where
  go :: (Monad m) => Moore Text Bool -> PlanT (Is Event) Event m ()
  go :: forall (m :: * -> *).
Monad m =>
Moore Text Bool -> PlanT (Is Event) Event m ()
go mm :: Moore Text Bool
mm@(Moore Bool
s Text -> Moore Text Bool
next) = do
    Event
e <- PlanT (Is Event) Event m Event
Plan (Is Event) Event Event
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await
    case Event -> EventInfo
evSpec Event
e of
      -- on marker step the moore machine.
      E.UserMarker Text
m -> do
        let mm' :: Moore Text Bool
mm'@(Moore Bool
s' Text -> Moore Text Bool
_) = Text -> Moore Text Bool
next Text
m
        -- if current or next state is open (== True), emit the marker.
        Bool -> PlanT (Is Event) Event m () -> PlanT (Is Event) Event m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool
s Bool -> Bool -> Bool
|| Bool
s') (PlanT (Is Event) Event m () -> PlanT (Is Event) Event m ())
-> PlanT (Is Event) Event m () -> PlanT (Is Event) Event m ()
forall a b. (a -> b) -> a -> b
$ Event -> Plan (Is Event) Event ()
forall o (k :: * -> *). o -> Plan k o ()
yield Event
e
        Moore Text Bool -> PlanT (Is Event) Event m ()
forall (m :: * -> *).
Monad m =>
Moore Text Bool -> PlanT (Is Event) Event m ()
go Moore Text Bool
mm'

      -- for other events, emit if the state is open.
      EventInfo
_ -> do
        Bool -> PlanT (Is Event) Event m () -> PlanT (Is Event) Event m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
s (PlanT (Is Event) Event m () -> PlanT (Is Event) Event m ())
-> PlanT (Is Event) Event m () -> PlanT (Is Event) Event m ()
forall a b. (a -> b) -> a -> b
$ Event -> Plan (Is Event) Event ()
forall o (k :: * -> *). o -> Plan k o ()
yield Event
e
        Moore Text Bool -> PlanT (Is Event) Event m ()
forall (m :: * -> *).
Monad m =>
Moore Text Bool -> PlanT (Is Event) Event m ()
go Moore Text Bool
mm

-------------------------------------------------------------------------------
-- Validation
-------------------------------------------------------------------------------

{- |
This machine validates that there is some input.

If no input is encountered after the given number of ticks, the machine prints
a warning that directs the user to check that the @-l@ flag was set correctly.
-}
validateInput ::
  (MonadIO m) =>
  Verbosity ->
  Int ->
  ProcessT m (Tick a) x
validateInput :: forall (m :: * -> *) a x.
MonadIO m =>
Verbosity -> Int -> ProcessT m (Tick a) x
validateInput Verbosity
verbosity Int
ticks
  | Verbosity
verbosityWarning Verbosity -> Verbosity -> Bool
forall a. Ord a => a -> a -> Bool
>= Verbosity
verbosity = PlanT (Is (Tick a)) x m () -> MachineT m (Is (Tick a)) x
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
construct (PlanT (Is (Tick a)) x m () -> MachineT m (Is (Tick a)) x)
-> PlanT (Is (Tick a)) x m () -> MachineT m (Is (Tick a)) x
forall a b. (a -> b) -> a -> b
$ Int -> PlanT (Is (Tick a)) x m ()
start Int
ticks
  | Bool
otherwise = MachineT m (Is (Tick a)) x
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped
 where
  start :: Int -> PlanT (Is (Tick a)) x m ()
start Int
remaining
    | Int
remaining Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 = IO () -> PlanT (Is (Tick a)) x m ()
forall a. IO a -> PlanT (Is (Tick a)) x m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> PlanT (Is (Tick a)) x m ())
-> IO () -> PlanT (Is (Tick a)) x m ()
forall a b. (a -> b) -> a -> b
$ do
        Verbosity -> Text -> IO ()
forall (m :: * -> *).
(HasCallStack, MonadIO m) =>
Verbosity -> Text -> m ()
logWarning Verbosity
verbosity (Text -> IO ()) -> (String -> Text) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$
          String -> Int -> String
forall r. PrintfType r => String -> r
printf
            String
"No input after %d ticks. Did you pass -l to the GHC RTS?"
            Int
ticks
    | Bool
otherwise = do
        Verbosity -> Text -> PlanT (Is (Tick a)) x m ()
forall (m :: * -> *).
(HasCallStack, MonadIO m) =>
Verbosity -> Text -> m ()
logDebug Verbosity
verbosity (Text -> PlanT (Is (Tick a)) x m ())
-> Text -> PlanT (Is (Tick a)) x m ()
forall a b. (a -> b) -> a -> b
$
          String -> Text
T.pack (Int -> String
forall a. Show a => a -> String
show Int
remaining) Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ticks remaining."
        PlanT (Is (Tick a)) x m (Tick a)
Plan (Is (Tick a)) x (Tick a)
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await PlanT (Is (Tick a)) x m (Tick a)
-> (Tick a -> PlanT (Is (Tick a)) x m ())
-> PlanT (Is (Tick a)) x m ()
forall a b.
PlanT (Is (Tick a)) x m a
-> (a -> PlanT (Is (Tick a)) x m b) -> PlanT (Is (Tick a)) x m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          Item{} -> do
            Verbosity -> Text -> PlanT (Is (Tick a)) x m ()
forall (m :: * -> *).
(HasCallStack, MonadIO m) =>
Verbosity -> Text -> m ()
logDebug Verbosity
verbosity Text
"Received item."
          Tick a
Tick -> do
            Verbosity -> Text -> PlanT (Is (Tick a)) x m ()
forall (m :: * -> *).
(HasCallStack, MonadIO m) =>
Verbosity -> Text -> m ()
logDebug Verbosity
verbosity Text
"Received tick."
            Int -> PlanT (Is (Tick a)) x m ()
start (Int -> Int
forall a. Enum a => a -> a
pred Int
remaining)

{- |
This machine validates that the inputs are received in order.

If an out-of-order input is encountered, the machine prints an error message
that directs the user to check that the @--eventlog-flush-interval@ and the
@--batch-interval@ flags are set correctly.
-}
validateOrder ::
  (MonadIO m, Show a) =>
  Verbosity ->
  (a -> Timestamp) ->
  ProcessT m a x
validateOrder :: forall (m :: * -> *) a x.
(MonadIO m, Show a) =>
Verbosity -> (a -> Timestamp) -> ProcessT m a x
validateOrder Verbosity
verbosity a -> Timestamp
timestamp
  | Verbosity
verbosityError Verbosity -> Verbosity -> Bool
forall a. Ord a => a -> a -> Bool
>= Verbosity
verbosity = PlanT (Is a) x m () -> MachineT m (Is a) x
forall (m :: * -> *) (k :: * -> *) o a.
Monad m =>
PlanT k o m a -> MachineT m k o
construct (PlanT (Is a) x m () -> MachineT m (Is a) x)
-> PlanT (Is a) x m () -> MachineT m (Is a) x
forall a b. (a -> b) -> a -> b
$ Maybe a -> PlanT (Is a) x m ()
start Maybe a
forall a. Maybe a
Nothing
  | Bool
otherwise = MachineT m (Is a) x
forall (k :: * -> *) b (m :: * -> *). Monad m => MachineT m k b
stopped
 where
  start :: Maybe a -> PlanT (Is a) x m ()
start Maybe a
maybeOld =
    PlanT (Is a) x m a
Plan (Is a) x a
forall (k :: * -> * -> *) i o. Category k => Plan (k i) o i
await PlanT (Is a) x m a
-> (a -> PlanT (Is a) x m ()) -> PlanT (Is a) x m ()
forall a b.
PlanT (Is a) x m a
-> (a -> PlanT (Is a) x m b) -> PlanT (Is a) x m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \a
new ->
      case Maybe a
maybeOld of
        Just a
old
          | a -> Timestamp
timestamp a
new Timestamp -> Timestamp -> Bool
forall a. Ord a => a -> a -> Bool
< a -> Timestamp
timestamp a
old -> do
              Verbosity -> Text -> PlanT (Is a) x m ()
forall (m :: * -> *).
(HasCallStack, MonadIO m) =>
Verbosity -> Text -> m ()
logError Verbosity
verbosity (Text -> PlanT (Is a) x m ())
-> (String -> Text) -> String -> PlanT (Is a) x m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> PlanT (Is a) x m ()) -> String -> PlanT (Is a) x m ()
forall a b. (a -> b) -> a -> b
$
                String
"Encountered two out-of-order inputs.\n\
                \Did you pass --eventlog-flush-interval to the GHC RTS?\n\
                \Did you set --batch-interval to be at least as big as the value of --eventlog-flush-interval?"
              Verbosity -> Text -> PlanT (Is a) x m ()
forall (m :: * -> *).
(HasCallStack, MonadIO m) =>
Verbosity -> Text -> m ()
logDebug Verbosity
verbosity (Text -> PlanT (Is a) x m ())
-> (String -> Text) -> String -> PlanT (Is a) x m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> PlanT (Is a) x m ()) -> String -> PlanT (Is a) x m ()
forall a b. (a -> b) -> a -> b
$
                String -> String -> ShowS
forall r. PrintfType r => String -> r
printf
                  String
"Out-of-order inputs:\n\
                  \- %s\n\
                  \- %s"
                  (a -> String
forall a. Show a => a -> String
show a
old)
                  (a -> String
forall a. Show a => a -> String
show a
new)
              () -> PlanT (Is a) x m ()
forall a. a -> PlanT (Is a) x m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        Maybe a
_otherwise -> do
          Maybe a -> PlanT (Is a) x m ()
start (a -> Maybe a
forall a. a -> Maybe a
Just a
new)