module Database.PostgreSQL.Consumers.Components
  ( runConsumer
  , runConsumerWithIdleSignal
  , spawnListener
  , spawnMonitor
  , spawnDispatcher
  ) where

import Control.Concurrent.Lifted
import Control.Concurrent.STM hiding (atomically)
import Control.Concurrent.STM qualified as STM
import Control.Concurrent.Thread.Lifted qualified as T
import Control.Exception (AsyncException (ThreadKilled))
import Control.Exception.Safe qualified as ES
import Control.Monad
import Control.Monad.Base
import Control.Monad.Catch
import Control.Monad.Time
import Control.Monad.Trans
import Control.Monad.Trans.Control
import Data.Foldable qualified as F
import Data.Function
import Data.Int
import Data.Map.Strict qualified as M
import Data.Monoid.Utils
import Database.PostgreSQL.Consumers.Config
import Database.PostgreSQL.Consumers.Consumer
import Database.PostgreSQL.Consumers.Utils
import Database.PostgreSQL.PQTypes
import Log

-- | Run the consumer. The purpose of the returned monadic action is to wait for
-- currently processed jobs and clean up. This function is best used in
-- conjunction with 'finalize' to seamlessly handle the finalization.
--
-- If you want to add metrics, see the
-- [@consumers-metrics-prometheus@](https://hackage.haskell.org/package/consumers-metrics-prometheus)
-- package to seamlessly instrument your consumer.
runConsumer
  :: ( MonadBaseControl IO m
     , MonadLog m
     , MonadMask m
     , MonadTime m
     , Eq idx
     , Show idx
     , FromSQL idx
     , ToSQL idx
     )
  => ConsumerConfig m idx job
  -- ^ The consumer.
  -> ConnectionSourceM m
  -> m (m ())
runConsumer :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job -> ConnectionSourceM m -> m (m ())
runConsumer ConsumerConfig m idx job
cc ConnectionSourceM m
cs = ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
runConsumerWithMaybeIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs Maybe (TMVar Bool)
forall a. Maybe a
Nothing

runConsumerWithIdleSignal
  :: ( MonadBaseControl IO m
     , MonadLog m
     , MonadMask m
     , MonadTime m
     , Eq idx
     , Show idx
     , FromSQL idx
     , ToSQL idx
     )
  => ConsumerConfig m idx job
  -- ^ The consumer.
  -> ConnectionSourceM m
  -> TMVar Bool
  -> m (m ())
runConsumerWithIdleSignal :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> TMVar Bool -> m (m ())
runConsumerWithIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs TMVar Bool
idleSignal = ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
runConsumerWithMaybeIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs (TMVar Bool -> Maybe (TMVar Bool)
forall a. a -> Maybe a
Just TMVar Bool
idleSignal)

-- | Run the consumer and also signal whenever the consumer is waiting for
-- getNotification or threadDelay.
runConsumerWithMaybeIdleSignal
  :: ( MonadBaseControl IO m
     , MonadLog m
     , MonadMask m
     , MonadTime m
     , Eq idx
     , Show idx
     , FromSQL idx
     , ToSQL idx
     )
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> Maybe (TMVar Bool)
  -> m (m ())
runConsumerWithMaybeIdleSignal :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
runConsumerWithMaybeIdleSignal ConsumerConfig m idx job
cc0 ConnectionSourceM m
cs Maybe (TMVar Bool)
mIdleSignal
  | ConsumerConfig m idx job -> Int
forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccMaxRunningJobs ConsumerConfig m idx job
cc Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
1 = do
      Text -> m ()
forall (m :: * -> *). MonadLog m => Text -> m ()
logInfo_ Text
"ccMaxRunningJobs < 1, not starting the consumer"
      m () -> m (m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (m () -> m (m ())) -> m () -> m (m ())
forall a b. (a -> b) -> a -> b
$ () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
  | Bool
otherwise = do
      MVar ()
semaphore <- () -> m (MVar ())
forall (m :: * -> *) a. MonadBase IO m => a -> m (MVar a)
newMVar ()
      TVar (Map ThreadId idx)
runningJobsInfo <- IO (TVar (Map ThreadId idx)) -> m (TVar (Map ThreadId idx))
forall α. IO α -> m α
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO (TVar (Map ThreadId idx)) -> m (TVar (Map ThreadId idx)))
-> IO (TVar (Map ThreadId idx)) -> m (TVar (Map ThreadId idx))
forall a b. (a -> b) -> a -> b
$ Map ThreadId idx -> IO (TVar (Map ThreadId idx))
forall a. a -> IO (TVar a)
newTVarIO Map ThreadId idx
forall k a. Map k a
M.empty
      TVar Int
runningJobs <- IO (TVar Int) -> m (TVar Int)
forall α. IO α -> m α
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO (TVar Int) -> m (TVar Int)) -> IO (TVar Int) -> m (TVar Int)
forall a b. (a -> b) -> a -> b
$ Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO Int
0

      Either DBException ()
