{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}

module Eventium.ProjectionCache.Types
  ( ProjectionCache (..),
    VersionedProjectionCache,
    GlobalStreamProjectionCache,
    runProjectionCacheUsing,
    serializedProjectionCache,
    getLatestVersionedProjectionWithCache,
    getLatestGlobalProjectionWithCache,
    updateProjectionCache,
    updateGlobalProjectionCache,
  )
where

import Eventium.Projection
import Eventium.Serializer
import Eventium.Store.Class
import Eventium.UUID

-- | A 'ProjectionCache' caches snapshots of 'Projection's in event streams.
-- This is useful if your event streams are very large. This cache operates on
-- some 'Monad' @m@ and stores the 'Projection' state of type @serialized@.
--
-- At its core, this is essentially just a key-value store that additionally
-- records the stream position each snapshot was taken at. It is recommended
-- to use the other helper functions in this module to interpret the stored
-- values using a 'Projection'.
--
-- The @key@ and @position@ type parameters are polymorphic so we can abstract
-- over a cache for individual event streams, and a cache for globally ordered
-- streams.
data ProjectionCache key position serialized m
  = ProjectionCache
  { -- | Stores the state for a projection at a given @key@ and @position@.
    -- This is pretty unsafe, because there is no guarantee that what is
    -- stored is actually derived from the events in the stream. Consider
    -- using 'updateProjectionCache' instead of calling this directly.
    storeProjectionSnapshot :: key -> position -> serialized -> m (),
    -- | Loads the latest projection state stored for @key@, paired with the
    -- position it was stored at. The result is wrapped in 'Maybe', so an
    -- implementation can report that it has nothing for @key@.
    loadProjectionSnapshot :: key -> m (Maybe (position, serialized))
  }

-- | Type synonym for a 'ProjectionCache' used on individual event streams:
-- snapshots are keyed by the stream 'UUID' and positioned by 'EventVersion'.
type VersionedProjectionCache serialized m = ProjectionCache UUID EventVersion serialized m

-- | Type synonym for a 'ProjectionCache' used on the globally ordered event
-- stream (the one read through a 'GlobalEventStoreReader', as in
-- 'getLatestGlobalProjectionWithCache'). Snapshots are positioned by
-- 'SequenceNumber'; the @key@ type is left to the caller.
type GlobalStreamProjectionCache key serialized m = ProjectionCache key SequenceNumber serialized m

-- | Changes the monad a 'ProjectionCache' runs in. Every action of the
-- original cache is passed through the supplied transformation, so the
-- resulting cache can be used in @m@ while the original @mstore@ is
-- forgotten.
runProjectionCacheUsing ::
  (Monad m, Monad mstore) =>
  -- | How to run an @mstore@ action inside @m@.
  (forall a. mstore a -> m a) ->
  -- | The cache to lift.
  ProjectionCache key position serialized mstore ->
  ProjectionCache key position serialized m
runProjectionCacheUsing runCache cache =
  ProjectionCache
    { storeProjectionSnapshot = \key position state ->
        runCache (storeProjectionSnapshot cache key position state),
      loadProjectionSnapshot = runCache . loadProjectionSnapshot cache
    }

-- | Wraps a 'ProjectionCache' that stores @serialized@ values so that callers
-- can store and load plain @state@ values instead.
--
-- Snapshots are converted with 'serialize' before being stored and with
-- 'deserialize' after being loaded. Note that a stored snapshot which fails
-- to deserialize is not reported as an error: the load simply yields
-- 'Nothing', exactly as if nothing had been cached for that key.
serializedProjectionCache ::
  (Monad m) =>
  -- | Converts between the in-memory @state@ and its @serialized@ form.
  Serializer state serialized ->
  -- | The underlying cache holding @serialized@ snapshots.
  ProjectionCache key position serialized m ->
  ProjectionCache key position state m
serializedProjectionCache serializer cache =
  ProjectionCache
    { storeProjectionSnapshot = \key position ->
        storeProjectionSnapshot cache key position . serialize serializer,
      loadProjectionSnapshot = \key ->
        -- 'traverse' over the pair keeps the position and deserializes only
        -- the state; a failed deserialization collapses the result to Nothing.
        (>>= traverse (deserialize serializer)) <$> loadProjectionSnapshot cache key
    }

