module Database.PostgreSQL.Consumers.Consumer
( ConsumerID
, registerConsumer
, unregisterConsumer
) where
import Control.Monad.Base
import Control.Monad.Catch
import Control.Monad.Time
import Data.Int
import Data.Monoid.Utils
import Database.PostgreSQL.Consumers.Config
import Database.PostgreSQL.Consumers.Utils
import Database.PostgreSQL.PQTypes
newtype ConsumerID = ConsumerID Int64
deriving (ConsumerID -> ConsumerID -> Bool
(ConsumerID -> ConsumerID -> Bool)
-> (ConsumerID -> ConsumerID -> Bool) -> Eq ConsumerID
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ConsumerID -> ConsumerID -> Bool
== :: ConsumerID -> ConsumerID -> Bool
$c/= :: ConsumerID -> ConsumerID -> Bool
/= :: ConsumerID -> ConsumerID -> Bool
Eq, Eq ConsumerID
Eq ConsumerID =>
(ConsumerID -> ConsumerID -> Ordering)
-> (ConsumerID -> ConsumerID -> Bool)
-> (ConsumerID -> ConsumerID -> Bool)
-> (ConsumerID -> ConsumerID -> Bool)
-> (ConsumerID -> ConsumerID -> Bool)
-> (ConsumerID -> ConsumerID -> ConsumerID)
-> (ConsumerID -> ConsumerID -> ConsumerID)
-> Ord ConsumerID
ConsumerID -> ConsumerID -> Bool
ConsumerID -> ConsumerID -> Ordering
ConsumerID -> ConsumerID -> ConsumerID
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: ConsumerID -> ConsumerID -> Ordering
compare :: ConsumerID -> ConsumerID -> Ordering
$c< :: ConsumerID -> ConsumerID -> Bool
< :: ConsumerID -> ConsumerID -> Bool
$c<= :: ConsumerID -> ConsumerID -> Bool
<= :: ConsumerID -> ConsumerID -> Bool
$c> :: ConsumerID -> ConsumerID -> Bool
> :: ConsumerID -> ConsumerID -> Bool
$c>= :: ConsumerID -> ConsumerID -> Bool
>= :: ConsumerID -> ConsumerID -> Bool
$cmax :: ConsumerID -> ConsumerID -> ConsumerID
max :: ConsumerID -> ConsumerID -> ConsumerID
$cmin :: ConsumerID -> ConsumerID -> ConsumerID
min :: ConsumerID -> ConsumerID -> ConsumerID
Ord)
instance PQFormat ConsumerID where
pqFormat :: ByteString
pqFormat = forall t. PQFormat t => ByteString
pqFormat @Int64
instance FromSQL ConsumerID where
type PQBase ConsumerID = PQBase Int64
fromSQL :: Maybe (PQBase ConsumerID) -> IO ConsumerID
fromSQL Maybe (PQBase ConsumerID)
mbase = Int64 -> ConsumerID
ConsumerID (Int64 -> ConsumerID) -> IO Int64 -> IO ConsumerID
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (PQBase Int64) -> IO Int64
forall t. FromSQL t => Maybe (PQBase t) -> IO t
fromSQL Maybe (PQBase Int64)
Maybe (PQBase ConsumerID)
mbase
instance ToSQL ConsumerID where
type PQDest ConsumerID = PQDest Int64
toSQL :: forall r.
ConsumerID
-> ParamAllocator -> (Ptr (PQDest ConsumerID) -> IO r) -> IO r
toSQL (ConsumerID Int64
n) = Int64 -> ParamAllocator -> (Ptr (PQDest Int64) -> IO r) -> IO r
forall r.
Int64 -> ParamAllocator -> (Ptr (PQDest Int64) -> IO r) -> IO r
forall t r.
ToSQL t =>
t -> ParamAllocator -> (Ptr (PQDest t) -> IO r) -> IO r
toSQL Int64
n
instance Show ConsumerID where
showsPrec :: Int -> ConsumerID -> ShowS
showsPrec Int
p (ConsumerID Int64
n) = Int -> Int64 -> ShowS
forall a. Show a => Int -> a -> ShowS
showsPrec Int
p Int64
n
registerConsumer
:: (MonadBase IO m, MonadMask m, MonadTime m)
=> ConsumerConfig n idx job
-> ConnectionSourceM m
-> m ConsumerID
registerConsumer :: forall (m :: * -> *) (n :: * -> *) idx job.
(MonadBase IO m, MonadMask m, MonadTime m) =>
ConsumerConfig n idx job -> ConnectionSourceM m -> m ConsumerID
registerConsumer ConsumerConfig {Int
[SQL]
Maybe Channel
RawSQL ()
job -> idx
job -> n Result
job -> [Pair]
row -> job
SomeException -> job -> n Action
ccJobsTable :: RawSQL ()
ccConsumersTable :: RawSQL ()
ccJobSelectors :: [SQL]
ccJobFetcher :: row -> job
ccJobIndex :: job -> idx
ccNotificationChannel :: Maybe Channel
ccNotificationTimeout :: Int
ccMaxRunningJobs :: Int
ccProcessJob :: job -> n Result
ccOnException :: SomeException -> job -> n Action
ccJobLogData :: job -> [Pair]
ccJobsTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccConsumersTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccJobSelectors :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> [SQL]
ccJobFetcher :: ()
ccJobIndex :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> idx
ccNotificationChannel :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> Maybe Channel
ccNotificationTimeout :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccMaxRunningJobs :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccProcessJob :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> m Result
ccOnException :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> SomeException -> job -> m Action
ccJobLogData :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> [Pair]
..} ConnectionSourceM m
cs = ConnectionSourceM m
-> TransactionSettings -> DBT m ConsumerID -> m ConsumerID
forall (m :: * -> *) a.
(HasCallStack, MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts (DBT m ConsumerID -> m ConsumerID)
-> DBT m ConsumerID -> m ConsumerID
forall a b. (a -> b) -> a -> b
$ do
UTCTime
now <- DBT_ m m UTCTime
forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
QueryName -> SQL -> DBT_ m m ()
forall (m :: * -> *).
(HasCallStack, MonadDB m) =>
QueryName -> SQL -> m ()
runPreparedSQL_ (Text -> RawSQL () -> QueryName
preparedSqlName Text
"registerConsumer" RawSQL ()
ccConsumersTable) (SQL -> DBT_ m m ()) -> SQL -> DBT_ m m ()
forall a b. (a -> b) -> a -> b
$
[SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat
[ SQL
"INSERT INTO" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccConsumersTable
, SQL
"(name, last_activity) VALUES (" SQL -> Text -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> RawSQL () -> Text
unRawSQL RawSQL ()
ccJobsTable SQL -> SQL -> SQL
forall a. Semigroup a => a -> a -> a
<> SQL
", " SQL -> UTCTime -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now SQL -> SQL -> SQL
forall a. Semigroup a => a -> a -> a
<> SQL
")"
, SQL
"RETURNING id"
]
(Identity ConsumerID -> ConsumerID) -> DBT m ConsumerID
forall (m :: * -> *) row t.
(HasCallStack, MonadDB m, MonadThrow m, FromRow row) =>
(row -> t) -> m t
fetchOne Identity ConsumerID -> ConsumerID
forall a. Identity a -> a
runIdentity
where
ts :: TransactionSettings
ts =
TransactionSettings
defaultTransactionSettings
{ tsAutoTransaction = False
}
unregisterConsumer
:: (MonadBase IO m, MonadMask m)
=> ConsumerConfig n idx job
-> ConnectionSourceM m
-> ConsumerID
-> m ()
unregisterConsumer :: forall (m :: * -> *) (n :: * -> *) idx job.
(MonadBase IO m, MonadMask m) =>
ConsumerConfig n idx job
-> ConnectionSourceM m -> ConsumerID -> m ()
unregisterConsumer ConsumerConfig {Int
[SQL]
Maybe Channel
RawSQL ()
job -> idx
job -> n Result
job -> [Pair]
row -> job
SomeException -> job -> n Action
ccJobsTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccConsumersTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccJobSelectors :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> [SQL]
ccJobFetcher :: ()
ccJobIndex :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> idx
ccNotificationChannel :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> Maybe Channel
ccNotificationTimeout :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccMaxRunningJobs :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccProcessJob :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> m Result
ccOnException :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> SomeException -> job -> m Action
ccJobLogData :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> [Pair]
ccJobsTable :: RawSQL ()
ccConsumersTable :: RawSQL ()
ccJobSelectors :: [SQL]
ccJobFetcher :: row -> job
ccJobIndex :: job -> idx
ccNotificationChannel :: Maybe Channel
ccNotificationTimeout :: Int
ccMaxRunningJobs :: Int
ccProcessJob :: job -> n Result
ccOnException :: SomeException -> job -> n Action
ccJobLogData :: job -> [Pair]
..} ConnectionSourceM m
cs ConsumerID
wid = ConnectionSourceM m -> TransactionSettings -> DBT m () -> m ()
forall (m :: * -> *) a.
(HasCallStack, MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts (DBT m () -> m ()) -> DBT m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
QueryName -> SQL -> DBT m ()
forall (m :: * -> *).
(HasCallStack, MonadDB m) =>
QueryName -> SQL -> m ()
runPreparedSQL_ (Text -> RawSQL () -> QueryName
preparedSqlName Text
"deregisterJobs" RawSQL ()
ccJobsTable) (SQL -> DBT m ()) -> SQL -> DBT m ()
forall a b. (a -> b) -> a -> b
$
[SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat
[ SQL
"UPDATE" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable
, SQL
" SET reserved_by = NULL"
, SQL
" WHERE reserved_by =" SQL -> ConsumerID -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> ConsumerID
wid
]
QueryName -> SQL -> DBT m ()
forall (m :: * -> *).
(HasCallStack, MonadDB m) =>
QueryName -> SQL -> m ()
runPreparedSQL_ (Text -> RawSQL () -> QueryName
preparedSqlName Text
"removeConsumers" RawSQL ()
ccConsumersTable) (SQL -> DBT m ()) -> SQL -> DBT m ()
forall a b. (a -> b) -> a -> b
$
[SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat
[ SQL
"DELETE FROM " SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccConsumersTable
, SQL
"WHERE id =" SQL -> ConsumerID -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> ConsumerID
wid
, SQL
" AND name =" SQL -> Text -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> RawSQL () -> Text
unRawSQL RawSQL ()
ccJobsTable
]
where
ts :: TransactionSettings
ts =
TransactionSettings
defaultTransactionSettings
{ tsRestartPredicate = Just . RestartPredicate $
\DetailedQueryError
e Integer
_ -> DetailedQueryError -> ErrorCode
qeErrorCode DetailedQueryError
e ErrorCode -> ErrorCode -> Bool
forall a. Eq a => a -> a -> Bool
== ErrorCode
DeadlockDetected
}