{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
module Eventium.ReadModel.Class
( ReadModel (..),
runPollingReadModel,
)
where
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Eventium.Store.Class
data ReadModel model serialized m
= ReadModel
{ forall model serialized (m :: * -> *).
ReadModel model serialized m -> model
readModelModel :: model,
forall model serialized (m :: * -> *).
ReadModel model serialized m -> model -> m SequenceNumber
readModelLatestAppliedSequence :: model -> m SequenceNumber,
forall model serialized (m :: * -> *).
ReadModel model serialized m
-> model -> [GlobalStreamEvent serialized] -> m ()
readModelHandleEvents :: model -> [GlobalStreamEvent serialized] -> m ()
}
type PollingPeriodSeconds = Double
runPollingReadModel ::
(MonadIO m, Monad mstore) =>
ReadModel model serialized m ->
GlobalEventStoreReader mstore serialized ->
(forall a. mstore a -> m a) ->
PollingPeriodSeconds ->
m ()
runPollingReadModel :: forall (m :: * -> *) (mstore :: * -> *) model serialized.
(MonadIO m, Monad mstore) =>
ReadModel model serialized m
-> GlobalEventStoreReader mstore serialized
-> (forall a. mstore a -> m a)
-> PollingPeriodSeconds
-> m ()
runPollingReadModel ReadModel {model
model -> m SequenceNumber
model -> [GlobalStreamEvent serialized] -> m ()
readModelModel :: forall model serialized (m :: * -> *).
ReadModel model serialized m -> model
readModelLatestAppliedSequence :: forall model serialized (m :: * -> *).
ReadModel model serialized m -> model -> m SequenceNumber
readModelHandleEvents :: forall model serialized (m :: * -> *).
ReadModel model serialized m
-> model -> [GlobalStreamEvent serialized] -> m ()
readModelModel :: model
readModelLatestAppliedSequence :: model -> m SequenceNumber
readModelHandleEvents :: model -> [GlobalStreamEvent serialized] -> m ()
..} GlobalEventStoreReader mstore serialized
globalReader forall a. mstore a -> m a
runStore PollingPeriodSeconds
waitSeconds = m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
SequenceNumber
latestSeq <- model -> m SequenceNumber
readModelLatestAppliedSequence model
readModelModel
[GlobalStreamEvent serialized]
newEvents <- mstore [GlobalStreamEvent serialized]
-> m [GlobalStreamEvent serialized]
forall a. mstore a -> m a
runStore (mstore [GlobalStreamEvent serialized]
-> m [GlobalStreamEvent serialized])
-> mstore [GlobalStreamEvent serialized]
-> m [GlobalStreamEvent serialized]
forall a b. (a -> b) -> a -> b
$ GlobalEventStoreReader mstore serialized
-> QueryRange () SequenceNumber
-> mstore [GlobalStreamEvent serialized]
forall key position (m :: * -> *) event.
EventStoreReader key position m event
-> QueryRange key position -> m [event]
getEvents GlobalEventStoreReader mstore serialized
globalReader (() -> SequenceNumber -> QueryRange () SequenceNumber
forall key position. key -> position -> QueryRange key position
eventsStartingAt () (SequenceNumber -> QueryRange () SequenceNumber)
-> SequenceNumber -> QueryRange () SequenceNumber
forall a b. (a -> b) -> a -> b
$ SequenceNumber
latestSeq SequenceNumber -> SequenceNumber -> SequenceNumber
forall a. Num a => a -> a -> a
+ SequenceNumber
1)
model -> [GlobalStreamEvent serialized] -> m ()
readModelHandleEvents model
readModelModel [GlobalStreamEvent serialized]
newEvents
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ PollingPeriodSeconds -> Int
forall b. Integral b => PollingPeriodSeconds -> b
forall a b. (RealFrac a, Integral b) => a -> b
ceiling (PollingPeriodSeconds
waitSeconds PollingPeriodSeconds
-> PollingPeriodSeconds -> PollingPeriodSeconds
forall a. Num a => a -> a -> a
* PollingPeriodSeconds
1000000)