module Database.PostgreSQL.Consumers.Components
( runConsumer
, runConsumerWithIdleSignal
, spawnListener
, spawnMonitor
, spawnDispatcher
) where
import Control.Concurrent.Lifted
import Control.Concurrent.STM hiding (atomically)
import Control.Concurrent.STM qualified as STM
import Control.Concurrent.Thread.Lifted qualified as T
import Control.Exception (AsyncException (ThreadKilled))
import Control.Exception.Safe qualified as ES
import Control.Monad
import Control.Monad.Base
import Control.Monad.Catch
import Control.Monad.Time
import Control.Monad.Trans
import Control.Monad.Trans.Control
import Data.Foldable qualified as F
import Data.Function
import Data.Int
import Data.Map.Strict qualified as M
import Data.Monoid.Utils
import Database.PostgreSQL.Consumers.Config
import Database.PostgreSQL.Consumers.Consumer
import Database.PostgreSQL.Consumers.Utils
import Database.PostgreSQL.PQTypes
import Log
-- | Start a consumer for the given configuration without an idle signal.
--
-- This is 'runConsumerWithMaybeIdleSignal' called with 'Nothing' as the idle
-- signal. The outer action starts the consumer; the action it returns is the
-- finalizer produced by 'runConsumerWithMaybeIdleSignal'.
runConsumer
  :: ( MonadBaseControl IO m
     , MonadLog m
     , MonadMask m
     , MonadTime m
     , Eq idx
     , Show idx
     , FromSQL idx
     , ToSQL idx
     )
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> m (m ())
runConsumer cc cs =
  runConsumerWithMaybeIdleSignal cc cs Nothing
-- | Start a consumer for the given configuration, additionally handing the
-- supplied 'TMVar' to the dispatcher as an idle signal.
--
-- This is 'runConsumerWithMaybeIdleSignal' called with the signal wrapped in
-- 'Just'. The outer action starts the consumer; the action it returns is the
-- finalizer produced by 'runConsumerWithMaybeIdleSignal'.
runConsumerWithIdleSignal
  :: ( MonadBaseControl IO m
     , MonadLog m
     , MonadMask m
     , MonadTime m
     , Eq idx
     , Show idx
     , FromSQL idx
     , ToSQL idx
     )
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> TMVar Bool
  -> m (m ())
runConsumerWithIdleSignal cc cs idleSignal =
  runConsumerWithMaybeIdleSignal cc cs (Just idleSignal)
-- | Shared implementation of 'runConsumer' and 'runConsumerWithIdleSignal'.
--
-- If 'ccMaxRunningJobs' is less than 1, nothing is started: a message is
-- logged and the returned finalizer does nothing.
--
-- Otherwise the function:
--
-- 1. creates the semaphore shared by the listener and the dispatcher and the
--    two 'TVar's tracking running jobs,
-- 2. probes the database with a @FOR UPDATE SKIP LOCKED@ query and calls
--    'error' if the probe raises a 'DBException',
-- 3. registers the consumer and spawns the listener, monitor and dispatcher
--    threads.
--
-- The returned action is the finalizer. It stops the listener and the
-- dispatcher, waits until no jobs are running, stops the monitor and finally
-- unregisters the consumer.
runConsumerWithMaybeIdleSignal
  :: ( MonadBaseControl IO m
     , MonadLog m
     , MonadMask m
     , MonadTime m
     , Eq idx
     , Show idx
     , FromSQL idx
     , ToSQL idx
     )
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> Maybe (TMVar Bool)
  -> m (m ())