skipLockedTest :: Either DBException () <-
        m () -> m (Either DBException ())
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (m () -> m (Either DBException ()))
-> (DBT m () -> m ()) -> DBT m () -> m (Either DBException ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionSourceM m -> TransactionSettings -> DBT m () -> m ()
forall (m :: * -> *) a.
(HasCallStack, MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
defaultTransactionSettings (DBT m () -> m (Either DBException ()))
-> DBT m () -> m (Either DBException ())
forall a b. (a -> b) -> a -> b
$
          SQL -> DBT m ()
forall (m :: * -> *). (HasCallStack, MonadDB m) => SQL -> m ()
runSQL_ SQL
"SELECT TRUE FOR UPDATE SKIP LOCKED"
      -- If we can't lock rows using 'skip locked' throw an exception
      (DBException -> m ())
-> (() -> m ()) -> Either DBException () -> m ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (m () -> DBException -> m ()
forall a b. a -> b -> a
const (m () -> DBException -> m ()) -> m () -> DBException -> m ()
forall a b. (a -> b) -> a -> b
$ String -> m ()
forall a. HasCallStack => String -> a
error String
"PostgreSQL version with support for SKIP LOCKED is required") () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Either DBException ()
skipLockedTest

      ConsumerID
cid <- ConsumerConfig m idx job -> ConnectionSourceM m -> m ConsumerID
forall (m :: * -> *) (n :: * -> *) idx job.
(MonadBase IO m, MonadMask m, MonadTime m) =>
ConsumerConfig n idx job -> ConnectionSourceM m -> m ConsumerID
registerConsumer ConsumerConfig m idx job
cc ConnectionSourceM m
cs
      [Pair] -> m (m ()) -> m (m ())
forall a. [Pair] -> m a -> m a
forall (m :: * -> *) a. MonadLog m => [Pair] -> m a -> m a
localData [Key
"consumer_id" Key -> String -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= ConsumerID -> String
forall a. Show a => a -> String
show ConsumerID
cid] (m (m ()) -> m (m ())) -> m (m ()) -> m (m ())
forall a b. (a -> b) -> a -> b
$ do
        ThreadId
listener <- ConsumerConfig m idx job
-> ConnectionSourceM m -> MVar () -> m ThreadId
forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadMask m) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> MVar () -> m ThreadId
spawnListener ConsumerConfig m idx job
cc ConnectionSourceM m
cs MVar ()
semaphore
        ThreadId
monitor <- Text -> m ThreadId -> m ThreadId
forall a. Text -> m a -> m a
forall (m :: * -> *) a. MonadLog m => Text -> m a -> m a
localDomain Text
"monitor" (m ThreadId -> m ThreadId) -> m ThreadId -> m ThreadId
forall a b. (a -> b) -> a -> b
$ ConsumerConfig m idx job
-> ConnectionSourceM m -> ConsumerID -> m ThreadId
forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> ConsumerID -> m ThreadId
spawnMonitor ConsumerConfig m idx job
cc ConnectionSourceM m
cs ConsumerID
cid
        ThreadId
dispatcher <-
          Text -> m ThreadId -> m ThreadId
forall a. Text -> m a -> m a
forall (m :: * -> *) a. MonadLog m => Text -> m a -> m a
localDomain Text
"dispatcher" (m ThreadId -> m ThreadId) -> m ThreadId -> m ThreadId
forall a b. (a -> b) -> a -> b
$
            ConsumerConfig m idx job
-> ConnectionSourceM m
-> ConsumerID
-> MVar ()
-> TVar (Map ThreadId idx)
-> TVar Int
-> Maybe (TMVar Bool)
-> m ThreadId
forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Show idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m
-> ConsumerID
-> MVar ()
-> TVar (Map ThreadId idx)
-> TVar Int
-> Maybe (TMVar Bool)
-> m ThreadId
spawnDispatcher
              ConsumerConfig m idx job
cc
              ConnectionSourceM m
cs
              ConsumerID
cid
              MVar ()
semaphore
              TVar (Map ThreadId idx)
runningJobsInfo
              TVar Int
runningJobs
              Maybe (TMVar Bool)
mIdleSignal
        m () -> m (m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (m () -> m (m ())) -> (m () -> m ()) -> m () -> m (m ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> m () -> m ()
forall a. Text -> m a -> m a
forall (m :: * -> *) a. MonadLog m => Text -> m a -> m a
localDomain Text
"finalizer" (m () -> m (m ())) -> m () -> m (m ())
forall a b. (a -> b) -> a -> b
$ do
          ThreadId -> m ()
forall (m :: * -> *). MonadBase IO m => ThreadId -> m ()
stopExecution ThreadId
listener
          ThreadId -> m ()
forall (m :: * -> *). MonadBase IO m => ThreadId -> m ()
stopExecution ThreadId
dispatcher
          TVar (Map ThreadId idx) -> TVar Int -> m ()
forall {m :: * -> *} {a} {k} {a}.
(MonadLog m, MonadBase IO m, Num a, Eq a, Eq k, Eq a, Show a) =>
TVar (Map k a) -> TVar a -> m ()
waitForRunningJobs TVar (Map ThreadId idx)
runningJobsInfo TVar Int
runningJobs
          ThreadId -> m ()
forall (m :: * -> *). MonadBase IO m => ThreadId -> m ()
stopExecution ThreadId
monitor
          ConsumerConfig m idx job
-> ConnectionSourceM m -> ConsumerID -> m ()
forall (m :: * -> *) (n :: * -> *) idx job.
(MonadBase IO m, MonadMask m) =>
ConsumerConfig n idx job
-> ConnectionSourceM m -> ConsumerID -> m ()
unregisterConsumer ConsumerConfig m idx job
cc ConnectionSourceM m
cs ConsumerID
cid
  where
    cc :: ConsumerConfig m idx job
cc =
      ConsumerConfig m idx job
cc0
        { ccOnException = \SomeException
ex job
job -> [Pair] -> m Action -> m Action
forall a. [Pair] -> m a -> m a
forall (m :: * -> *) a. MonadLog m => [Pair] -> m a -> m a
localData (ConsumerConfig m idx job -> job -> [Pair]
forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> [Pair]
ccJobLogData ConsumerConfig m idx job
cc0 job
job) (m Action -> m Action) -> m Action -> m Action
forall a b. (a -> b) -> a -> b
$ do
            let doOnException :: m Action
doOnException = do
                  Action
action <- ConsumerConfig m idx job -> SomeException -> job -> m Action
forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> SomeException -> job -> m Action
ccOnException ConsumerConfig m idx job
cc0 SomeException
ex job
job
                  Text -> Value -> m ()
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Unexpected exception caught while processing job" (Value -> m ()) -> Value -> m ()
forall a b. (a -> b) -> a -> b
$
                    [Pair] -> Value
object
                      [ Key
"exception" Key -> String -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= SomeException -> String
forall a. Show a => a -> String
show SomeException
ex
                      , Key
"action" Key -> String -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= Action -> String
forall a. Show a => a -> String
show Action
action
                      ]
                  Action -> m Action
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Action
action
            -- Let asynchronous exceptions through (StopExecution in particular).
            m Action
doOnException m Action -> (SomeException -> m Action) -> m Action
forall (m :: * -> *) a.
(HasCallStack, MonadCatch m) =>
m a -> (SomeException -> m a) -> m a
`ES.catchAny` \SomeException
handlerEx -> do
              -- Arbitrary delay, but better than letting exceptions from the
              -- handler through and potentially crashlooping the consumer:
              --
              -- 1. A job J fails with an exception, ccOnException is called and
              -- it throws an exception.
              --
              -- 2. The consumer goes down, J is now stuck.
              --
              -- 3. The consumer is restarted, it tries to clean up stuck jobs
              -- (which include J), the cleanup code calls ccOnException on J
              -- and if it throws again, we're back to (2).
              let action :: Action
action = Interval -> Action
RerunAfter (Interval -> Action) -> Interval -> Action
forall a b. (a -> b) -> a -> b
$ Int32 -> Interval
idays Int32
1
              Text -> Value -> m ()
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logAttention Text
"ccOnException threw an exception" (Value -> m ()) -> Value -> m ()
forall a b. (a -> b) -> a -> b
$
                [Pair] -> Value
object
                  [ Key
"exception" Key -> String -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= SomeException -> String
forall a. Show a => a -> String
show SomeException
handlerEx
                  , Key
"action" Key -> String -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= Action -> String
forall a. Show a => a -> String
show Action
action
                  ]
              Action -> m Action
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Action
action
        }

    waitForRunningJobs :: TVar (Map k a) -> TVar a -> m ()
waitForRunningJobs TVar (Map k a)
runningJobsInfo TVar a
runningJobs = do
      Map k a
initialJobs <- IO (Map k a) -> m (Map k a)
forall α. IO α -> m α
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO (Map k a) -> m (Map k a)) -> IO (Map k a) -> m (Map k a)
forall a b. (a -> b) -> a -> b
$ TVar (Map k a) -> IO (Map k a)
forall a. TVar a -> IO a
readTVarIO TVar (Map k a)
runningJobsInfo
      (((Map k a -> m ()) -> Map k a -> m ()) -> Map k a -> m ()
forall a. (a -> a) -> a
`fix` Map k a
initialJobs) (((Map k a -> m ()) -> Map k a -> m ()) -> m ())
-> ((Map k a -> m ()) -> Map k a -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Map k a -> m ()
loop Map k a
jobsInfo -> do
        -- If jobs are still running, display info about them.
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Map k a -> Bool
forall k a. Map k a -> Bool
M.null Map k a
jobsInfo) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
          Text -> Value -> m ()
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Waiting for running jobs" (Value -> m ()) -> Value -> m ()
forall a b. (a -> b) -> a -> b
$
            [Pair] -> Value
object
              [ Key
"job_ids" Key -> [String] -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= Map k a -> [String]
forall {k}. Map k a -> [String]
showJobsInfo Map k a
jobsInfo
              ]
        m (m ()) -> m ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m ()) -> m ())