-- | Like 'getLatestVersionedProjection', but uses a 'ProjectionCache' if it
-- contains more recent state.
--
-- The cache is consulted under the projection's own 'streamProjectionKey'.
-- A cached snapshot replaces the projection's position and state only when
-- its position is strictly greater. The result is then handed to
-- 'getLatestStreamProjection' together with the store reader.
getLatestVersionedProjectionWithCache ::
  (Monad m) =>
  VersionedEventStoreReader m event ->
  VersionedProjectionCache state m ->
  VersionedStreamProjection state event ->
  m (VersionedStreamProjection state event)
getLatestVersionedProjectionWithCache store cache projection = do
  fromCache <-
    getLatestProjectionWithCache' cache projection (streamProjectionKey projection)
  getLatestStreamProjection store fromCache

-- | Like 'getLatestGlobalProjection', but uses a 'ProjectionCache' if it
-- contains more recent state.
--
-- The cache is consulted under the caller-supplied @key@. A cached snapshot
-- replaces the projection's position and state only when its position is
-- strictly greater. The result is then handed to 'getLatestStreamProjection'
-- together with the store reader.
getLatestGlobalProjectionWithCache ::
  (Monad m) =>
  GlobalEventStoreReader m event ->
  GlobalStreamProjectionCache key state m ->
  GlobalStreamProjection state event ->
  -- | Cache key under which this projection's snapshots are stored.
  key ->
  m (GlobalStreamProjection state event)
getLatestGlobalProjectionWithCache store cache projection key = do
  fromCache <- getLatestProjectionWithCache' cache projection key
  getLatestStreamProjection store fromCache

-- | Overlays the snapshot cached under @key@ onto the given projection.
--
-- The cached position and state are used only when the cached position is
-- strictly greater than the projection's current position. If the cache has
-- no snapshot, or the snapshot is not newer, the projection is returned
-- unchanged. This function never reads from an event store.
getLatestProjectionWithCache' ::
  (Monad m, Ord position) =>
  ProjectionCache key position state m ->
  StreamProjection projKey position state event ->
  key ->
  m (StreamProjection projKey position state event)
getLatestProjectionWithCache' cache projection key =
  maybe projection applySnapshot <$> loadProjectionSnapshot cache key
  where
    applySnapshot (position, state)
      | position > streamProjectionPosition projection =
          projection
            { streamProjectionPosition = position,
              streamProjectionState = state
            }
      | otherwise = projection

-- | Loads the latest projection state from the cache/store (via
-- 'getLatestVersionedProjectionWithCache') and stores this value back into
-- the projection cache, keyed by the projection's 'streamProjectionKey' at
-- the position it reached.
--
-- The snapshot is written unconditionally, even when the loaded projection
-- is no newer than what the cache already held.
updateProjectionCache ::
  (Monad m) =>
  VersionedEventStoreReader m event ->
  VersionedProjectionCache state m ->
  VersionedStreamProjection state event ->
  m ()
updateProjectionCache reader cache projection = do
  latest <- getLatestVersionedProjectionWithCache reader cache projection
  storeProjectionSnapshot
    cache
    (streamProjectionKey latest)
    (streamProjectionPosition latest)
    (streamProjectionState latest)

-- | Analog of 'updateProjectionCache' for a 'GlobalStreamProjectionCache'.
--
-- Loads the latest projection via 'getLatestGlobalProjectionWithCache' and
-- stores its position and state back into the cache under the caller-supplied
-- @key@. The snapshot is written unconditionally.
updateGlobalProjectionCache ::
  (Monad m) =>
  GlobalEventStoreReader m event ->
  GlobalStreamProjectionCache key state m ->
  GlobalStreamProjection state event ->
  -- | Cache key under which this projection's snapshots are stored.
  key ->
  m ()
updateGlobalProjectionCache reader cache projection key = do
  latest <- getLatestGlobalProjectionWithCache reader cache projection key
  storeProjectionSnapshot
    cache
    key
    (streamProjectionPosition latest)
    (streamProjectionState latest)