runConsumerWithMaybeIdleSignal cc0 cs mIdleSignal
  | ccMaxRunningJobs cc < 1 = do
      logInfo_ "ccMaxRunningJobs < 1, not starting the consumer"
      pure $ pure ()
  | otherwise = do
      -- The semaphore starts out full; the listener refills it with
      -- 'tryPutMVar' and the dispatcher drains it with 'takeMVar'.
      semaphore <- newMVar ()
      runningJobsInfo <- liftBase $ newTVarIO M.empty
      runningJobs <- liftBase $ newTVarIO 0

      skipLockedTest :: Either DBException () <-
        try . runDBT cs defaultTransactionSettings $
          runSQL_ "SELECT TRUE FOR UPDATE SKIP LOCKED"
      case skipLockedTest of
        Right () -> pure ()
        Left ex -> do
          -- Any 'DBException' ends up here, not only a missing SKIP LOCKED
          -- feature, so record the real cause before failing with the
          -- generic message.
          logAttention "SKIP LOCKED support check failed" $
            object
              [ "exception" .= show ex
              ]
          error "PostgreSQL version with support for SKIP LOCKED is required"

      cid <- registerConsumer cc cs
      localData ["consumer_id" .= show cid] $ do
        listener <- spawnListener cc cs semaphore
        monitor <- localDomain "monitor" $ spawnMonitor cc cs cid
        dispatcher <-
          localDomain "dispatcher" $
            spawnDispatcher
              cc
              cs
              cid
              semaphore
              runningJobsInfo
              runningJobs
              mIdleSignal
        pure . localDomain "finalizer" $ do
          stopExecution listener
          stopExecution dispatcher
          waitForRunningJobs runningJobsInfo runningJobs
          stopExecution monitor
          unregisterConsumer cc cs cid
  where
    -- The user-supplied configuration with 'ccOnException' wrapped so that
    -- the handler runs with the job's log data attached, its decision is
    -- logged, and an exception escaping the handler itself (as caught by
    -- 'ES.catchAny') is logged and turned into a rerun after one day.
    cc =
      cc0
        { ccOnException = \ex job -> localData (ccJobLogData cc0 job) $ do
            let doOnException = do
                  action <- ccOnException cc0 ex job
                  logInfo "Unexpected exception caught while processing job" $
                    object
                      [ "exception" .= show ex
                      , "action" .= show action
                      ]
                  pure action
            doOnException `ES.catchAny` \handlerEx -> do
              let action = RerunAfter $ idays 1
              logAttention "ccOnException threw an exception" $
                object
                  [ "exception" .= show handlerEx
                  , "action" .= show action
                  ]
              pure action
        }

    -- Block until the running jobs counter reaches zero, logging the set of
    -- running jobs each time it is observed to have changed.
    waitForRunningJobs runningJobsInfo runningJobs = do
      initialJobs <- liftBase $ readTVarIO runningJobsInfo
      (`fix` initialJobs) $ \loop jobsInfo -> do
        -- Report the jobs that are still running, if any.
        unless (M.null jobsInfo) $ do
          logInfo "Waiting for running jobs" $
            object
              [ "job_ids" .= showJobsInfo jobsInfo
              ]
        -- The transaction yields the next action to run, hence the 'join'.
        join . atomically $ do
          jobs <- readTVar runningJobs
          if jobs == 0
            then pure $ pure ()
            else do
              newJobsInfo <- readTVar runningJobsInfo
              -- Unchanged info: retry the transaction, which blocks until
              -- one of the variables read above changes. Changed info: go
              -- around the loop again to log it or to finish.
              if newJobsInfo == jobsInfo
                then retry
                else pure $ loop newJobsInfo
      where
        showJobsInfo = M.foldr (\idx acc -> show idx : acc) []
-- | Fork (via 'forkP', labelled @"listener"@) a thread that repeatedly
-- signals the dispatcher by trying to fill the given semaphore.
--
-- With a notification channel configured, the thread runs inside a database
-- session with automatic transactions disabled. It issues @listen@ on the
-- channel (paired with @unlisten@ through 'bracket_') and then loops forever:
-- call 'getNotification' with 'ccNotificationTimeout', then signal the
-- dispatcher.
--
-- Without a channel, the thread loops forever: sleep for
-- 'ccNotificationTimeout' using 'threadDelay', then signal the dispatcher.
--
-- Returns the 'ThreadId' of the forked thread.
spawnListener
  :: (MonadBaseControl IO m, MonadMask m)
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> MVar ()
  -> m ThreadId
