Eventium Core
Core abstractions for building event-sourced applications in Haskell.
Overview
eventium-core is the foundational package of the Eventium framework. It defines storage-agnostic interfaces and pure types -- no database drivers, no I/O beyond what the caller provides via monad parameters.
Key Types
StreamEvent
    data StreamEvent key position event = StreamEvent
      { key :: key
      , position :: position
      , metadata :: EventMetadata
      , payload :: event
      }
Every stored event carries a stream key, a position, metadata (event type, correlation/causation IDs, timestamp), and the domain payload.
EventStoreReader / EventStoreWriter
    newtype EventStoreReader key position m event = EventStoreReader
      { getEvents :: QueryRange key position -> m [event] }

    newtype EventStoreWriter key position m event = EventStoreWriter
      { storeEvents :: key -> ExpectedPosition position -> [event]
                    -> m (Either (EventWriteError position) EventVersion) }
Both are polymorphic over the key, position, monad, and event types. runEventStoreReaderUsing / runEventStoreWriterUsing lift a store from one monad into another. codecEventStoreReader / codecEventStoreWriter wrap a store with a Codec to convert between event representations.
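As a rough illustration of the two interfaces, here is a minimal in-memory sketch. It copies the reader and writer newtypes locally so that it compiles with only base and containers. QueryRange, ExpectedPosition, EventWriteError and EventVersion are simplified stand-ins invented for this example, and memoryReader / memoryWriter are hypothetical names; the library's real definitions are richer.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import qualified Data.Map.Strict as Map

-- Simplified stand-ins (assumptions for this sketch, not the library's types).
newtype QueryRange key position = QueryRange key -- whole stream only
data ExpectedPosition position = AnyPosition | ExactPosition position
newtype EventWriteError position = EventStreamNotAtExpectedVersion position
  deriving (Show, Eq)
type EventVersion = Int

-- Local copies of the interfaces shown above.
newtype EventStoreReader key position m event = EventStoreReader
  { getEvents :: QueryRange key position -> m [event] }

newtype EventStoreWriter key position m event = EventStoreWriter
  { storeEvents :: key -> ExpectedPosition position -> [event]
                -> m (Either (EventWriteError position) EventVersion) }

-- One list of events per stream key.
type Store event = IORef (Map.Map String [event])

memoryReader :: Store event -> EventStoreReader String EventVersion IO event
memoryReader ref = EventStoreReader $ \(QueryRange k) ->
  Map.findWithDefault [] k <$> readIORef ref

memoryWriter :: Store event -> EventStoreWriter String EventVersion IO event
memoryWriter ref = EventStoreWriter $ \k expected new -> do
  streams <- readIORef ref
  let old = Map.findWithDefault [] k streams
      current = length old - 1 -- version of the last event, -1 when empty
      ok = case expected of
        AnyPosition -> True
        ExactPosition v -> v == current
  if ok
    then do
      writeIORef ref (Map.insert k (old ++ new) streams)
      pure (Right (current + length new))
    else pure (Left (EventStreamNotAtExpectedVersion current))

-- Write twice (the second write uses a stale expected position), then read.
storeExample
  :: IO ( Either (EventWriteError EventVersion) EventVersion
        , Either (EventWriteError EventVersion) EventVersion
        , [String] )
storeExample = do
  ref <- newIORef Map.empty
  let writer = memoryWriter ref
      reader = memoryReader ref
  first <- storeEvents writer "acct-1" AnyPosition ["opened", "deposited"]
  stale <- storeEvents writer "acct-1" (ExactPosition 0) ["withdrew"]
  events <- getEvents reader (QueryRange "acct-1")
  pure (first, stale, events)
```

In this sketch the stale write is rejected and leaves the stream unchanged, which is the optimistic-concurrency behaviour the ExpectedPosition argument exists to support.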
Projection
    data Projection state event = Projection
      { seed :: state
      , eventHandler :: state -> event -> state
      }
Pure fold for rebuilding state from events. Used for both aggregates (write side) and read models (query side). getLatestStreamProjection loads events from a reader and applies them.
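A small sketch of what such a fold looks like in practice. The Projection record is copied locally so the example is self-contained; AccountEvent, balanceProjection and the replay helper are hypothetical names used only for illustration.

```haskell
import Data.List (foldl')

-- Local copy of the Projection record shown above.
data Projection state event = Projection
  { seed :: state
  , eventHandler :: state -> event -> state
  }

-- Hypothetical domain events for a bank account.
data AccountEvent
  = Deposited Int
  | Withdrew Int
  deriving (Show, Eq)

-- The projected state is simply the current balance.
balanceProjection :: Projection Int AccountEvent
balanceProjection = Projection
  { seed = 0
  , eventHandler = applyEvent
  }
  where
    applyEvent balance (Deposited n) = balance + n
    applyEvent balance (Withdrew n) = balance - n

-- Hypothetical helper: replay a list of events through a projection.
replay :: Projection state event -> [event] -> state
replay p = foldl' (eventHandler p) (seed p)

-- 0 + 100 - 30 + 5
exampleBalance :: Int
exampleBalance = replay balanceProjection [Deposited 100, Withdrew 30, Deposited 5]
```

Because the handler is a pure function, the same projection can be replayed in tests without any store at all.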
CommandHandler
    data CommandHandler state event command err = CommandHandler
      { decide :: state -> command -> Either err [event]
      , projection :: Projection state event
      }
Validates a command against current state, returning either a domain error or new events. applyCommandHandler orchestrates the full load-decide-write cycle.
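The decide step can be sketched without any storage. The records below are local copies of the ones shown above; the account types and handlePure are hypothetical. handlePure is only a pure stand-in for the load and decide steps and does not reflect the signature of applyCommandHandler.

```haskell
-- Local copies of the records shown above.
data Projection state event = Projection
  { seed :: state
  , eventHandler :: state -> event -> state
  }

data CommandHandler state event command err = CommandHandler
  { decide :: state -> command -> Either err [event]
  , projection :: Projection state event
  }

-- Hypothetical domain types.
data AccountEvent = Deposited Int | Withdrew Int deriving (Show, Eq)
data AccountCommand = Deposit Int | Withdraw Int deriving (Show, Eq)
data AccountError = InsufficientFunds Int | NonPositiveAmount deriving (Show, Eq)

accountHandler :: CommandHandler Int AccountEvent AccountCommand AccountError
accountHandler = CommandHandler
  { decide = decideAccount
  , projection = Projection 0 applyEvent
  }
  where
    decideAccount _ (Deposit n)
      | n <= 0 = Left NonPositiveAmount
      | otherwise = Right [Deposited n]
    decideAccount balance (Withdraw n)
      | n <= 0 = Left NonPositiveAmount
      | n > balance = Left (InsufficientFunds balance)
      | otherwise = Right [Withdrew n]
    applyEvent balance (Deposited n) = balance + n
    applyEvent balance (Withdrew n) = balance - n

-- Hypothetical helper: rebuild state from history, then decide.
-- A real handler would also write the resulting events to a store.
handlePure
  :: CommandHandler state event command err
  -> [event]
  -> command
  -> Either err [event]
handlePure h history = decide h current
  where
    current = foldl (eventHandler (projection h)) (seed (projection h)) history
```

With a history of [Deposited 50], a Withdraw 80 command is rejected with InsufficientFunds 50, while Withdraw 20 yields [Withdrew 20].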
ProcessManager
    data ProcessManager state event command = ProcessManager
      { projection :: Projection state (VersionedStreamEvent event)
      , react :: state -> VersionedStreamEvent event
              -> [ProcessManagerEffect command]
      }

    data ProcessManagerEffect command
      = IssueCommand UUID command
      | IssueCommandWithCompensation UUID command (Text -> [ProcessManagerEffect command])
Coordinates cross-aggregate workflows. react is pure; runProcessManagerEffects dispatches the resulting commands via a CommandDispatcher. IssueCommandWithCompensation triggers compensation effects when a command fails.
CommandDispatcher
    newtype CommandDispatcher m command = CommandDispatcher
      { dispatchCommand :: UUID -> command -> m CommandDispatchResult }

    data CommandDispatchResult = CommandSucceeded | CommandFailed Text
Routes commands to aggregates and reports outcomes. Construct with mkCommandDispatcher, or use fireAndForgetDispatcher for legacy callbacks. commandHandlerDispatcher (from Eventium.CommandDispatcher) builds a dispatcher from a list of AggregateHandlers for multi-aggregate routing.
processManagerEventHandler wires a ProcessManager to a global reader and dispatcher, producing a ready-to-use EventHandler.
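To make the compensation flow concrete, the sketch below interprets a list of effects against a dispatcher. It is not the library's runProcessManagerEffects; runEffects, TransferCommand and the recording dispatcher are hypothetical, and UUID and Text are replaced by Int and String stand-ins so that only base is needed.

```haskell
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Stand-ins (assumptions): the library uses real UUID and Text types.
type UUID = Int
type Text = String

-- Local copies of the types shown above.
data ProcessManagerEffect command
  = IssueCommand UUID command
  | IssueCommandWithCompensation UUID command (Text -> [ProcessManagerEffect command])

data CommandDispatchResult = CommandSucceeded | CommandFailed Text

newtype CommandDispatcher m command = CommandDispatcher
  { dispatchCommand :: UUID -> command -> m CommandDispatchResult }

-- Hypothetical interpreter: dispatch each effect in order; when a command
-- issued with compensation fails, run the compensating effects.
runEffects
  :: Monad m
  => CommandDispatcher m command
  -> [ProcessManagerEffect command]
  -> m ()
runEffects dispatcher = mapM_ run
  where
    run (IssueCommand uuid cmd) = do
      _ <- dispatchCommand dispatcher uuid cmd
      pure ()
    run (IssueCommandWithCompensation uuid cmd compensate) = do
      result <- dispatchCommand dispatcher uuid cmd
      case result of
        CommandSucceeded -> pure ()
        CommandFailed reason -> runEffects dispatcher (compensate reason)

-- Hypothetical commands for a money transfer.
data TransferCommand = Debit Int | Credit Int | Refund Int
  deriving (Show, Eq)

-- A dispatcher that records every command it sees and rejects Credit.
transferLog :: IO [(UUID, TransferCommand)]
transferLog = do
  ref <- newIORef []
  let dispatcher = CommandDispatcher $ \uuid cmd -> do
        modifyIORef ref (++ [(uuid, cmd)])
        pure $ case cmd of
          Credit _ -> CommandFailed "target account closed"
          _ -> CommandSucceeded
  runEffects dispatcher
    [ IssueCommand 1 (Debit 10)
    , IssueCommandWithCompensation 2 (Credit 10) (\_ -> [IssueCommand 1 (Refund 10)])
    ]
  readIORef ref
```

In this run the failed Credit causes a Refund to be dispatched to the source account, so the recorded commands are the debit, the credit attempt, and the refund, in that order.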
ProjectionCache
    data ProjectionCache key position encoded m = ProjectionCache
      { storeSnapshot :: key -> position -> encoded -> m ()
      , loadSnapshot :: key -> m (Maybe (position, encoded))
      }
Snapshot store for aggregate state, avoiding full event replay on every load. Two flavors:
- VersionedProjectionCache -- keyed by UUID + EventVersion, one snapshot per aggregate instance.
- GlobalProjectionCache -- keyed by () + SequenceNumber, singleton snapshot for global projections.
Wiring helpers:
- snapshotEventHandler -- EventHandler that auto-updates a VersionedProjectionCache on each event. Compose with publishingEventStoreWriter for transparent snapshotting.
- snapshotGlobalEventHandler -- same for GlobalProjectionCache.
- applyCommandHandlerWithCache -- like applyCommandHandler but loads from cache and updates after write.
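The idea behind the cache can be sketched with an in-memory map. The record is a local copy of the one above; memoryCache and loadWithCache are hypothetical helpers, positions are plain list indices, and the state is stored unencoded. The library's own helpers work against a real event store and may behave differently.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)

-- Local copy of the record shown above.
data ProjectionCache key position encoded m = ProjectionCache
  { storeSnapshot :: key -> position -> encoded -> m ()
  , loadSnapshot :: key -> m (Maybe (position, encoded))
  }

-- Hypothetical in-memory cache backed by a Map inside an IORef.
memoryCache
  :: Ord key
  => IORef (Map.Map key (position, encoded))
  -> ProjectionCache key position encoded IO
memoryCache ref = ProjectionCache
  { storeSnapshot = \k pos enc -> modifyIORef' ref (Map.insert k (pos, enc))
  , loadSnapshot = \k -> Map.lookup k <$> readIORef ref
  }

-- Hypothetical helper: resume the fold from the snapshot (if any), replay
-- only the newer events, store a fresh snapshot, and report how many
-- events had to be replayed.
loadWithCache
  :: ProjectionCache key Int state IO
  -> (state -> event -> state)
  -> state
  -> key
  -> [event] -- full stream; an event's position is its list index
  -> IO (state, Int)
loadWithCache cache step initial k events = do
  cached <- loadSnapshot cache k
  let (pos, start) = fromMaybe (-1, initial) cached
      newer = drop (pos + 1) events
      final = foldl step start newer
  storeSnapshot cache k (length events - 1) final
  pure (final, length newer)

-- First load replays three events; the second replays only the new one.
cacheExample :: IO ((Int, Int), (Int, Int))
cacheExample = do
  ref <- newIORef Map.empty
  let cache = memoryCache ref
  first <- loadWithCache cache (+) 0 "acct-1" [10, 20, 30]
  second <- loadWithCache cache (+) 0 "acct-1" [10, 20, 30, 40]
  pure (first, second)
```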
ReadModel
    data ReadModel m event = ReadModel
      { initialize :: m ()
      , eventHandler :: EventHandler m (GlobalStreamEvent event)
      , checkpointStore :: CheckpointStore m SequenceNumber
      , reset :: m ()
      }
Abstraction for queryable persistent views driven by the global event stream. Users define their own schema and event handler; the library manages the event pipeline and checkpointing. ReadModels always consume the global stream (cross-aggregate views need total ordering).
- runReadModel -- polling subscription that keeps the view updated (runs forever).
- rebuildReadModel -- reset + replay all events (one-shot rebuild).
- combineReadModels -- fan-out events to multiple read models.
EventHandler / EventPublisher / EventSubscription
- EventHandler -- composable event consumer (Contravariant, Semigroup, Monoid).
- EventPublisher -- decouples post-write dispatch. publishingEventStoreWriter wraps a writer for auto-publish.
- EventSubscription -- push-based delivery. pollingSubscription polls the global stream with a CheckpointStore.
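The composability of EventHandler can be illustrated as follows. The README does not show the type's definition, so the newtype, its handleEvent field and the three instances below are assumptions written for this sketch; only the instance names (Contravariant, Semigroup, Monoid) come from the list above.

```haskell
import Data.Functor.Contravariant (Contravariant (contramap))
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Assumed shape of an event handler: a function from an event to an action.
newtype EventHandler m event = EventHandler
  { handleEvent :: event -> m () }

-- Combining two handlers runs both on every event, left to right.
instance Applicative m => Semigroup (EventHandler m event) where
  EventHandler f <> EventHandler g = EventHandler (\e -> f e *> g e)

-- The empty handler ignores every event.
instance Applicative m => Monoid (EventHandler m event) where
  mempty = EventHandler (\_ -> pure ())

-- contramap adapts a handler to a different input event type.
instance Contravariant (EventHandler m) where
  contramap f (EventHandler h) = EventHandler (h . f)

-- A String logger adapted to Int events and combined with a running sum.
handlerExample :: IO ([String], Int)
handlerExample = do
  logRef <- newIORef []
  sumRef <- newIORef 0
  let logger = EventHandler (\msg -> modifyIORef logRef (++ [msg]))
      adder = EventHandler (\n -> modifyIORef sumRef (+ n))
      combined = contramap show logger <> adder
  mapM_ (handleEvent combined) [1, 2, 3]
  logged <- readIORef logRef
  total <- readIORef sumRef
  pure (logged, total)
```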
Codec
    data Codec a b = Codec
      { encode :: a -> b
      , decode :: b -> Maybe a
      }
Value-level bidirectional conversion. Composable via composeCodecs. Eventium.TH generates sum-type codecs and JSON instances.
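A short sketch of composing two codecs. The Codec record is a local copy; andThenCodec is a hypothetical helper written for this example and is not claimed to match the signature or argument order of composeCodecs. The event type and both codecs are likewise invented.

```haskell
import Data.List (stripPrefix)
import Text.Read (readMaybe)

-- Local copy of the record shown above.
data Codec a b = Codec
  { encode :: a -> b
  , decode :: b -> Maybe a
  }

-- Hypothetical composition: encode left to right, decode right to left.
-- Decoding fails if either stage fails.
andThenCodec :: Codec a b -> Codec b c -> Codec a c
andThenCodec ab bc = Codec
  { encode = encode bc . encode ab
  , decode = \c -> decode bc c >>= decode ab
  }

-- Hypothetical event type.
data AccountEvent = Opened | Closed
  deriving (Show, Read, Eq)

-- Textual codec built from Show and Read.
showReadCodec :: (Show a, Read a) => Codec a String
showReadCodec = Codec { encode = show, decode = readMaybe }

-- Adds and strips a version prefix.
versionEnvelope :: Codec String String
versionEnvelope = Codec { encode = ("v1:" ++), decode = stripPrefix "v1:" }

accountCodec :: Codec AccountEvent String
accountCodec = andThenCodec showReadCodec versionEnvelope
```

Here encode accountCodec Opened gives "v1:Opened", while decoding a string with the wrong prefix or an unknown constructor name gives Nothing.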
TypeEmbedding
    data TypeEmbedding a b = TypeEmbedding
      { embed :: a -> b
      , extract :: b -> Maybe a
      }
Embeds one sum type into another (e.g. aggregate events into an application-wide event type). Separate from Codec to distinguish type-level subset relationships from wire-format encoding.
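A hand-written embedding might look like the sketch below. The record is a local copy of the one above; the event types and accountEventsOnly are hypothetical, and the wrapper-constructor layout of AppEvent is just one possible way to structure an application-wide event type.

```haskell
import Data.Maybe (mapMaybe)

-- Local copy of the record shown above.
data TypeEmbedding a b = TypeEmbedding
  { embed :: a -> b
  , extract :: b -> Maybe a
  }

-- Hypothetical aggregate-level event types.
data AccountEvent = Opened | Closed deriving (Show, Eq)
data CustomerEvent = Registered String deriving (Show, Eq)

-- Hypothetical application-wide event type wrapping both.
data AppEvent
  = AppAccount AccountEvent
  | AppCustomer CustomerEvent
  deriving (Show, Eq)

accountEmbedding :: TypeEmbedding AccountEvent AppEvent
accountEmbedding = TypeEmbedding
  { embed = AppAccount
  , extract = fromApp
  }
  where
    fromApp (AppAccount e) = Just e
    fromApp _ = Nothing

-- Keep only the events that belong to the account aggregate.
accountEventsOnly :: [AppEvent] -> [AccountEvent]
accountEventsOnly = mapMaybe (extract accountEmbedding)
```

Extracting what was just embedded always succeeds, and events from other aggregates are filtered out rather than treated as decoding errors.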
Modules
| Module | Purpose |
| --- | --- |
| Eventium.Store.Class | Reader/Writer interfaces, monad lifting, codec wrappers |
| Eventium.Store.Types | StreamEvent, EventMetadata, EventVersion, SequenceNumber, ExpectedPosition |
| Eventium.Store.Queries | QueryRange builders (allEvents, eventsStartingAt, etc.) |
| Eventium.Projection | Projection, StreamProjection, getLatestStreamProjection |
| Eventium.CommandHandler | CommandHandler, applyCommandHandler, applyCommandHandlerWithCache |
| Eventium.ProcessManager | ProcessManager, ProcessManagerEffect, CommandDispatcher, CommandDispatchResult, runProcessManagerEffects, processManagerEventHandler |
| Eventium.CommandDispatcher | AggregateHandler, mkAggregateHandler, commandHandlerDispatcher |
| Eventium.EventHandler | EventHandler, handleEvents |
| Eventium.EventPublisher | EventPublisher, publishingEventStoreWriter, synchronousPublisher |
| Eventium.EventSubscription | EventSubscription, pollingSubscription, CheckpointStore |
| Eventium.ReadModel | ReadModel, runReadModel, rebuildReadModel, combineReadModels |
| Eventium.ProjectionCache.Cache | ProjectionCache helpers: snapshotEventHandler, snapshotGlobalEventHandler, getLatestVersionedProjectionWithCache, updateVersionedProjectionCache |
| Eventium.ProjectionCache.Types | ProjectionCache, VersionedProjectionCache, GlobalProjectionCache |
| Eventium.Codec | Codec, jsonCodec, jsonTextCodec, composeCodecs |
| Eventium.UUID | UUID utilities (uuidNextRandom, uuidFromText, uuidFromInteger) |
| Eventium.TH | Template Haskell: deriveJSON, mkSumTypeCodec, mkSumTypeEmbedding, makeProjection |
Usage
    dependencies:
      - eventium-core >= 0.1.0
Then pick a storage backend: eventium-memory, eventium-sqlite, or eventium-postgresql.
Documentation
License
MIT -- see LICENSE.md