{-# LANGUAGE RecordWildCards #-}
module Eventium.Projection
( Projection (..),
latestProjection,
allProjections,
StreamProjection (..),
VersionedStreamProjection,
GlobalStreamProjection,
streamProjection,
versionedStreamProjection,
globalStreamProjection,
streamProjectionEventHandler,
getLatestStreamProjection,
serializedProjection,
projectionMapMaybe,
)
where
import Data.Foldable (foldl')
import Data.Functor.Contravariant
import Data.List (scanl')
import Eventium.Serializer
import Eventium.Store.Class
import Eventium.UUID
-- | A 'Projection' packages a fold over events as data: a starting state
-- together with a function that applies one event to a state. It is consumed
-- by 'latestProjection' and 'allProjections' in this module.
data Projection state event
  = Projection
  { -- | Initial state, used before any event has been applied.
    projectionSeed :: state,
    -- | Applies a single event to the current state, yielding the next state.
    projectionEventHandler :: state -> event -> state
  }
-- | Adapts the event type of a 'Projection': each incoming event is first
-- passed through the supplied function and the result is handed to the
-- original handler. The seed is carried over unchanged.
instance Contravariant (Projection state) where
  contramap f (Projection seed handler) = Projection seed handler'
    where
      -- Convert the incoming event before delegating to the wrapped handler.
      handler' state event = handler state (f event)
-- | Computes the final state of a 'Projection' over a collection of events:
-- a strict left fold ('foldl'') that starts from the projection's seed and
-- applies its handler to each event in turn.
latestProjection :: (Foldable t) => Projection state event -> t event -> state
latestProjection (Projection seed handler) = foldl' handler seed
-- | Computes every intermediate state of a 'Projection' over a list of
-- events using 'scanl''. The first element of the result is the seed, and
-- each following element is the state after applying one more event.
allProjections :: Projection state event -> [event] -> [state]
allProjections (Projection seed handler) = scanl' handler seed
-- | A 'Projection' bundled with the key of the stream it follows, a position
-- in that stream, and the state accumulated so far. All fields are strict.
data StreamProjection key position state event
  = StreamProjection
  { -- | Key of the stream; 'getLatestStreamProjection' passes it to
    -- 'eventsStartingAt' when reading new events.
    streamProjectionKey :: !key,
    -- | Position recorded for this projection; 'streamProjectionEventHandler'
    -- overwrites it with the position of each event it applies.
    streamProjectionPosition :: !position,
    -- | The underlying 'Projection' whose handler is used to apply events.
    streamProjectionProjection :: !(Projection state event),
    -- | The current accumulated state.
    streamProjectionState :: !state
  }
-- | A 'StreamProjection' whose key is a 'UUID' and whose position is an
-- 'EventVersion'. The @state@ and @event@ parameters are left open.
type VersionedStreamProjection = StreamProjection UUID EventVersion
-- | A 'StreamProjection' with a unit key and a 'SequenceNumber' position
-- whose events are 'VersionedStreamEvent's wrapping the given @event@ type.
type GlobalStreamProjection state event = StreamProjection () SequenceNumber state (VersionedStreamEvent event)
-- | Builds a 'StreamProjection' for the given key and starting position. The
-- initial state is the seed of the supplied 'Projection'.
streamProjection ::
  key ->
  position ->
  Projection state event ->
  StreamProjection key position state event
streamProjection key position projection@Projection {..} =
  -- 'projectionSeed' is brought into scope by the record wildcard.
  StreamProjection key position projection projectionSeed
-- | Builds a 'VersionedStreamProjection' for the stream identified by the
-- given 'UUID', starting at position @-1@. Because
-- 'getLatestStreamProjection' reads from @position + 1@, the first read of a
-- fresh projection starts at version @0@.
versionedStreamProjection ::
  UUID ->
  Projection state event ->
  VersionedStreamProjection state event
versionedStreamProjection uuid = streamProjection uuid (-1)
-- | Builds a 'GlobalStreamProjection' with the unit key and starting
-- position @0@. Because 'getLatestStreamProjection' reads from
-- @position + 1@, the first read of a fresh projection starts at sequence
-- number @1@.
-- NOTE(review): presumably global sequence numbers start at 1 — confirm
-- against the store implementation.
globalStreamProjection ::
  Projection state (VersionedStreamEvent event) ->
  GlobalStreamProjection state event
globalStreamProjection = streamProjection () 0
-- | Applies one 'StreamEvent' to a 'StreamProjection'. The result keeps the
-- same key and underlying 'Projection', takes its position from the event,
-- and holds the state produced by running the projection's handler on the
-- event's payload. The event's own key is not inspected.
streamProjectionEventHandler ::
  StreamProjection key position state event ->
  StreamEvent eventKey position event ->
  StreamProjection key position state event
streamProjectionEventHandler StreamProjection {..} event =
  let -- Bring 'projectionEventHandler' into scope from the wrapped projection.
      Projection {..} = streamProjectionProjection
      position' = streamEventPosition event
      state' = projectionEventHandler streamProjectionState (streamEventEvent event)
   in StreamProjection streamProjectionKey position' streamProjectionProjection state'
-- | Brings a 'StreamProjection' up to date using an 'EventStoreReader'. It
-- requests the events for the projection's key starting at
-- @streamProjectionPosition + 1@ and folds them into the projection with
-- 'streamProjectionEventHandler' (strict left fold). If the reader returns
-- no events, the projection is returned unchanged.
getLatestStreamProjection ::
  (Monad m, Num position) =>
  EventStoreReader key position m (StreamEvent key position event) ->
  StreamProjection key position state event ->
  m (StreamProjection key position state event)
getLatestStreamProjection (EventStoreReader getEvents') projection@StreamProjection {..} = do
  -- Only events after the recorded position are requested.
  events <- getEvents' (eventsStartingAt streamProjectionKey $ streamProjectionPosition + 1)
  return $ foldl' streamProjectionEventHandler projection events
-- | Turns a 'Projection' over @event@ into one over the @serialized@ form by
-- running the serializer's 'deserialize' on each input via
-- 'projectionMapMaybe'. Inputs for which 'deserialize' yields 'Nothing'
-- leave the state unchanged.
serializedProjection ::
  Projection state event ->
  Serializer event serialized ->
  Projection state serialized
serializedProjection proj Serializer {..} = projectionMapMaybe deserialize proj
-- | Changes the event type of a 'Projection' using a partial conversion.
-- When the conversion returns @'Just' e@, the original handler is applied to
-- @e@; when it returns 'Nothing', the state is returned unchanged. The seed
-- is carried over as is.
projectionMapMaybe ::
  (eventB -> Maybe eventA) ->
  Projection state eventA ->
  Projection state eventB
projectionMapMaybe f (Projection seed handler) = Projection seed handler'
  where
    -- 'maybe' falls back to the current state for events that do not convert.
    handler' state = maybe state (handler state) . f