{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}

module Eventium.ReadModel.Class
  ( ReadModel (..),
    runPollingReadModel,
  )
where

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Eventium.Store.Class

-- | A read model: an opaque @model@ handle bundled with the two effectful
-- operations that 'runPollingReadModel' needs in order to keep it up to date
-- from the global event stream.
--
-- * @model@ is the handle passed back to both callbacks.
-- * @serialized@ is the payload type carried by the global stream events.
-- * @m@ is the monad in which the callbacks run.
data ReadModel model serialized m
  = ReadModel
  { -- | The model value handed to both callbacks below.
    readModelModel :: model,
    -- | Report the sequence number of the last event already applied to the
    -- model. 'runPollingReadModel' requests events starting at this value
    -- plus one.
    readModelLatestAppliedSequence :: model -> m SequenceNumber,
    -- | Apply a batch of global stream events to the model.
    readModelHandleEvents :: model -> [GlobalStreamEvent serialized] -> m ()
  }

-- | Time to wait between two polling iterations, in (possibly fractional)
-- seconds. 'runPollingReadModel' converts it to microseconds for 'threadDelay'.
type PollingPeriodSeconds = Double

-- | Keep a 'ReadModel' up to date by polling the global event stream forever.
--
-- Every iteration:
--
-- 1. asks the read model for the latest sequence number it has applied;
-- 2. queries the global reader with @eventsStartingAt ()@ at that sequence
--    number plus one, lifting the store action into @m@ with the supplied
--    transformation;
-- 3. passes the resulting list (which may be empty) to
--    'readModelHandleEvents';
-- 4. sleeps for the polling period.
--
-- The action is built with 'forever', so it never returns normally.
runPollingReadModel ::
  (MonadIO m, Monad mstore) =>
  -- | Read model to feed with events
  ReadModel model serialized m ->
  -- | Reader for the global event stream
  GlobalEventStoreReader mstore serialized ->
  -- | How to run a store action inside @m@
  (forall a. mstore a -> m a) ->
  -- | Pause between iterations, in seconds
  PollingPeriodSeconds ->
  m ()
runPollingReadModel ReadModel {..} globalReader runStore waitSeconds =
  forever $ do
    -- Get new events starting from latest applied sequence number
    latestSeq <- readModelLatestAppliedSequence readModelModel
    newEvents <-
      runStore $
        getEvents globalReader (eventsStartingAt () $ latestSeq + 1)

    -- Handle the new events
    readModelHandleEvents readModelModel newEvents

    -- Wait before running again
    liftIO $ threadDelay pollDelayMicros
  where
    -- threadDelay accepts microseconds; the conversion does not depend on the
    -- loop state, so it is computed once outside the loop body.
    pollDelayMicros :: Int
    pollDelayMicros = ceiling (waitSeconds * 1000000)

-- data EventHandler m serialized = forall event. (Serializable event serialized, Monad m) => EventHandler (event -> m ())

-- combineHandlers :: (Monad m) => [EventHandler m serialized] -> (serialized -> m ())
-- combineHandlers handlers event = mapM_ ($ event) (mkHandler <$> handlers)

-- mkHandler :: EventHandler m serialized -> (serialized -> m ())
-- mkHandler (EventHandler handler) event = maybe (return ()) handler (deserialize event)