-> (STM (m ()) -> m (m ())) -> STM (m ()) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (m ()) -> m (m ())
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically (STM (m ()) -> m ()) -> STM (m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ do
          a
jobs <- TVar a -> STM a
forall a. TVar a -> STM a
readTVar TVar a
runningJobs
          if a
jobs a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
0
            then m () -> STM (m ())
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (m () -> STM (m ())) -> m () -> STM (m ())
forall a b. (a -> b) -> a -> b
$ () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            else do
              Map k a
newJobsInfo <- TVar (Map k a) -> STM (Map k a)
forall a. TVar a -> STM a
readTVar TVar (Map k a)
runningJobsInfo
              -- If jobs info didn't change, wait for it to change.  Otherwise
              -- loop so it either displays the new info or exits if there are
              -- no jobs running anymore.
              if Map k a
newJobsInfo Map k a -> Map k a -> Bool
forall a. Eq a => a -> a -> Bool
== Map k a
jobsInfo
                then STM (m ())
forall a. STM a
retry
                else m () -> STM (m ())
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (m () -> STM (m ())) -> m () -> STM (m ())
forall a b. (a -> b) -> a -> b
$ Map k a -> m ()
loop Map k a
newJobsInfo
      where
        showJobsInfo :: Map k a -> [String]
showJobsInfo = (a -> [String] -> [String]) -> [String] -> Map k a -> [String]
forall a b k. (a -> b -> b) -> b -> Map k a -> b
M.foldr (\a
idx [String]
acc -> a -> String
forall a. Show a => a -> String
show a
idx String -> [String] -> [String]
forall a. a -> [a] -> [a]
: [String]
acc) []

-- | Spawn a thread that generates signals for the dispatcher to probe the
-- database for incoming jobs.
spawnListener
  :: (MonadBaseControl IO m, MonadMask m)
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> MVar ()
  -> m ThreadId
spawnListener :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadMask m) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> MVar () -> m ThreadId
spawnListener ConsumerConfig m idx job
cc ConnectionSourceM m
cs MVar ()
semaphore =
  String -> m () -> m ThreadId
forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"listener" (m () -> m ThreadId) -> m () -> m ThreadId
forall a b. (a -> b) -> a -> b
$
    case ConsumerConfig m idx job -> Maybe Channel
forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> Maybe Channel
ccNotificationChannel ConsumerConfig m idx job
cc of
      Just Channel
chan ->
        ConnectionSourceM m -> TransactionSettings -> DBT m () -> m ()
forall (m :: * -> *) a.
(HasCallStack, MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
noTs
          (DBT m () -> m ())
-> (DBT_ m m Bool -> DBT m ()) -> DBT_ m m Bool -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DBT m () -> DBT m () -> DBT m () -> DBT m ()
forall (m :: * -> *) a c b.
(HasCallStack, MonadMask m) =>
m a -> m c -> m b -> m b
bracket_ (Channel -> DBT m ()
forall (m :: * -> *). (HasCallStack, MonadDB m) => Channel -> m ()
listen Channel
chan) (Channel -> DBT m ()
forall (m :: * -> *). (HasCallStack, MonadDB m) => Channel -> m ()
unlisten Channel
chan)
          (DBT m () -> DBT m ())
-> (DBT_ m m Bool -> DBT m ()) -> DBT_ m m Bool -> DBT m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DBT_ m m Bool -> DBT m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever
          (DBT_ m m Bool -> m ()) -> DBT_ m m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ do
            -- If there are many notifications, we need to collect them as soon
            -- as possible, because they are stored in memory by libpq. They are
            -- also not squashed, so we perform the squashing ourselves with the
            -- help of MVar ().
            DBT_ m m (Maybe Notification) -> DBT m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (DBT_ m m (Maybe Notification) -> DBT m ())
-> (Int -> DBT_ m m (Maybe Notification)) -> Int -> DBT m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> DBT_ m m (Maybe Notification)
forall (m :: * -> *). MonadDB m => Int -> m (Maybe Notification)
getNotification (Int -> DBT m ()) -> Int -> DBT m ()
forall a b. (a -> b) -> a -> b
$ ConsumerConfig m idx job -> Int
forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccNotificationTimeout ConsumerConfig m idx job
cc
            m Bool -> DBT_ m m Bool
forall (m :: * -> *) a. Monad m => m a -> DBT_ m m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift m Bool
signalDispatcher
      Maybe Channel
Nothing -> m Bool -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ do
        IO () -> m ()
forall α. IO α -> m α
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO () -> m ()) -> (Int -> IO ()) -> Int -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO ()
forall (m :: * -> *). MonadBase IO m => Int -> m ()
threadDelay (Int -> m ()) -> Int -> m ()
forall a b. (a -> b) -> a -> b
$ ConsumerConfig m idx job -> Int
forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccNotificationTimeout ConsumerConfig m idx job
cc
        m Bool
signalDispatcher
  where
    signalDispatcher :: m Bool
signalDispatcher = do
      IO Bool -> m Bool
forall α. IO α -> m α
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ MVar () -> () -> IO Bool
forall (m :: * -> *) a. MonadBase IO m => MVar a -> a -> m Bool
tryPutMVar MVar ()
semaphore ()

    noTs :: TransactionSettings
noTs =
      TransactionSettings
defaultTransactionSettings
        { tsAutoTransaction = False
        }

-- | Spawn a thread that monitors working consumers for activity and
-- periodically updates its own.
spawnMonitor
  :: forall m idx job
   . ( MonadBaseControl IO m
     , MonadLog m
     , MonadMask m
     , MonadTime m
     , Show idx
     , FromSQL idx
     , ToSQL idx
     )
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> ConsumerID
  -> m ThreadId