spawnListener cc cs semaphore = forkP "listener" listenerLoop
  where
    listenerLoop = case ccNotificationChannel cc of
      Nothing -> forever $ do
        liftBase $ threadDelay notificationTimeout
        signalDispatcher
      Just chan ->
        runDBT cs noTs
          . bracket_ (listen chan) (unlisten chan)
          . forever
          $ do
            -- The notification itself is not inspected; only the fact that
            -- the call returned matters.
            void $ getNotification notificationTimeout
            lift signalDispatcher

    notificationTimeout = ccNotificationTimeout cc

    -- Non-blocking put: does nothing if the semaphore is already full.
    signalDispatcher = liftBase $ tryPutMVar semaphore ()

    -- Default transaction settings with automatic transactions switched off.
    noTs =
      defaultTransactionSettings
        { tsAutoTransaction = False
        }
-- | Fork (via 'forkP', labelled @"monitor"@) a thread that loops forever,
-- doing the following on each iteration:
--
-- 1. Update @last_activity@ of this consumer in the consumers table. If no
--    row was updated, the thread logs that and throws 'ThreadKilled', which
--    ends the loop.
-- 2. Select (with @FOR UPDATE SKIP LOCKED@) consumers of the same jobs table
--    whose @last_activity@ is at least one minute old. For the jobs reserved
--    by them, ask 'ccOnException' what to do using 'ThreadKilled' as the
--    exception, store the outcome with 'updateJobsQuery', and delete the
--    inactive consumers.
-- 3. Log the number of removed consumers and the indices of the freed jobs,
--    when there are any.
-- 4. Sleep for 30 seconds.
--
-- Returns the 'ThreadId' of the forked thread.
spawnMonitor
  :: forall m idx job
   . ( MonadBaseControl IO m
     , MonadLog m
     , MonadMask m
     , MonadTime m
     , Show idx
     , FromSQL idx
     , ToSQL idx
     )
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> ConsumerID
  -> m ThreadId
spawnMonitor ConsumerConfig {..} cs cid = forkP "monitor" . forever $ do
  reportActivity
  (inactiveConsumers, freedJobs) <- reclaimInactive
  when (inactiveConsumers > 0) $
    logInfo "Unregistered inactive consumers" $
      object
        [ "inactive_consumers" .= inactiveConsumers
        ]
  unless (null freedJobs) $
    logInfo "Freed locked jobs" $
      object
        [ "freed_jobs" .= map show freedJobs
        ]
  -- 'threadDelay' takes microseconds, so this is 30 seconds.
  liftBase . threadDelay $ 30 * 1000000
  where
    -- Refresh this consumer's last_activity; bail out if it is gone.
    reportActivity :: m ()
    reportActivity = runDBT cs ts $ do
      now <- currentTime
      ok <-
        runPreparedSQL01 (preparedSqlName "setActivity" ccConsumersTable) $
          smconcat
            [ "UPDATE" <+> raw ccConsumersTable
            , "SET last_activity = " <?> now
            , "WHERE id =" <?> cid
            , " AND name =" <?> unRawSQL ccJobsTable
            ]
      if ok
        then logInfo_ "Activity of the consumer updated"
        else do
          logInfo_ "Consumer is not registered"
          throwM ThreadKilled

    -- Remove inactive consumers and release the jobs they had reserved.
    -- Yields the number of removed consumers and the indices of those jobs.
    reclaimInactive :: m (Int, [idx])
    reclaimInactive = runDBT cs ts $ do
      now <- currentTime
      runPreparedSQL_ (preparedSqlName "reserveConsumers" ccConsumersTable) $
        smconcat
          [ "SELECT id::bigint"
          , "FROM" <+> raw ccConsumersTable
          , "WHERE last_activity +" <?> iminutes 1 <+> "<= " <?> now
          , " AND name =" <?> unRawSQL ccJobsTable
          , "FOR UPDATE SKIP LOCKED"
          ]
      inactive <- fetchMany (runIdentity @Int64)
      if null inactive
        then pure (0, [])
        else do
          runPreparedSQL_ (preparedSqlName "findStuck" ccJobsTable) $
            smconcat
              [ "SELECT" <+> mintercalate ", " ccJobSelectors
              , "FROM" <+> raw ccJobsTable
              , "WHERE reserved_by = ANY(" <?> Array1 inactive <+> ")"
              , "FOR UPDATE SKIP LOCKED"
              ]
          stuckJobs <- fetchMany ccJobFetcher
          unless (null stuckJobs) $ do
            -- Each stuck job is recorded as failed, with the follow-up
            -- action chosen by the configured exception handler.
            results <- forM stuckJobs $ \job ->
              (\action -> (ccJobIndex job, Failed action))
                <$> lift (ccOnException (toException ThreadKilled) job)
            runSQL_ $ updateJobsQuery ccJobsTable results now
          runPreparedSQL_ (preparedSqlName "removeInactive" ccConsumersTable) $
            smconcat
              [ "DELETE FROM" <+> raw ccConsumersTable
              , "WHERE id = ANY(" <?> Array1 inactive <+> ")"
              ]
          pure (length inactive, map ccJobIndex stuckJobs)
