{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
module Eventium.ProjectionCache.Types
( ProjectionCache (..),
VersionedProjectionCache,
GlobalStreamProjectionCache,
runProjectionCacheUsing,
serializedProjectionCache,
getLatestVersionedProjectionWithCache,
getLatestGlobalProjectionWithCache,
updateProjectionCache,
updateGlobalProjectionCache,
)
where
import Eventium.Projection
import Eventium.Serializer
import Eventium.Store.Class
import Eventium.UUID
data ProjectionCache key position serialized m
= ProjectionCache
{
forall key position serialized (m :: * -> *).
ProjectionCache key position serialized m
-> key -> position -> serialized -> m ()
storeProjectionSnapshot :: key -> position -> serialized -> m (),
forall key position serialized (m :: * -> *).
ProjectionCache key position serialized m
-> key -> m (Maybe (position, serialized))
loadProjectionSnapshot :: key -> m (Maybe (position, serialized))
}
type VersionedProjectionCache serialized m = ProjectionCache UUID EventVersion serialized m
type GlobalStreamProjectionCache key serialized m = ProjectionCache key SequenceNumber serialized m
runProjectionCacheUsing ::
(Monad m, Monad mstore) =>
(forall a. mstore a -> m a) ->
ProjectionCache key position serialized mstore ->
ProjectionCache key position serialized m
runProjectionCacheUsing :: forall (m :: * -> *) (mstore :: * -> *) key position serialized.
(Monad m, Monad mstore) =>
(forall a. mstore a -> m a)
-> ProjectionCache key position serialized mstore
-> ProjectionCache key position serialized m
runProjectionCacheUsing forall a. mstore a -> m a
runCache ProjectionCache {key -> mstore (Maybe (position, serialized))
key -> position -> serialized -> mstore ()
storeProjectionSnapshot :: forall key position serialized (m :: * -> *).
ProjectionCache key position serialized m
-> key -> position -> serialized -> m ()
loadProjectionSnapshot :: forall key position serialized (m :: * -> *).
ProjectionCache key position serialized m
-> key -> m (Maybe (position, serialized))
storeProjectionSnapshot :: key -> position -> serialized -> mstore ()
loadProjectionSnapshot :: key -> mstore (Maybe (position, serialized))
..} =
ProjectionCache
{ storeProjectionSnapshot :: key -> position -> serialized -> m ()
storeProjectionSnapshot = \key
uuid position
version serialized
state -> mstore () -> m ()
forall a. mstore a -> m a
runCache (mstore () -> m ()) -> mstore () -> m ()
forall a b. (a -> b) -> a -> b
$ key -> position -> serialized -> mstore ()
storeProjectionSnapshot key
uuid position
version serialized
state,
loadProjectionSnapshot :: key -> m (Maybe (position, serialized))
loadProjectionSnapshot = mstore (Maybe (position, serialized))
-> m (Maybe (position, serialized))
forall a. mstore a -> m a
runCache (mstore (Maybe (position, serialized))
-> m (Maybe (position, serialized)))
-> (key -> mstore (Maybe (position, serialized)))
-> key
-> m (Maybe (position, serialized))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. key -> mstore (Maybe (position, serialized))
loadProjectionSnapshot
}
serializedProjectionCache ::
(Monad m) =>
Serializer state serialized ->
ProjectionCache key position serialized m ->
ProjectionCache key position state m
serializedProjectionCache :: forall (m :: * -> *) state serialized key position.
Monad m =>
Serializer state serialized
-> ProjectionCache key position serialized m
-> ProjectionCache key position state m
serializedProjectionCache Serializer {state -> serialized
serialized -> Maybe state
serialized -> Either String state
serialize :: state -> serialized
deserialize :: serialized -> Maybe state
deserializeEither :: serialized -> Either String state
serialize :: forall a b. Serializer a b -> a -> b
deserialize :: forall a b. Serializer a b -> b -> Maybe a
deserializeEither :: forall a b. Serializer a b -> b -> Either String a
..} ProjectionCache {key -> m (Maybe (position, serialized))
key -> position -> serialized -> m ()
storeProjectionSnapshot :: forall key position serialized (m :: * -> *).
ProjectionCache key position serialized m
-> key -> position -> serialized -> m ()
loadProjectionSnapshot :: forall key position serialized (m :: * -> *).
ProjectionCache key position serialized m
-> key -> m (Maybe (position, serialized))
storeProjectionSnapshot :: key -> position -> serialized -> m ()
loadProjectionSnapshot :: key -> m (Maybe (position, serialized))
..} =
(key -> position -> state -> m ())
-> (key -> m (Maybe (position, state)))
-> ProjectionCache key position state m
forall key position serialized (m :: * -> *).
(key -> position -> serialized -> m ())
-> (key -> m (Maybe (position, serialized)))
-> ProjectionCache key position serialized m
ProjectionCache key -> position -> state -> m ()
storeProjectionSnapshot' key -> m (Maybe (position, state))
loadProjectionSnapshot'
where
storeProjectionSnapshot' :: key -> position -> state -> m ()
storeProjectionSnapshot' key
uuid position
version = key -> position -> serialized -> m ()
storeProjectionSnapshot key
uuid position
version (serialized -> m ()) -> (state -> serialized) -> state -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. state -> serialized
serialize
loadProjectionSnapshot' :: key -> m (Maybe (position, state))
loadProjectionSnapshot' key
uuid = do
Maybe (position, serialized)
mState <- key -> m (Maybe (position, serialized))
loadProjectionSnapshot key
uuid
Maybe (position, state) -> m (Maybe (position, state))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (position, state) -> m (Maybe (position, state)))
-> Maybe (position, state) -> m (Maybe (position, state))
forall a b. (a -> b) -> a -> b
$ Maybe (position, serialized)
mState Maybe (position, serialized)
-> ((position, serialized) -> Maybe (position, state))
-> Maybe (position, state)
forall a b. Maybe a -> (a -> Maybe b) -> Maybe b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (serialized -> Maybe state)
-> (position, serialized) -> Maybe (position, state)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> (position, a) -> f (position, b)
traverse serialized -> Maybe state
deserialize
getLatestVersionedProjectionWithCache ::
(Monad m) =>
VersionedEventStoreReader m event ->
VersionedProjectionCache state m ->
VersionedStreamProjection state event ->
m (VersionedStreamProjection state event)
getLatestVersionedProjectionWithCache :: forall (m :: * -> *) event state.
Monad m =>
VersionedEventStoreReader m event
-> VersionedProjectionCache state m
-> VersionedStreamProjection state event
-> m (VersionedStreamProjection state event)
getLatestVersionedProjectionWithCache VersionedEventStoreReader m event
store VersionedProjectionCache state m
cache VersionedStreamProjection state event
projection =
VersionedProjectionCache state m
-> VersionedStreamProjection state event
-> UUID
-> m (VersionedStreamProjection state event)
forall (m :: * -> *) position key state projKey event.
(Monad m, Ord position) =>
ProjectionCache key position state m
-> StreamProjection projKey position state event
-> key
-> m (StreamProjection projKey position state event)
getLatestProjectionWithCache' VersionedProjectionCache state m
cache VersionedStreamProjection state event
projection (VersionedStreamProjection state event -> UUID
forall key position state event.
StreamProjection key position state event -> key
streamProjectionKey VersionedStreamProjection state event
projection) m (VersionedStreamProjection state event)
-> (VersionedStreamProjection state event
-> m (VersionedStreamProjection state event))
-> m (VersionedStreamProjection state event)
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= VersionedEventStoreReader m event
-> VersionedStreamProjection state event
-> m (VersionedStreamProjection state event)
forall (m :: * -> *) position key event state.
(Monad m, Num position) =>
EventStoreReader key position m (StreamEvent key position event)
-> StreamProjection key position state event
-> m (StreamProjection key position state event)
getLatestStreamProjection VersionedEventStoreReader m event
store
getLatestGlobalProjectionWithCache ::
(Monad m) =>
GlobalEventStoreReader m event ->
GlobalStreamProjectionCache key state m ->
GlobalStreamProjection state event ->
key ->
m (GlobalStreamProjection state event)
getLatestGlobalProjectionWithCache :: forall (m :: * -> *) event key state.
Monad m =>
GlobalEventStoreReader m event
-> GlobalStreamProjectionCache key state m
-> GlobalStreamProjection state event
-> key
-> m (GlobalStreamProjection state event)
getLatestGlobalProjectionWithCache GlobalEventStoreReader m event
store GlobalStreamProjectionCache key state m
cache GlobalStreamProjection state event
projection key
key =
GlobalStreamProjectionCache key state m
-> GlobalStreamProjection state event
-> key
-> m (GlobalStreamProjection state event)
forall (m :: * -> *) position key state projKey event.
(Monad m, Ord position) =>
ProjectionCache key position state m
-> StreamProjection projKey position state event
-> key
-> m (StreamProjection projKey position state event)
getLatestProjectionWithCache' GlobalStreamProjectionCache key state m
cache GlobalStreamProjection state event
projection key
key m (GlobalStreamProjection state event)
-> (GlobalStreamProjection state event
-> m (GlobalStreamProjection state event))
-> m (GlobalStreamProjection state event)
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= GlobalEventStoreReader m event
-> GlobalStreamProjection state event
-> m (GlobalStreamProjection state event)
forall (m :: * -> *) position key event state.
(Monad m, Num position) =>
EventStoreReader key position m (StreamEvent key position event)
-> StreamProjection key position state event
-> m (StreamProjection key position state event)
getLatestStreamProjection GlobalEventStoreReader m event
store
getLatestProjectionWithCache' ::
(Monad m, Ord position) =>
ProjectionCache key position state m ->
StreamProjection projKey position state event ->
key ->
m (StreamProjection projKey position state event)
getLatestProjectionWithCache' :: forall (m :: * -> *) position key state projKey event.
(Monad m, Ord position) =>
ProjectionCache key position state m
-> StreamProjection projKey position state event
-> key
-> m (StreamProjection projKey position state event)
getLatestProjectionWithCache' ProjectionCache key position state m
cache StreamProjection projKey position state event
projection key
key = do
Maybe (position, state)
mLatestState <- ProjectionCache key position state m
-> key -> m (Maybe (position, state))
forall key position serialized (m :: * -> *).
ProjectionCache key position serialized m
-> key -> m (Maybe (position, serialized))
loadProjectionSnapshot ProjectionCache key position state m
cache key
key
let mkProjection' :: (position, state) -> StreamProjection projKey position state event
mkProjection' (position
position, state
state) =
if position
position position -> position -> Bool
forall a. Ord a => a -> a -> Bool
> StreamProjection projKey position state event -> position
forall key position state event.
StreamProjection key position state event -> position
streamProjectionPosition StreamProjection projKey position state event
projection
then
StreamProjection projKey position state event
projection
{ streamProjectionPosition = position,
streamProjectionState = state
}
else StreamProjection projKey position state event
projection
StreamProjection projKey position state event
-> m (StreamProjection projKey position state event)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (StreamProjection projKey position state event
-> m (StreamProjection projKey position state event))
-> StreamProjection projKey position state event
-> m (StreamProjection projKey position state event)
forall a b. (a -> b) -> a -> b
$ StreamProjection projKey position state event
-> ((position, state)
-> StreamProjection projKey position state event)
-> Maybe (position, state)
-> StreamProjection projKey position state event
forall b a. b -> (a -> b) -> Maybe a -> b
maybe StreamProjection projKey position state event
projection (position, state) -> StreamProjection projKey position state event
mkProjection' Maybe (position, state)
mLatestState
updateProjectionCache ::
(Monad m) =>
VersionedEventStoreReader m event ->
VersionedProjectionCache state m ->
VersionedStreamProjection state event ->
m ()
updateProjectionCache :: forall (m :: * -> *) event state.
Monad m =>
VersionedEventStoreReader m event
-> VersionedProjectionCache state m
-> VersionedStreamProjection state event
-> m ()
updateProjectionCache VersionedEventStoreReader m event
reader VersionedProjectionCache state m
cache VersionedStreamProjection state event
projection = do
StreamProjection {state
UUID
EventVersion
Projection state event
streamProjectionKey :: forall key position state event.
StreamProjection key position state event -> key
streamProjectionPosition :: forall key position state event.
StreamProjection key position state event -> position
streamProjectionState :: forall key position state event.
StreamProjection key position state event -> state
streamProjectionKey :: UUID
streamProjectionPosition :: EventVersion
streamProjectionProjection :: Projection state event
streamProjectionState :: state
streamProjectionProjection :: forall key position state event.
StreamProjection key position state event -> Projection state event
..} <- VersionedEventStoreReader m event
-> VersionedProjectionCache state m
-> VersionedStreamProjection state event
-> m (VersionedStreamProjection state event)
forall (m :: * -> *) event state.
Monad m =>
VersionedEventStoreReader m event
-> VersionedProjectionCache state m
-> VersionedStreamProjection state event
-> m (VersionedStreamProjection state event)
getLatestVersionedProjectionWithCache VersionedEventStoreReader m event
reader VersionedProjectionCache state m
cache VersionedStreamProjection state event
projection
VersionedProjectionCache state m
-> UUID -> EventVersion -> state -> m ()
forall key position serialized (m :: * -> *).
ProjectionCache key position serialized m
-> key -> position -> serialized -> m ()
storeProjectionSnapshot VersionedProjectionCache state m
cache UUID
streamProjectionKey EventVersion
streamProjectionPosition state
streamProjectionState
updateGlobalProjectionCache ::
(Monad m) =>
GlobalEventStoreReader m event ->
GlobalStreamProjectionCache key state m ->
GlobalStreamProjection state event ->
key ->
m ()
updateGlobalProjectionCache :: forall (m :: * -> *) event key state.
Monad m =>
GlobalEventStoreReader m event
-> GlobalStreamProjectionCache key state m
-> GlobalStreamProjection state event
-> key
-> m ()
updateGlobalProjectionCache GlobalEventStoreReader m event
reader GlobalStreamProjectionCache key state m
cache GlobalStreamProjection state event
projection key
key = do
StreamProjection {state
()
SequenceNumber
Projection state (VersionedStreamEvent event)
streamProjectionKey :: forall key position state event.
StreamProjection key position state event -> key
streamProjectionPosition :: forall key position state event.
StreamProjection key position state event -> position
streamProjectionState :: forall key position state event.
StreamProjection key position state event -> state
streamProjectionProjection :: forall key position state event.
StreamProjection key position state event -> Projection state event
streamProjectionKey :: ()
streamProjectionPosition :: SequenceNumber
streamProjectionProjection :: Projection state (VersionedStreamEvent event)
streamProjectionState :: state
..} <- GlobalEventStoreReader m event
-> GlobalStreamProjectionCache key state m
-> GlobalStreamProjection state event
-> key
-> m (GlobalStreamProjection state event)
forall (m :: * -> *) event key state.
Monad m =>
GlobalEventStoreReader m event
-> GlobalStreamProjectionCache key state m
-> GlobalStreamProjection state event
-> key
-> m (GlobalStreamProjection state event)
getLatestGlobalProjectionWithCache GlobalEventStoreReader m event
reader GlobalStreamProjectionCache key state m
cache GlobalStreamProjection state event
projection key
key
GlobalStreamProjectionCache key state m
-> key -> SequenceNumber -> state -> m ()
forall key position serialized (m :: * -> *).
ProjectionCache key position serialized m
-> key -> position -> serialized -> m ()
storeProjectionSnapshot GlobalStreamProjectionCache key state m
cache key
key SequenceNumber
streamProjectionPosition state
streamProjectionState