spawnMonitor :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> ConsumerID -> m ThreadId
spawnMonitor ConsumerConfig {Int
[SQL]
Maybe Channel
RawSQL ()
job -> idx
job -> m Result
job -> [Pair]
row -> job
SomeException -> job -> m Action
ccMaxRunningJobs :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccOnException :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> SomeException -> job -> m Action
ccJobLogData :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> [Pair]
ccNotificationChannel :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> Maybe Channel
ccNotificationTimeout :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccJobsTable :: RawSQL ()
ccConsumersTable :: RawSQL ()
ccJobSelectors :: [SQL]
ccJobFetcher :: row -> job
ccJobIndex :: job -> idx
ccNotificationChannel :: Maybe Channel
ccNotificationTimeout :: Int
ccMaxRunningJobs :: Int
ccProcessJob :: job -> m Result
ccOnException :: SomeException -> job -> m Action
ccJobLogData :: job -> [Pair]
ccJobsTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccConsumersTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccJobSelectors :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> [SQL]
ccJobFetcher :: ()
ccJobIndex :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> idx
ccProcessJob :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> m Result
..} ConnectionSourceM m
cs ConsumerID
cid = String -> m () -> m ThreadId
forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"monitor" (m () -> m ThreadId) -> (m () -> m ()) -> m () -> m ThreadId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ThreadId) -> m () -> m ThreadId
forall a b. (a -> b) -> a -> b
$ do
  ConnectionSourceM m -> TransactionSettings -> DBT m () -> m ()