spawnDispatcher
:: forall m idx job
. ( MonadBaseControl IO m
, MonadLog m
, MonadMask m
, MonadTime m
, Show idx
, ToSQL idx
)
=> ConsumerConfig m idx job
-> ConnectionSourceM m
-> ConsumerID
-> MVar ()
-> TVar (M.Map ThreadId idx)
-> TVar Int
-> Maybe (TMVar Bool)
-> m ThreadId
spawnDispatcher :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
Show idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m
-> ConsumerID
-> MVar ()
-> TVar (Map ThreadId idx)
-> TVar Int
-> Maybe (TMVar Bool)
-> m ThreadId
-- Dispatcher thread: forks a thread labelled "dispatcher" that forever
--
--   1. blocks on 'takeMVar' of @semaphore@ (presumably filled by whoever
--      wants to wake the dispatcher up -- confirm against the callers),
--   2. reserves and processes jobs via @loop@, starting with a limit of 1,
--   3. publishes through @mIdleSignal@ (when present) whether that wake-up
--      found nothing to do.
--
-- @runningJobs@ counts jobs of batches that are currently in flight and
-- @runningJobsInfo@ maps every job thread to the index of its job.
spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs mIdleSignal =
  forkP "dispatcher" . forever $ do
    takeMVar semaphore
    someJobWasProcessed <- loop 1
    -- Idle exactly when the first reservation of this wake-up came back empty.
    setIdle (not someJobWasProcessed)
  where
    -- Overwrite the idle signal (if any) with the given value: whatever is
    -- currently stored is discarded first, so the put never blocks on a
    -- stale value.
    setIdle :: forall m'. MonadBaseControl IO m' => Bool -> m' ()
    setIdle isIdle =
      forM_ mIdleSignal $ \idleSignal ->
        atomically $ do
          void $ tryTakeTMVar idleSignal
          putTMVar idleSignal isIdle

    -- Reserve up to @limit@ jobs and process them in a separate thread.
    -- Returns whether this call reserved at least one job; the results of
    -- the recursive calls are discarded.
    loop :: Int -> m Bool
    loop limit = do
      (batch, batchSize) <- reserveJobs limit
      let reservedAny = batchSize > 0
      when reservedAny $ do
        logInfo "Processing batch" $ object ["batch_size" .= batchSize]
        -- The counter is bumped before forking and decremented when the
        -- batch thread finishes. Asynchronous exceptions are masked so that
        -- the increment and the installation of the matching decrement
        -- cannot be torn apart; the batch itself runs with them restored.
        mask $ \restore -> do
          atomically $ modifyTVar' runningJobs (+ batchSize)
          let releaseSlots = atomically $ modifyTVar' runningJobs (subtract batchSize)
              processBatch = do
                handles <- mapM startJob batch
                outcomes <- mapM joinJob handles
                updateJobs outcomes
          void . forkP "batch processor" $
            (restore processBatch `finally` releaseSlots)
        -- A full batch suggests there is more work waiting: wait until there
        -- is spare capacity, then try again with a doubled limit capped by
        -- the number of free slots.
        when (batchSize == limit) $ do
          freeSlots <- atomically $ do
            running <- readTVar runningJobs
            when (running >= ccMaxRunningJobs) retry
            pure (ccMaxRunningJobs - running)
          void $ loop (min freeSlots (2 * limit))
      pure reservedAny

    -- Mark up to @limit@ due, unreserved jobs as reserved by this consumer
    -- and fetch them. Returns the decoded jobs together with the row count
    -- reported by the UPDATE.
    reserveJobs :: Int -> m ([job], Int)
    reserveJobs limit = runDBT cs ts $ do
      now <- currentTime
      n <-
        runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $
          smconcat
            [ "UPDATE" <+> raw ccJobsTable <+> "SET"
            , " reserved_by =" <?> cid
            , ", attempts = CASE"
            , " WHEN finished_at IS NULL THEN attempts + 1"
            , " ELSE 1"
            , " END"
            , "WHERE id IN (" <> reservedJobs now <> ")"
            , "RETURNING" <+> mintercalate ", " ccJobSelectors
            ]
      rows <- queryResult
      pure (F.toList (ccJobFetcher <$> rows), n)
      where
        -- Subquery picking the ids to reserve: unreserved jobs whose run_at
        -- is set and not in the future, ordered by run_at; rows locked by
        -- other transactions are skipped.
        reservedJobs :: UTCTime -> SQL
        reservedJobs now =
          smconcat
            [ "SELECT id FROM" <+> raw ccJobsTable
            , "WHERE"
            , " reserved_by IS NULL"
            , " AND run_at IS NOT NULL"
            , " AND run_at <= " <?> now
            , " ORDER BY run_at"
            , "LIMIT" <?> limit
            , "FOR UPDATE SKIP LOCKED"
            ]

    -- Fork a thread that processes a single job. The thread registers itself
    -- in @runningJobsInfo@ for the duration of the processing. Returns the
    -- job paired with the action that waits for the thread's result.
    startJob :: job -> m (job, m (T.Result Result))
    startJob job = do
      (_, waitForResult) <- mask $ \restore -> T.fork $ do
        tid <- myThreadId
        -- Registration and deregistration run masked; only the processing
        -- itself runs with asynchronous exceptions restored.
        bracket_ (registerJob tid) (unregisterJob tid) $
          restore $
            localData (ccJobLogData job) (ccProcessJob job)
      pure (job, waitForResult)
      where
        registerJob :: ThreadId -> m ()
        registerJob tid =
          atomically $ modifyTVar' runningJobsInfo (M.insert tid (ccJobIndex job))

        unregisterJob :: ThreadId -> m ()
        unregisterJob tid =
          atomically $ modifyTVar' runningJobsInfo (M.delete tid)

    -- Wait for a job thread to finish. An exception that escaped the job is
    -- turned into a 'Failed' result carrying the action chosen by
    -- 'ccOnException'.
    joinJob :: (job, m (T.Result Result)) -> m (idx, Result)
    joinJob (job, waitForResult) = do
      outcome <- waitForResult
      result <- case outcome of
        Right result -> pure result
        Left ex -> Failed <$> ccOnException ex job
      pure (ccJobIndex job, result)

    -- Persist the results of a whole batch with a single query.
    updateJobs :: [(idx, Result)] -> m ()
    updateJobs results =
      runDBT cs ts $
        currentTime >>= runSQL_ . updateJobsQuery ccJobsTable results
