{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}

-- | Defines an Postgresql event store.
module Eventium.Store.Postgresql
  ( postgresqlEventStoreWriter,
    module Eventium.Store.Class,
    module Eventium.Store.Sql,
  )
where

import Control.Monad.Reader
import Data.Monoid ((<>))
import Data.Text (Text)
import Database.Persist
import Database.Persist.Names (EntityNameDB (..), FieldNameDB (..))
import Database.Persist.Sql
import Eventium.Store.Class
import Eventium.Store.Sql

-- | An 'EventStore' that uses a PostgreSQL database as a backend. Use
-- 'SqlEventStoreConfig' to configure this event store.
postgresqlEventStoreWriter ::
  (MonadIO m, PersistEntity entity, PersistEntityBackend entity ~ SqlBackend, SafeToInsert entity) =>
  SqlEventStoreConfig entity serialized ->
  VersionedEventStoreWriter (SqlPersistT m) serialized
postgresqlEventStoreWriter :: forall (m :: * -> *) entity serialized.
(MonadIO m, PersistEntity entity,
 PersistEntityBackend entity ~ SqlBackend, SafeToInsert entity) =>
SqlEventStoreConfig entity serialized
-> VersionedEventStoreWriter (SqlPersistT m) serialized
postgresqlEventStoreWriter SqlEventStoreConfig entity serialized
config = (UUID
 -> ExpectedPosition EventVersion
 -> [serialized]
 -> SqlPersistT
      m (Either (EventWriteError EventVersion) EventVersion))
-> EventStoreWriter
     UUID EventVersion (ReaderT SqlBackend m) serialized
forall key position (m :: * -> *) event.
(key
 -> ExpectedPosition position
 -> [event]
 -> m (Either (EventWriteError position) EventVersion))
-> EventStoreWriter key position m event
EventStoreWriter ((UUID
  -> ExpectedPosition EventVersion
  -> [serialized]
  -> SqlPersistT
       m (Either (EventWriteError EventVersion) EventVersion))
 -> EventStoreWriter
      UUID EventVersion (ReaderT SqlBackend m) serialized)
-> (UUID
    -> ExpectedPosition EventVersion
    -> [serialized]
    -> SqlPersistT
         m (Either (EventWriteError EventVersion) EventVersion))
-> EventStoreWriter
     UUID EventVersion (ReaderT SqlBackend m) serialized
forall a b. (a -> b) -> a -> b
$ (UUID -> ReaderT SqlBackend m EventVersion)
-> (UUID -> [serialized] -> ReaderT SqlBackend m EventVersion)
-> UUID
-> ExpectedPosition EventVersion
-> [serialized]
-> SqlPersistT
     m (Either (EventWriteError EventVersion) EventVersion)
forall (m :: * -> *) position key event.
(Monad m, Ord position, Num position) =>
(key -> m position)
-> (key -> [event] -> m EventVersion)
-> key
-> ExpectedPosition position
-> [event]
-> m (Either (EventWriteError position) EventVersion)
transactionalExpectedWriteHelper UUID -> ReaderT SqlBackend m EventVersion
getLatestVersion UUID -> [serialized] -> ReaderT SqlBackend m EventVersion
storeEvents'
  where
    getLatestVersion :: UUID -> ReaderT SqlBackend m EventVersion
getLatestVersion = SqlEventStoreConfig entity serialized
-> (FieldNameDB -> FieldNameDB -> FieldNameDB -> Text)
-> UUID
-> ReaderT SqlBackend m EventVersion
forall (m :: * -> *) entity serialized.
(MonadIO m, PersistEntity entity,
 PersistEntityBackend entity ~ SqlBackend) =>
SqlEventStoreConfig entity serialized
-> (FieldNameDB -> FieldNameDB -> FieldNameDB -> Text)
-> UUID
-> SqlPersistT m EventVersion
sqlMaxEventVersion SqlEventStoreConfig entity serialized
config FieldNameDB -> FieldNameDB -> FieldNameDB -> Text
maxPostgresVersionSql
    storeEvents' :: UUID -> [serialized] -> ReaderT SqlBackend m EventVersion
storeEvents' = SqlEventStoreConfig entity serialized
-> Maybe (Text -> Text)
-> (FieldNameDB -> FieldNameDB -> FieldNameDB -> Text)
-> UUID
-> [serialized]
-> ReaderT SqlBackend m EventVersion
forall (m :: * -> *) entity serialized.
(MonadIO m, PersistEntity entity,
 PersistEntityBackend entity ~ SqlBackend, SafeToInsert entity) =>
SqlEventStoreConfig entity serialized
-> Maybe (Text -> Text)
-> (FieldNameDB -> FieldNameDB -> FieldNameDB -> Text)
-> UUID
-> [serialized]
-> SqlPersistT m EventVersion
sqlStoreEvents SqlEventStoreConfig entity serialized
config ((Text -> Text) -> Maybe (Text -> Text)
forall a. a -> Maybe a
Just Text -> Text
tableLockFunc) FieldNameDB -> FieldNameDB -> FieldNameDB -> Text
maxPostgresVersionSql

maxPostgresVersionSql :: FieldNameDB -> FieldNameDB -> FieldNameDB -> Text
maxPostgresVersionSql :: FieldNameDB -> FieldNameDB -> FieldNameDB -> Text
maxPostgresVersionSql (FieldNameDB Text
tableName) (FieldNameDB Text
uuidFieldName) (FieldNameDB Text
versionFieldName) =
  Text
"SELECT COALESCE(MAX(" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
versionFieldName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"), -1) FROM " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" WHERE " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
uuidFieldName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" = ?"

-- | We need to lock the events table or else our global sequence number might
-- not be monotonically increasing over time from the point of view of a
-- reader.
--
-- For example, say transaction A begins to write an event and the
-- auto-increment key is 1. Then, transaction B starts to insert an event and
-- gets an id of 2. If transaction B is quick and completes, then a listener
-- might see the event from B and thinks it has all the events up to a sequence
-- number of 2. However, once A finishes and the event with the id of 1 is
-- done, then the listener won't know that event exists.
tableLockFunc :: Text -> Text
tableLockFunc :: Text -> Text
tableLockFunc Text
tableName = Text
"LOCK " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" IN EXCLUSIVE MODE"