forall (m :: * -> *) a.
(HasCallStack, MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts (DBT m () -> m ()) -> DBT m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    UTCTime
now <- DBT_ m m UTCTime
forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
    -- Update last_activity of the consumer.
    Bool
ok <-
      QueryName -> SQL -> DBT_ m m Bool
forall (m :: * -> *).
(HasCallStack, MonadDB m, MonadThrow m) =>
QueryName -> SQL -> m Bool
runPreparedSQL01 (Text -> RawSQL () -> QueryName
preparedSqlName Text
"setActivity" RawSQL ()
ccConsumersTable) (SQL -> DBT_ m m Bool) -> SQL -> DBT_ m m Bool
forall a b. (a -> b) -> a -> b
$
        [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat
          [ SQL
"UPDATE" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccConsumersTable
          , SQL
"SET last_activity = " SQL -> UTCTime -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
          , SQL
"WHERE id =" SQL -> ConsumerID -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> ConsumerID
cid
          , SQL
"  AND name =" SQL -> Text -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> RawSQL () -> Text
unRawSQL RawSQL ()
ccJobsTable
          ]
    if Bool
ok
      then Text -> DBT m ()
forall (m :: * -> *). MonadLog m => Text -> m ()
logInfo_ Text
"Activity of the consumer updated"
      else do
        Text -> DBT m ()
forall (m :: * -> *). MonadLog m => Text -> m ()
logInfo_ Text
"Consumer is not registered"
        AsyncException -> DBT m ()
forall e a. (HasCallStack, Exception e) => e -> DBT_ m m a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM AsyncException
ThreadKilled
  (Int
inactiveConsumers, [idx]
freedJobs) <- ConnectionSourceM m
-> TransactionSettings -> DBT m (Int, [idx]) -> m (Int, [idx])
forall (m :: * -> *) a.
(HasCallStack, MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts (DBT m (Int, [idx]) -> m (Int, [idx]))
-> DBT m (Int, [idx]) -> m (Int, [idx])
forall a b. (a -> b) -> a -> b
$ do
    UTCTime
now <- DBT_ m m UTCTime
forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
    -- Reserve all inactive (assumed dead) consumers and get their ids. We don't
    -- delete them here, because if the coresponding reserved_by column in the
    -- jobs table has an IMMEDIATE foreign key with the ON DELETE SET NULL
    -- property, we will not be able to determine stuck jobs in the next step.
    QueryName -> SQL -> DBT m ()
forall (m :: * -> *).
(HasCallStack, MonadDB m) =>
QueryName -> SQL -> m ()
runPreparedSQL_ (Text -> RawSQL () -> QueryName
preparedSqlName Text
"reserveConsumers" RawSQL ()
ccConsumersTable) (SQL -> DBT m ()) -> SQL -> DBT m ()
forall a b. (a -> b) -> a -> b
$
      [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat
        [ SQL
"SELECT id::bigint"
        , SQL
"FROM" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccConsumersTable
        , SQL
"WHERE last_activity +" SQL -> Interval -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> Int32 -> Interval
iminutes Int32
1 SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
"<= " SQL -> UTCTime -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
        , SQL
"  AND name =" SQL -> Text -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> RawSQL () -> Text
unRawSQL RawSQL ()
ccJobsTable
        , SQL
"FOR UPDATE SKIP LOCKED"
        ]
    (Identity Int64 -> Int64) -> DBT_ m m [Int64]
forall (m :: * -> *) row t.
(HasCallStack, MonadDB m, FromRow row) =>
(row -> t) -> m [t]
fetchMany (forall a. Identity a -> a
runIdentity @Int64) DBT_ m m [Int64]
-> ([Int64] -> DBT m (Int, [idx])) -> DBT m (Int, [idx])
forall a b. DBT_ m m a -> (a -> DBT_ m m b) -> DBT_ m m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      [] -> (Int, [idx]) -> DBT m (Int, [idx])
forall a. a -> DBT_ m m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
0, [])
      [Int64]
inactive -> do
        -- Fetch all stuck jobs and run ccOnException on them to determine
        -- actions. This is necessary e.g. to be able to apply exponential
        -- backoff to them correctly.
        QueryName -> SQL -> DBT m ()
forall (m :: * -> *).
(HasCallStack, MonadDB m) =>
QueryName -> SQL -> m ()
runPreparedSQL_ (Text -> RawSQL () -> QueryName
preparedSqlName Text
"findStuck" RawSQL ()
ccJobsTable) (SQL -> DBT m ()) -> SQL -> DBT m ()
forall a b. (a -> b) -> a -> b
$
          [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat
            [ SQL
"SELECT" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL -> [SQL] -> SQL
forall m. Monoid m => m -> [m] -> m
mintercalate SQL
", " [SQL]
ccJobSelectors
            , SQL
"FROM" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable
            , SQL
"WHERE reserved_by = ANY(" SQL -> Array1 Int64 -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> [Int64] -> Array1 Int64
forall a. [a] -> Array1 a
Array1 [Int64]
inactive SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
            , SQL
"FOR UPDATE SKIP LOCKED"
            ]
        [job]
stuckJobs <- (row -> job) -> DBT_ m m [job]
forall (m :: * -> *) row t.
(HasCallStack, MonadDB m, FromRow row) =>
(row -> t) -> m [t]
fetchMany row -> job
ccJobFetcher
        Bool -> DBT m () -> DBT m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([job] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [job]
stuckJobs) (DBT m () -> DBT m ()) -> DBT m () -> DBT m ()
forall a b. (a -> b) -> a -> b
$ do
          [(idx, Result)]
results <- [job]
-> (job -> DBT_ m m (idx, Result)) -> DBT_ m m [(idx, Result)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [job]
stuckJobs ((job -> DBT_ m m (idx, Result)) -> DBT_ m m [(idx, Result)])
-> (job -> DBT_ m m (idx, Result)) -> DBT_ m m [(idx, Result)]
forall a b. (a -> b) -> a -> b
$ \job
job -> do
            Action
action <- m Action -> DBT_ m m Action
forall (m :: * -> *) a. Monad m => m a -> DBT_ m m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m Action -> DBT_ m m Action) -> m Action -> DBT_ m m Action
forall a b. (a -> b) -> a -> b
$ SomeException -> job -> m Action
ccOnException (AsyncException -> SomeException
forall e. Exception e => e -> SomeException
toException AsyncException
ThreadKilled) job
job
            (idx, Result) -> DBT_ m m (idx, Result)
forall a. a -> DBT_ m m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (job -> idx
ccJobIndex job
job, Action -> Result
Failed Action
action)
          SQL -> DBT m ()
forall (m :: * -> *). (HasCallStack, MonadDB m) => SQL -> m ()
runSQL_ (SQL -> DBT m ()) -> SQL -> DBT m ()
forall a b. (a -> b) -> a -> b
$ RawSQL () -> [(idx, Result)] -> UTCTime -> SQL
forall idx.
(Show idx, ToSQL idx) =>
RawSQL () -> [(idx, Result)] -> UTCTime -> SQL
updateJobsQuery RawSQL ()
ccJobsTable [(idx, Result)]
results UTCTime
now
        QueryName -> SQL -> DBT m ()
forall (m :: * -> *).
(HasCallStack, MonadDB m) =>
QueryName -> SQL -> m ()
runPreparedSQL_ (Text -> RawSQL () -> QueryName
preparedSqlName Text
"removeInactive" RawSQL ()
ccConsumersTable) (SQL -> DBT m ()) -> SQL -> DBT m ()
forall a b. (a -> b) -> a -> b
$
          [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat
            [ SQL
"DELETE FROM" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccConsumersTable
            , SQL
"WHERE id = ANY(" SQL -> Array1 Int64 -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> [Int64] -> Array1 Int64
forall a. [a] -> Array1 a
Array1 [Int64]
inactive SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
            ]
        (Int, [idx]) -> DBT m (Int, [idx])
forall a. a -> DBT_ m m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([Int64] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int64]
inactive, (job -> idx) -> [job] -> [idx]
forall a b. (a -> b) -> [a] -> [b]
map job -> idx
ccJobIndex [job]
stuckJobs)
  Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
inactiveConsumers Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    Text -> Value -> m ()
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Unregistered inactive consumers" (Value -> m ()) -> Value -> m ()
forall a b. (a -> b) -> a -> b
$
      [Pair] -> Value
object
        [ Key
"inactive_consumers" Key -> Int -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= Int
inactiveConsumers
        ]
  Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([idx] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [idx]
freedJobs) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    Text -> Value -> m ()
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Freed locked jobs" (Value -> m ()) -> Value -> m ()
forall a b. (a -> b) -> a -> b
$
      [Pair] -> Value
object
        [ Key
"freed_jobs" Key -> [String] -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= (idx -> String) -> [idx] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map idx -> String
forall a. Show a => a -> String
show [idx]
freedJobs
        ]
  IO () -> m ()
forall α. IO α -> m α
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO () -> m ()) -> (Int -> IO ()) -> Int -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO ()
forall (m :: * -> *). MonadBase IO m => Int -> m ()
threadDelay (Int -> m ()) -> Int -> m ()
forall a b. (a -> b) -> a -> b
$ Int
30 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000000 -- wait 30 seconds

-- | Spawn a thread that reserves and processes jobs.
spawnDispatcher
  :: forall m idx job
   . ( MonadBaseControl IO m
     , MonadLog m
     , MonadMask m
     , MonadTime m
     , Show idx
     , ToSQL idx
     )
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> ConsumerID
  -> MVar ()
  -> TVar (M.Map ThreadId idx)
  -> TVar Int
  -> Maybe (TMVar Bool)
  -> m ThreadId
spawnDispatcher :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Show idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m
-> ConsumerID
-> MVar ()
-> TVar (Map ThreadId idx)
-> TVar Int
-> Maybe (TMVar Bool)
-> m ThreadId
spawnDispatcher ConsumerConfig {Int
[SQL]
Maybe Channel
RawSQL ()
job -> idx
job -> m Result
job -> [Pair]
row -> job
SomeException -> job -> m Action
ccMaxRunningJobs :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccOnException :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> SomeException -> job -> m Action
ccJobLogData :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> [Pair]
ccNotificationChannel :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> Maybe Channel
ccNotificationTimeout :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccJobsTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccConsumersTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccJobSelectors :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> [SQL]
ccJobFetcher :: ()
ccJobIndex :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> idx
ccProcessJob :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> m Result
ccJobsTable :: RawSQL ()
ccConsumersTable :: RawSQL ()
ccJobSelectors :: [SQL]
ccJobFetcher :: row -> job
ccJobIndex :: job -> idx
ccNotificationChannel :: Maybe Channel
ccNotificationTimeout :: Int
ccMaxRunningJobs :: Int
ccProcessJob :: job -> m Result
ccOnException :: SomeException -> job -> m Action
ccJobLogData :: job -> [Pair]
..} ConnectionSourceM m
cs ConsumerID
cid MVar ()
semaphore TVar (Map ThreadId idx)
runningJobsInfo TVar Int
runningJobs Maybe (TMVar Bool)
mIdleSignal =
  String -> m () -> m ThreadId
forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"dispatcher" (m () -> m ThreadId) -> (m () -> m ()) -> m () -> m ThreadId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ThreadId) -> m () -> m ThreadId
forall a b. (a -> b) -> a -> b
$ do
    m () -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ MVar () -> m ()
forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
takeMVar MVar ()
semaphore
    Bool
someJobWasProcessed <- Int -> m Bool
loop Int
1
    if Bool
someJobWasProcessed
      then Bool -> m ()
forall (m' :: * -> *). MonadBaseControl IO m' => Bool -> m' ()
setIdle Bool
False
      else Bool -> m ()
forall (m' :: * -> *). MonadBaseControl IO m' => Bool -> m' ()
setIdle Bool
True
  where
    setIdle :: forall m'. MonadBaseControl IO m' => Bool -> m' ()
    setIdle :: forall (m' :: * -> *). MonadBaseControl IO m' => Bool -> m' ()
setIdle Bool
isIdle = case Maybe (TMVar Bool)
mIdleSignal of
      Maybe (TMVar Bool)
Nothing -> () -> m' ()
forall a. a -> m' a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
      Just TMVar Bool
idleSignal -> STM () -> m' ()
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically (STM () -> m' ()) -> STM () -> m' ()
forall a b. (a -> b) -> a -> b
$ do
        Maybe Bool
_ <- TMVar Bool -> STM (Maybe Bool)
forall a. TMVar a -> STM (Maybe a)
tryTakeTMVar TMVar Bool
idleSignal
        TMVar Bool -> Bool -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar Bool
idleSignal Bool
isIdle

    loop :: Int -> m Bool
    loop :: Int -> m Bool
loop Int
limit = do
      ([job]
batch, Int
batchSize) <- Int -> m ([job], Int)
reserveJobs Int
limit
      Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
batchSize Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
        Text -> Value -> m ()
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Processing batch" (Value -> m ()) -> Value -> m ()
forall a b. (a -> b) -> a -> b
$
          [Pair] -> Value
object
            [ Key
"batch_size" Key -> Int -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= Int
batchSize
            ]
        -- Update runningJobs before forking so that we can adjust
        -- maxBatchSize appropriately later. We also need to mask asynchronous
        -- exceptions here as we rely on correct value of runningJobs to
        -- perform graceful termination.
        ((forall a. m a -> m a) -> m ()) -> m ()
forall b. HasCallStack => ((forall a. m a -> m a) -> m b) -> m b
forall (m :: * -> *) b.
(MonadMask m, HasCallStack) =>
((forall a. m a -> m a) -> m b) -> m b
mask (((forall a. m a -> m a) -> m ()) -> m ())
-> ((forall a. m a -> m a) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> m a
restore -> do
          STM () -> m ()
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
runningJobs (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
batchSize)
          let subtractJobs :: m ()
subtractJobs = STM () -> m ()
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
runningJobs (Int -> Int -> Int
forall a. Num a => a -> a -> a
subtract Int
batchSize)
          m ThreadId -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
            (m ThreadId -> m ()) -> (m () -> m ThreadId) -> m () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> m () -> m ThreadId
forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"batch processor"
            (m () -> m ThreadId) -> (m () -> m ()) -> m () -> m ThreadId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (m () -> m () -> m ()
forall (m :: * -> *) a b.
(HasCallStack, MonadMask m) =>
m a -> m b -> m a
`finally` m ()
subtractJobs)
            (m () -> m ()) -> (m () -> m ()) -> m () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m () -> m ()
forall a. m a -> m a
restore
            (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
              (job -> m (job, m (Result Result)))
-> [job] -> m [(job, m (Result Result))]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM job -> m (job, m (Result Result))
startJob [job]
batch m [(job, m (Result Result))]
-> ([(job, m (Result Result))] -> m [(idx, Result)])
-> m [(idx, Result)]
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ((job, m (Result Result)) -> m (idx, Result))
-> [(job, m (Result Result))] -> m [(idx, Result)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (job, m (Result Result)) -> m (idx, Result)
joinJob m [(idx, Result)] -> ([(idx, Result)] -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= [(idx, Result)] -> m ()
updateJobs

        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
batchSize Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
limit) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
          Int
maxBatchSize <- STM Int -> m Int
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically (STM Int -> m Int) -> STM Int -> m Int
forall a b. (a -> b) -> a -> b
$ do
            Int
jobs <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
runningJobs
            Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
jobs Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
ccMaxRunningJobs) STM ()
forall a. STM a
retry
            Int -> STM Int
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> STM Int) -> Int -> STM Int
forall a b. (a -> b) -> a -> b
$ Int
ccMaxRunningJobs Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
jobs
          m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> (Int -> m Bool) -> Int -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> m Bool
loop (Int -> m ()) -> Int -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> Int -> Int
forall a. Ord a => a -> a -> a
min Int
maxBatchSize (Int
2 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
limit)

      Bool -> m Bool
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
batchSize Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0)

    reserveJobs :: Int -> m ([job], Int)
    reserveJobs :: Int -> m ([job], Int)
reserveJobs Int
limit = ConnectionSourceM m
-> TransactionSettings -> DBT m ([job], Int) -> m ([job], Int)
forall (m :: * -> *) a.
(HasCallStack, MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts (DBT m ([job], Int) -> m ([job], Int))
-> DBT m ([job], Int) -> m ([job], Int)
forall a b. (a -> b) -> a -> b
$ do
      UTCTime
now <- DBT_ m m UTCTime
forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
      Int
n <-
        QueryName -> SQL -> DBT_ m m Int
forall (m :: * -> *).
(HasCallStack, MonadDB m) =>
QueryName -> SQL -> m Int
runPreparedSQL (Text -> RawSQL () -> QueryName
preparedSqlName Text
"setReservation" RawSQL ()
ccJobsTable) (SQL -> DBT_ m m Int) -> SQL -> DBT_ m m Int
forall a b. (a -> b) -> a -> b
$
          [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat
            [ SQL
"UPDATE" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
"SET"
            , SQL
"  reserved_by =" SQL -> ConsumerID -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> ConsumerID
cid
            , SQL
", attempts = CASE"
            , SQL
"    WHEN finished_at IS NULL THEN attempts + 1"
            , SQL
"    ELSE 1"
            , SQL
"  END"
            , SQL
"WHERE id IN (" SQL -> SQL -> SQL
forall a. Semigroup a => a -> a -> a
<> UTCTime -> SQL
reservedJobs UTCTime
now SQL -> SQL -> SQL
forall a. Semigroup a => a -> a -> a
<> SQL
")"
            , SQL
"RETURNING" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL -> [SQL] -> SQL
forall m. Monoid m => m -> [m] -> m
mintercalate SQL
", " [SQL]
ccJobSelectors
            ]
      -- Decode lazily as we want the transaction to be as short as possible.
      (,Int
n) ([job] -> ([job], Int))
-> (QueryResult row -> [job]) -> QueryResult row -> ([job], Int)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. QueryResult job -> [job]
forall a. QueryResult a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
F.toList (QueryResult job -> [job])
-> (QueryResult row -> QueryResult job) -> QueryResult row -> [job]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (row -> job) -> QueryResult row -> QueryResult job
forall a b. (a -> b) -> QueryResult a -> QueryResult b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap row -> job
ccJobFetcher (QueryResult row -> ([job], Int))
-> DBT_ m m (QueryResult row) -> DBT m ([job], Int)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> DBT_ m m (QueryResult row)
forall (m :: * -> *) row.
(HasCallStack, MonadDB m, MonadThrow m, FromRow row) =>
m (QueryResult row)
queryResult
      where
        reservedJobs :: UTCTime -> SQL
        reservedJobs :: UTCTime -> SQL
reservedJobs UTCTime
now =
          [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat
            [ SQL
"SELECT id FROM" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable
            , SQL
"WHERE"
            , SQL
"       reserved_by IS NULL"
            , SQL
"       AND run_at IS NOT NULL"
            , SQL
"       AND run_at <= " SQL -> UTCTime -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
            , SQL
"       ORDER BY run_at"
            , SQL
"LIMIT" SQL -> Int -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> Int
limit
            , SQL
"FOR UPDATE SKIP LOCKED"
            ]

    -- Spawn each job in a separate thread.
    startJob :: job -> m (job, m (T.Result Result))
    startJob :: job -> m (job, m (Result Result))
startJob job
job = do
      (ThreadId
_, m (Result Result)
joinFork) <- ((forall a. m a -> m a) -> m (ThreadId, m (Result Result)))
-> m (ThreadId, m (Result Result))
forall b. HasCallStack => ((forall a. m a -> m a) -> m b) -> m b
forall (m :: * -> *) b.
(MonadMask m, HasCallStack) =>
((forall a. m a -> m a) -> m b) -> m b
mask (((forall a. m a -> m a) -> m (ThreadId, m (Result Result)))
 -> m (ThreadId, m (Result Result)))
-> ((forall a. m a -> m a) -> m (ThreadId, m (Result Result)))
-> m (ThreadId, m (Result Result))
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> m a
restore -> m Result -> m (ThreadId, m (Result Result))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (ThreadId, m (Result a))
T.fork (m Result -> m (ThreadId, m (Result Result)))
-> m Result -> m (ThreadId, m (Result Result))
forall a b. (a -> b) -> a -> b
$ do
        ThreadId
tid <- m ThreadId
forall (m :: * -> *). MonadBase IO m => m ThreadId
myThreadId
        m () -> m () -> m Result -> m Result
forall (m :: * -> *) a c b.
(HasCallStack, MonadMask m) =>
m a -> m c -> m b -> m b
bracket_ (ThreadId -> m ()
registerJob ThreadId
tid) (ThreadId -> m ()
unregisterJob ThreadId
tid) (m Result -> m Result)
-> (m Result -> m Result) -> m Result -> m Result
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m Result -> m Result
forall a. m a -> m a
restore (m Result -> m Result) -> m Result -> m Result
forall a b. (a -> b) -> a -> b
$ do
          [Pair] -> m Result -> m Result
forall a. [Pair] -> m a -> m a
forall (m :: * -> *) a. MonadLog m => [Pair] -> m a -> m a
localData (job -> [Pair]
ccJobLogData job
job) (m Result -> m Result) -> m Result -> m Result
forall a b. (a -> b) -> a -> b
$ job -> m Result
ccProcessJob job
job
      (job, m (Result Result)) -> m (job, m (Result Result))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (job
job, m (Result Result)
joinFork)
      where
        registerJob :: ThreadId -> m ()
registerJob ThreadId
tid = STM () -> m ()
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ do
          TVar (Map ThreadId idx)
-> (Map ThreadId idx -> Map ThreadId idx) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map ThreadId idx)
runningJobsInfo ((Map ThreadId idx -> Map ThreadId idx) -> STM ())
-> (idx -> Map ThreadId idx -> Map ThreadId idx) -> idx -> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ThreadId -> idx -> Map ThreadId idx -> Map ThreadId idx
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert ThreadId
tid (idx -> STM ()) -> idx -> STM ()
forall a b. (a -> b) -> a -> b
$ job -> idx
ccJobIndex job
job
        unregisterJob :: ThreadId -> m ()
unregisterJob ThreadId
tid = STM () -> m ()
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ do
          TVar (Map ThreadId idx)
-> (Map ThreadId idx -> Map ThreadId idx) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map ThreadId idx)
runningJobsInfo ((Map ThreadId idx -> Map ThreadId idx) -> STM ())
-> (Map ThreadId idx -> Map ThreadId idx) -> STM ()
forall a b. (a -> b) -> a -> b
$ ThreadId -> Map ThreadId idx -> Map ThreadId idx
forall k a. Ord k => k -> Map k a -> Map k a
M.delete ThreadId
tid

    -- Wait for all the jobs and collect their results.
    joinJob :: (job, m (T.Result Result)) -> m (idx, Result)
    joinJob :: (job, m (Result Result)) -> m (idx, Result)
joinJob (job
job, m (Result Result)
joinFork) =
      m (Result Result)
joinFork m (Result Result)
-> (Result Result -> m (idx, Result)) -> m (idx, Result)
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Right Result
result -> (idx, Result) -> m (idx, Result)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (job -> idx
ccJobIndex job
job, Result
result)
        Left SomeException
ex -> do
          Action
action <- SomeException -> job -> m Action
ccOnException SomeException
ex job
job
          (idx, Result) -> m (idx, Result)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (job -> idx
ccJobIndex job
job, Action -> Result
Failed Action
action)

    -- Update status of the jobs.
    updateJobs :: [(idx, Result)] -> m ()
    updateJobs :: [(idx, Result)] -> m ()
updateJobs [(idx, Result)]
results = ConnectionSourceM m -> TransactionSettings -> DBT m () -> m ()
forall (m :: * -> *) a.
(HasCallStack, MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts (DBT m () -> m ()) -> DBT m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
      UTCTime
now <- DBT_ m m UTCTime
forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
      SQL -> DBT m ()
forall (m :: * -> *). (HasCallStack, MonadDB m) => SQL -> m ()
runSQL_ (SQL -> DBT m ()) -> SQL -> DBT m ()
forall a b. (a -> b) -> a -> b
$ RawSQL () -> [(idx, Result)] -> UTCTime -> SQL
forall idx.
(Show idx, ToSQL idx) =>
RawSQL () -> [(idx, Result)] -> UTCTime -> SQL
updateJobsQuery RawSQL ()
ccJobsTable [(idx, Result)]
results UTCTime
now

----------------------------------------

-- | Generate a single SQL query for updating all given jobs.
--
-- /Note:/ this query can't be run as prepared because it has a variable number
-- of query parameters (see retryToSQL helper).
updateJobsQuery
  :: (Show idx, ToSQL idx)
  => RawSQL ()
  -> [(idx, Result)]
  -> UTCTime
  -> SQL
updateJobsQuery :: forall idx.
(Show idx, ToSQL idx) =>
RawSQL () -> [(idx, Result)] -> UTCTime -> SQL
updateJobsQuery RawSQL ()
jobsTable [(idx, Result)]
results UTCTime
now =
  [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat
    [ SQL
"WITH removed AS ("
    , SQL
"  DELETE FROM" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
jobsTable
    , SQL
"  WHERE id = ANY(" SQL -> Array1 idx -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> [idx] -> Array1 idx
forall a. [a] -> Array1 a
Array1 [idx]
deletes SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
    , SQL
")"
    , SQL
"UPDATE" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
jobsTable SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
"SET"
    , SQL
"  reserved_by = NULL"
    , SQL
", run_at = CASE"
    , SQL
"    WHEN FALSE THEN run_at"
    , [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat ([SQL] -> SQL) -> [SQL] -> SQL
forall a b. (a -> b) -> a -> b
$ (Either Interval UTCTime -> [idx] -> [SQL] -> [SQL])
-> [SQL] -> Map (Either Interval UTCTime) [idx] -> [SQL]
forall k a b. (k -> a -> b -> b) -> b -> Map k a -> b
M.foldrWithKey Either Interval UTCTime -> [idx] -> [SQL] -> [SQL]
retryToSQL [] Map (Either Interval UTCTime) [idx]
retries
    , SQL
"    ELSE NULL" -- processed
    , SQL
"  END"
    , SQL
", finished_at = CASE"
    , SQL
"    WHEN id = ANY(" SQL -> Array1 idx -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> [idx] -> Array1 idx
forall a. [a] -> Array1 a
Array1 [idx]
successes SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
") THEN " SQL -> UTCTime -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
    , SQL
"    ELSE NULL"
    , SQL
"  END"
    , SQL
"WHERE id = ANY(" SQL -> Array1 idx -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> [idx] -> Array1 idx
forall a. [a] -> Array1 a
Array1 (((idx, Result) -> idx) -> [(idx, Result)] -> [idx]
forall a b. (a -> b) -> [a] -> [b]
map (idx, Result) -> idx
forall a b. (a, b) -> a
fst [(idx, Result)]
updates) SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
    ]
  where
    retryToSQL :: Either Interval UTCTime -> [idx] -> [SQL] -> [SQL]
retryToSQL (Left Interval
int) [idx]
ids =
      (SQL
"WHEN id = ANY(" SQL -> Array1 idx -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> [idx] -> Array1 idx
forall a. [a] -> Array1 a
Array1 [idx]
ids SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
") THEN " SQL -> UTCTime -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now SQL -> SQL -> SQL
forall a. Semigroup a => a -> a -> a
<> SQL
" +" SQL -> Interval -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> Interval
int SQL -> [SQL] -> [SQL]
forall a. a -> [a] -> [a]
:)
    retryToSQL (Right UTCTime
time) [idx]
ids =
      (SQL
"WHEN id = ANY(" SQL -> Array1 idx -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> [idx] -> Array1 idx
forall a. [a] -> Array1 a
Array1 [idx]
ids SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
") THEN" SQL -> UTCTime -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
time SQL -> [SQL] -> [SQL]
forall a. a -> [a] -> [a]
:)

    retries :: Map (Either Interval UTCTime) [idx]
retries = ((idx, Result)
 -> Map (Either Interval UTCTime) [idx]
 -> Map (Either Interval UTCTime) [idx])
-> Map (Either Interval UTCTime) [idx]
-> [(idx, Result)]
-> Map (Either Interval UTCTime) [idx]
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr ((idx, Action)
-> Map (Either Interval UTCTime) [idx]
-> Map (Either Interval UTCTime) [idx]
forall {a}.
(a, Action)
-> Map (Either Interval UTCTime) [a]
-> Map (Either Interval UTCTime) [a]
step ((idx, Action)
 -> Map (Either Interval UTCTime) [idx]
 -> Map (Either Interval UTCTime) [idx])
-> ((idx, Result) -> (idx, Action))
-> (idx, Result)
-> Map (Either Interval UTCTime) [idx]
-> Map (Either Interval UTCTime) [idx]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (idx, Result) -> (idx, Action)
forall {a}. (a, Result) -> (a, Action)
getAction) Map (Either Interval UTCTime) [idx]
forall k a. Map k a
M.empty [(idx, Result)]
updates
      where
        getAction :: (a, Result) -> (a, Action)
getAction (a
idx, Result
result) = case Result
result of
          Ok Action
action -> (a
idx, Action
action)
          Failed Action
action -> (a
idx, Action
action)

        step :: (a, Action)
-> Map (Either Interval UTCTime) [a]
-> Map (Either Interval UTCTime) [a]
step (a
idx, Action
action) Map (Either Interval UTCTime) [a]
iretries = case Action
action of
          Action
MarkProcessed -> Map (Either Interval UTCTime) [a]
iretries
          RerunAfter Interval
int -> ([a] -> [a] -> [a])
-> Either Interval UTCTime
-> [a]
-> Map (Either Interval UTCTime) [a]
-> Map (Either Interval UTCTime) [a]
forall k a. Ord k => (a -> a -> a) -> k -> a -> Map k a -> Map k a
M.insertWith [a] -> [a] -> [a]
forall a. [a] -> [a] -> [a]
(++) (Interval -> Either Interval UTCTime
forall a b. a -> Either a b
Left Interval
int) [a
idx] Map (Either Interval UTCTime) [a]
iretries
          RerunAt UTCTime
time -> ([a] -> [a] -> [a])
-> Either Interval UTCTime
-> [a]
-> Map (Either Interval UTCTime) [a]
-> Map (Either Interval UTCTime) [a]
forall k a. Ord k => (a -> a -> a) -> k -> a -> Map k a -> Map k a
M.insertWith [a] -> [a] -> [a]
forall a. [a] -> [a] -> [a]
(++) (UTCTime -> Either Interval UTCTime
forall a b. b -> Either a b
Right UTCTime
time) [a
idx] Map (Either Interval UTCTime) [a]
iretries
          Action
Remove -> String -> Map (Either Interval UTCTime) [a]
forall a. HasCallStack => String -> a
error String
"updateJobs: Remove should've been filtered out"

    successes :: [idx]
successes = ((idx, Result) -> [idx] -> [idx])
-> [idx] -> [(idx, Result)] -> [idx]
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (idx, Result) -> [idx] -> [idx]
forall {a}. (a, Result) -> [a] -> [a]
step [] [(idx, Result)]
updates
      where
        step :: (a, Result) -> [a] -> [a]
step (a
idx, Ok Action
_) [a]
acc = a
idx a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc
        step (a
_, Failed Action
_) [a]
acc = [a]
acc

    ([idx]
deletes, [(idx, Result)]
updates) = ((idx, Result)
 -> ([idx], [(idx, Result)]) -> ([idx], [(idx, Result)]))
-> ([idx], [(idx, Result)])
-> [(idx, Result)]
-> ([idx], [(idx, Result)])
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (idx, Result)
-> ([idx], [(idx, Result)]) -> ([idx], [(idx, Result)])
forall {a}.
(a, Result) -> ([a], [(a, Result)]) -> ([a], [(a, Result)])
step ([], []) [(idx, Result)]
results
      where
        step :: (a, Result) -> ([a], [(a, Result)]) -> ([a], [(a, Result)])
step job :: (a, Result)
job@(a
idx, Result
result) ([a]
ideletes, [(a, Result)]
iupdates) = case Result
result of
          Ok Action
Remove -> (a
idx a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
ideletes, [(a, Result)]
iupdates)
          Failed Action
Remove -> (a
idx a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
ideletes, [(a, Result)]
iupdates)
          Result
_ -> ([a]
ideletes, (a, Result)
job (a, Result) -> [(a, Result)] -> [(a, Result)]
forall a. a -> [a] -> [a]
: [(a, Result)]
iupdates)

ts :: TransactionSettings
ts :: TransactionSettings
ts =
  TransactionSettings
defaultTransactionSettings
    { -- PostgreSQL doesn't seem to handle very high amount of concurrent
      -- transactions that modify multiple rows in the same table well (see
      -- updateJobs) and sometimes (very rarely though) ends up in a
      -- deadlock. It doesn't matter much though, we just restart the
      -- transaction in such case.
      tsRestartPredicate = Just . RestartPredicate $
        \DetailedQueryError
e Integer
_ ->
          DetailedQueryError -> ErrorCode
qeErrorCode DetailedQueryError
e ErrorCode -> ErrorCode -> Bool
forall a. Eq a => a -> a -> Bool
== ErrorCode
DeadlockDetected
            Bool -> Bool -> Bool
|| DetailedQueryError -> ErrorCode
qeErrorCode DetailedQueryError
e ErrorCode -> ErrorCode -> Bool
forall a. Eq a => a -> a -> Bool
== ErrorCode
SerializationFailure
    }

atomically :: MonadBase IO m => STM a -> m a
atomically :: forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically = IO a -> m a
forall α. IO α -> m α
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO a -> m a) -> (STM a -> IO a) -> STM a -> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM a -> IO a
forall a. STM a -> IO a
STM.atomically