-- | Build one SQL statement that records the results of a batch of jobs.
--
-- * Jobs whose action is 'Remove' are deleted in the @removed@ CTE.
-- * All remaining jobs get @reserved_by@ cleared; @run_at@ is set according
--   to their action ('RerunAfter' is relative to @now@, 'RerunAt' is
--   absolute, 'MarkProcessed' yields @NULL@); @finished_at@ is set to @now@
--   for 'Ok' results and to @NULL@ for 'Failed' ones.
--
-- The text of the statement depends on the number of distinct retry targets,
-- as one @WHEN@ branch is generated per target.
updateJobsQuery
  :: (Show idx, ToSQL idx)
  => RawSQL ()
  -> [(idx, Result)]
  -> UTCTime
  -> SQL
updateJobsQuery jobsTable results now =
  smconcat
    [ "WITH removed AS ("
    , " DELETE FROM" <+> raw jobsTable
    , " WHERE id = ANY(" <?> Array1 deletes <+> ")"
    , ")"
    , "UPDATE" <+> raw jobsTable <+> "SET"
    , " reserved_by = NULL"
    , ", run_at = CASE"
    , -- Presumably here so that the CASE stays well-formed when no retry
      -- branch is generated below.
      " WHEN FALSE THEN run_at"
    , retryBranches
    , " ELSE NULL"
    , " END"
    , ", finished_at = CASE"
    , " WHEN id = ANY(" <?> Array1 successes <+> ") THEN " <?> now
    , " ELSE NULL"
    , " END"
    , "WHERE id = ANY(" <?> Array1 (map fst updates) <+> ")"
    ]
  where
    -- The action carried by a result, regardless of success or failure.
    actionOf result = case result of
      Ok action -> action
      Failed action -> action

    -- One WHEN branch per distinct retry target.
    retryBranches = smconcat $ M.foldrWithKey retryToSQL [] retries

    -- Prepend the WHEN branch for one retry target to the branches built
    -- so far.
    retryToSQL target ids rest = branch : rest
      where
        branch = case target of
          Left int ->
            "WHEN id = ANY(" <?> Array1 ids <+> ") THEN " <?> now <> " +" <?> int
          Right time ->
            "WHEN id = ANY(" <?> Array1 ids <+> ") THEN" <?> time

    -- Ids of the jobs to rerun, grouped by when they should run again.
    retries = foldr schedule M.empty updates
      where
        schedule (idx, result) acc = case actionOf result of
          MarkProcessed -> acc
          RerunAfter int -> enqueue (Left int)
          RerunAt time -> enqueue (Right time)
          Remove -> error "updateJobs: Remove should've been filtered out"
          where
            enqueue target = M.insertWith (++) target [idx] acc

    -- Ids of the updated jobs that finished successfully.
    successes = [idx | (idx, Ok _) <- updates]

    -- Split the results into ids to delete and entries to update, keeping
    -- the relative order of the input.
    (deletes, updates) = foldr classify ([], []) results
      where
        classify entry@(idx, result) (gone, kept)
          | Remove <- actionOf result = (idx : gone, kept)
          | otherwise = (gone, entry : kept)
-- | Transaction settings used for the dispatcher's transactions: the
-- defaults, except that a transaction is restarted when it fails with a
-- 'DeadlockDetected' or 'SerializationFailure' error code.
ts :: TransactionSettings
ts =
  defaultTransactionSettings
    { tsRestartPredicate = Just (RestartPredicate restartOnConflict)
    }
  where
    -- The second argument of the predicate is not consulted.
    restartOnConflict e _ = case qeErrorCode e of
      DeadlockDetected -> True
      SerializationFailure -> True
      _ -> False
-- | Run an 'STM' transaction in any monad that has 'IO' as its base, by
-- lifting 'STM.atomically'. This is why the unqualified import of the STM
-- module hides its own @atomically@.
atomically :: MonadBase IO m => STM a -> m a
atomically transaction = liftBase (STM.atomically transaction)