{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Async.Worker
( KillWorkerSafely(..)
, run
, runSingle
, runSingle'
, sendJob
, mkDefaultSendJob
, mkDefaultSendJob'
, sendJob'
, SendJob(..) )
where
import Async.Worker.Broker
import Async.Worker.Types
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (readTVarIO, newTVarIO, writeTVar)
import Control.Concurrent.Timeout qualified as Timeout
import Control.Exception.Safe (catches, Handler(..), throwIO, SomeException, Exception)
import Control.Monad (forever, void, when)
data KillWorkerSafely = KillWorkerSafely
deriving (Int -> KillWorkerSafely -> ShowS
[KillWorkerSafely] -> ShowS
KillWorkerSafely -> String
(Int -> KillWorkerSafely -> ShowS)
-> (KillWorkerSafely -> String)
-> ([KillWorkerSafely] -> ShowS)
-> Show KillWorkerSafely
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> KillWorkerSafely -> ShowS
showsPrec :: Int -> KillWorkerSafely -> ShowS
$cshow :: KillWorkerSafely -> String
show :: KillWorkerSafely -> String
$cshowList :: [KillWorkerSafely] -> ShowS
showList :: [KillWorkerSafely] -> ShowS
Show)
instance Exception KillWorkerSafely
run :: (HasWorkerBroker b a) => State b a -> IO ()
run :: forall b a. HasWorkerBroker b a => State b a -> IO ()
run state :: State b a
state@(State { String
WorkerMJobEvent b a
WorkerJobEvent b a
WorkerJobErrorEvent b a
Broker b (Job a)
Queue
PerformAction b a
broker :: Broker b (Job a)
queueName :: Queue
name :: String
performAction :: PerformAction b a
onMessageReceived :: WorkerJobEvent b a
onJobFinish :: WorkerJobEvent b a
onJobTimeout :: WorkerJobEvent b a
onJobError :: WorkerJobErrorEvent b a
onWorkerKilledSafely :: WorkerMJobEvent b a
$sel:broker:State :: forall b a. State b a -> Broker b (Job a)
$sel:queueName:State :: forall b a. State b a -> Queue
$sel:name:State :: forall b a. State b a -> String
$sel:performAction:State :: forall b a. State b a -> PerformAction b a
$sel:onMessageReceived:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobFinish:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobTimeout:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobError:State :: forall b a. State b a -> WorkerJobErrorEvent b a
$sel:onWorkerKilledSafely:State :: forall b a. State b a -> WorkerMJobEvent b a
.. }) = do
Broker b (Job a) -> Queue -> IO ()
forall b a. MessageBroker b a => Broker b a -> Queue -> IO ()
createQueue Broker b (Job a)
broker Queue
queueName
IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ State b a -> IO ()
forall b a. HasWorkerBroker b a => State b a -> IO ()
runSingle State b a
state
runSingle :: (HasWorkerBroker b a) => State b a -> IO ()
runSingle :: forall b a. HasWorkerBroker b a => State b a -> IO ()
runSingle state :: State b a
state@(State { String
WorkerMJobEvent b a
WorkerJobEvent b a
WorkerJobErrorEvent b a
Broker b (Job a)
Queue
PerformAction b a
$sel:broker:State :: forall b a. State b a -> Broker b (Job a)
$sel:queueName:State :: forall b a. State b a -> Queue
$sel:name:State :: forall b a. State b a -> String
$sel:performAction:State :: forall b a. State b a -> PerformAction b a
$sel:onMessageReceived:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobFinish:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobTimeout:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobError:State :: forall b a. State b a -> WorkerJobErrorEvent b a
$sel:onWorkerKilledSafely:State :: forall b a. State b a -> WorkerMJobEvent b a
broker :: Broker b (Job a)
queueName :: Queue
name :: String
performAction :: PerformAction b a
onMessageReceived :: WorkerJobEvent b a
onJobFinish :: WorkerJobEvent b a
onJobTimeout :: WorkerJobEvent b a
onJobError :: WorkerJobErrorEvent b a
onWorkerKilledSafely :: WorkerMJobEvent b a
.. }) = do
Broker b (Job a) -> Queue -> IO ()
forall b a. MessageBroker b a => Broker b a -> Queue -> IO ()
createQueue Broker b (Job a)
broker Queue
queueName
State b a -> IO ()
forall b a. HasWorkerBroker b a => State b a -> IO ()
runSingle' State b a
state
runSingle' :: (HasWorkerBroker b a) => State b a -> IO ()
runSingle' :: forall b a. HasWorkerBroker b a => State b a -> IO ()
runSingle' state :: State b a
state@(State { String
WorkerMJobEvent b a
WorkerJobEvent b a
WorkerJobErrorEvent b a
Broker b (Job a)
Queue
PerformAction b a
$sel:broker:State :: forall b a. State b a -> Broker b (Job a)
$sel:queueName:State :: forall b a. State b a -> Queue
$sel:name:State :: forall b a. State b a -> String
$sel:performAction:State :: forall b a. State b a -> PerformAction b a
$sel:onMessageReceived:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobFinish:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobTimeout:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobError:State :: forall b a. State b a -> WorkerJobErrorEvent b a
$sel:onWorkerKilledSafely:State :: forall b a. State b a -> WorkerMJobEvent b a
broker :: Broker b (Job a)
queueName :: Queue
name :: String
performAction :: PerformAction b a
onMessageReceived :: WorkerJobEvent b a
onJobFinish :: WorkerJobEvent b a
onJobTimeout :: WorkerJobEvent b a
onJobError :: WorkerJobErrorEvent b a
onWorkerKilledSafely :: WorkerMJobEvent b a
.. }) = do
TVar (Maybe (BrokerMessage b (Job a)))
mBrokerMessageTVar <- Maybe (BrokerMessage b (Job a))
-> IO (TVar (Maybe (BrokerMessage b (Job a))))
forall a. a -> IO (TVar a)
newTVarIO Maybe (BrokerMessage b (Job a))
forall a. Maybe a
Nothing
IO () -> [Handler IO ()] -> IO ()
forall (m :: * -> *) a.
(HasCallStack, MonadCatch m, MonadThrow m) =>
m a -> [Handler m a] -> m a
catches (do
BrokerMessage b (Job a)
brokerMessage <- Broker b (Job a) -> Queue -> IO (BrokerMessage b (Job a))
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> IO (BrokerMessage b a)
readMessageWaiting Broker b (Job a)
broker Queue
queueName
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe (BrokerMessage b (Job a)))
-> Maybe (BrokerMessage b (Job a)) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (BrokerMessage b (Job a)))
mBrokerMessageTVar (BrokerMessage b (Job a) -> Maybe (BrokerMessage b (Job a))
forall a. a -> Maybe a
Just BrokerMessage b (Job a)
brokerMessage)
PerformAction b a
forall b a.
HasWorkerBroker b a =>
State b a -> BrokerMessage b (Job a) -> IO ()
handleMessage State b a
state BrokerMessage b (Job a)
brokerMessage
WorkerJobEvent b a -> PerformAction b a
forall b a.
WorkerJobEvent b a -> State b a -> BrokerMessage b (Job a) -> IO ()
callWorkerJobEvent WorkerJobEvent b a
onJobFinish State b a
state BrokerMessage b (Job a)
brokerMessage
) [
(KillWorkerSafely -> IO ()) -> Handler IO ()
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((KillWorkerSafely -> IO ()) -> Handler IO ())
-> (KillWorkerSafely -> IO ()) -> Handler IO ()
forall a b. (a -> b) -> a -> b
$ \(KillWorkerSafely
_err :: KillWorkerSafely) -> do
Maybe (BrokerMessage b (Job a))
mBrokerMessage <- TVar (Maybe (BrokerMessage b (Job a)))
-> IO (Maybe (BrokerMessage b (Job a)))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (BrokerMessage b (Job a)))
mBrokerMessageTVar
case Maybe (BrokerMessage b (Job a))
mBrokerMessage of
Just BrokerMessage b (Job a)
brokerMessage -> do
let job :: Job a
job = Message b (Job a) -> Job a
forall b a. MessageBroker b a => Message b a -> a
toA (Message b (Job a) -> Job a) -> Message b (Job a) -> Job a
forall a b. (a -> b) -> a -> b
$ BrokerMessage b (Job a) -> Message b (Job a)
forall b a. MessageBroker b a => BrokerMessage b a -> Message b a
getMessage BrokerMessage b (Job a)
brokerMessage
let mdata :: JobMetadata
mdata = Job a -> JobMetadata
forall a. Job a -> JobMetadata
metadata Job a
job
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (JobMetadata -> Bool
resendWhenWorkerKilled JobMetadata
mdata) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
IO () -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Broker b (Job a) -> Queue -> MessageId b -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> IO ()
deleteMessage Broker b (Job a)
broker Queue
queueName (MessageId b -> IO ()) -> MessageId b -> IO ()
forall a b. (a -> b) -> a -> b
$ BrokerMessage b (Job a) -> MessageId b
forall b a. MessageBroker b a => BrokerMessage b a -> MessageId b
messageId BrokerMessage b (Job a)
brokerMessage
IO (MessageId b) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (MessageId b) -> IO ()) -> IO (MessageId b) -> IO ()
forall a b. (a -> b) -> a -> b
$ Broker b (Job a) -> Queue -> Job a -> IO (MessageId b)
forall b a.
HasWorkerBroker b a =>
Broker b (Job a) -> Queue -> Job a -> IO (MessageId b)
sendJob Broker b (Job a)
broker Queue
queueName (Job a
job { metadata = mdata { readCount = readCount mdata + 1 } })
Maybe (BrokerMessage b (Job a))
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
WorkerMJobEvent b a
-> State b a -> Maybe (BrokerMessage b (Job a)) -> IO ()
forall b a.
WorkerMJobEvent b a
-> State b a -> Maybe (BrokerMessage b (Job a)) -> IO ()
callWorkerMJobEvent WorkerMJobEvent b a
onWorkerKilledSafely State b a
state Maybe (BrokerMessage b (Job a))
mBrokerMessage
KillWorkerSafely -> IO ()
forall (m :: * -> *) e a.
(HasCallStack, MonadThrow m, Exception e) =>
e -> m a
throwIO KillWorkerSafely
KillWorkerSafely
, (JobTimeout b a -> IO ()) -> Handler IO ()
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((JobTimeout b a -> IO ()) -> Handler IO ())
-> (JobTimeout b a -> IO ()) -> Handler IO ()
forall a b. (a -> b) -> a -> b
$ \(JobTimeout b a
err :: JobTimeout b a) -> State b a -> JobTimeout b a -> IO ()
forall b a.
HasWorkerBroker b a =>
State b a -> JobTimeout b a -> IO ()
handleTimeoutError State b a
state JobTimeout b a
err
, (SomeException -> IO ()) -> Handler IO ()
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((SomeException -> IO ()) -> Handler IO ())
-> (SomeException -> IO ()) -> Handler IO ()
forall a b. (a -> b) -> a -> b
$ \SomeException
err -> do
Maybe (BrokerMessage b (Job a))
mBrokerMessage <- TVar (Maybe (BrokerMessage b (Job a)))
-> IO (Maybe (BrokerMessage b (Job a)))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (BrokerMessage b (Job a)))
mBrokerMessageTVar
case Maybe (BrokerMessage b (Job a))
mBrokerMessage of
Just BrokerMessage b (Job a)
brokerMessage -> do
WorkerJobErrorEvent b a
-> State b a -> BrokerMessage b (Job a) -> SomeException -> IO ()
forall b a.
WorkerJobErrorEvent b a
-> State b a -> BrokerMessage b (Job a) -> SomeException -> IO ()
callWorkerJobErrorEvent WorkerJobErrorEvent b a
onJobError State b a
state BrokerMessage b (Job a)
brokerMessage SomeException
err
PerformAction b a
forall b a.
HasWorkerBroker b a =>
State b a -> BrokerMessage b (Job a) -> IO ()
handleJobError State b a
state BrokerMessage b (Job a)
brokerMessage
Maybe (BrokerMessage b (Job a))
Nothing -> State b a -> SomeException -> IO ()
forall b a. State b a -> SomeException -> IO ()
handleUnknownError State b a
state SomeException
err
]
handleMessage :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO ()
handleMessage :: forall b a.
HasWorkerBroker b a =>
State b a -> BrokerMessage b (Job a) -> IO ()
handleMessage state :: State b a
state@(State { String
WorkerMJobEvent b a
WorkerJobEvent b a
WorkerJobErrorEvent b a
Broker b (Job a)
Queue
PerformAction b a
$sel:broker:State :: forall b a. State b a -> Broker b (Job a)
$sel:queueName:State :: forall b a. State b a -> Queue
$sel:name:State :: forall b a. State b a -> String
$sel:performAction:State :: forall b a. State b a -> PerformAction b a
$sel:onMessageReceived:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobFinish:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobTimeout:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobError:State :: forall b a. State b a -> WorkerJobErrorEvent b a
$sel:onWorkerKilledSafely:State :: forall b a. State b a -> WorkerMJobEvent b a
broker :: Broker b (Job a)
queueName :: Queue
name :: String
performAction :: PerformAction b a
onMessageReceived :: WorkerJobEvent b a
onJobFinish :: WorkerJobEvent b a
onJobTimeout :: WorkerJobEvent b a
onJobError :: WorkerJobErrorEvent b a
onWorkerKilledSafely :: WorkerMJobEvent b a
.. }) BrokerMessage b (Job a)
brokerMessage = do
WorkerJobEvent b a -> PerformAction b a
forall b a.
WorkerJobEvent b a -> State b a -> BrokerMessage b (Job a) -> IO ()
callWorkerJobEvent WorkerJobEvent b a
onMessageReceived State b a
state BrokerMessage b (Job a)
brokerMessage
let msgId :: MessageId b
msgId = BrokerMessage b (Job a) -> MessageId b
forall b a. MessageBroker b a => BrokerMessage b a -> MessageId b
messageId BrokerMessage b (Job a)
brokerMessage
let msg :: Message b (Job a)
msg = BrokerMessage b (Job a) -> Message b (Job a)
forall b a. MessageBroker b a => BrokerMessage b a -> Message b a
getMessage BrokerMessage b (Job a)
brokerMessage
let job' :: Job a
job' = Message b (Job a) -> Job a
forall b a. MessageBroker b a => Message b a -> a
toA Message b (Job a)
msg
let mdata :: JobMetadata
mdata = Job a -> JobMetadata
forall a. Job a -> JobMetadata
metadata Job a
job'
let timeoutS :: Int
timeoutS = Job a -> Int
forall a. Job a -> Int
jobTimeout Job a
job'
Broker b (Job a) -> Queue -> MessageId b -> TimeoutS -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> TimeoutS -> IO ()
setMessageTimeout Broker b (Job a)
broker Queue
queueName MessageId b
msgId (Int -> TimeoutS
TimeoutS Int
timeoutS)
Maybe ()
mTimeout <- Integer -> IO () -> IO (Maybe ())
forall α. Integer -> IO α -> IO (Maybe α)
Timeout.timeout ((Int -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
timeoutS) Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
* Integer
microsecond) (PerformAction b a
forall b a. State b a -> BrokerMessage b (Job a) -> IO ()
runAction State b a
state BrokerMessage b (Job a)
brokerMessage)
let archiveHandler :: IO ()
archiveHandler = do
case JobMetadata -> ArchiveStrategy
archiveStrategy JobMetadata
mdata of
ArchiveStrategy
ASDelete -> do
Broker b (Job a) -> Queue -> MessageId b -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> IO ()
deleteMessage Broker b (Job a)
broker Queue
queueName MessageId b
msgId
ArchiveStrategy
ASArchive -> do
Broker b (Job a) -> Queue -> MessageId b -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> IO ()
archiveMessage Broker b (Job a)
broker Queue
queueName MessageId b
msgId
case Maybe ()
mTimeout of
Just ()
_ -> IO ()
archiveHandler
Maybe ()
Nothing -> do
WorkerJobEvent b a -> PerformAction b a
forall b a.
WorkerJobEvent b a -> State b a -> BrokerMessage b (Job a) -> IO ()
callWorkerJobEvent WorkerJobEvent b a
onJobTimeout State b a
state BrokerMessage b (Job a)
brokerMessage
JobTimeout b a -> IO ()
forall (m :: * -> *) e a.
(HasCallStack, MonadThrow m, Exception e) =>
e -> m a
throwIO (JobTimeout b a -> IO ()) -> JobTimeout b a -> IO ()
forall a b. (a -> b) -> a -> b
$ JobTimeout { $sel:jtBMessage:JobTimeout :: BrokerMessage b (Job a)
jtBMessage = BrokerMessage b (Job a)
brokerMessage
, $sel:jtTimeout:JobTimeout :: Int
jtTimeout = Int
timeoutS }
callWorkerJobEvent :: WorkerJobEvent b a
-> State b a
-> BrokerMessage b (Job a)
-> IO ()
callWorkerJobEvent :: forall b a.
WorkerJobEvent b a -> State b a -> BrokerMessage b (Job a) -> IO ()
callWorkerJobEvent Maybe (State b a -> BrokerMessage b (Job a) -> IO ())
Nothing State b a
_ BrokerMessage b (Job a)
_ = () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
callWorkerJobEvent (Just State b a -> BrokerMessage b (Job a) -> IO ()
event) State b a
state BrokerMessage b (Job a)
brokerMessage = State b a -> BrokerMessage b (Job a) -> IO ()
event State b a
state BrokerMessage b (Job a)
brokerMessage
callWorkerJobErrorEvent :: WorkerJobErrorEvent b a
-> State b a
-> BrokerMessage b (Job a)
-> SomeException
-> IO ()
callWorkerJobErrorEvent :: forall b a.
WorkerJobErrorEvent b a
-> State b a -> BrokerMessage b (Job a) -> SomeException -> IO ()
callWorkerJobErrorEvent Maybe
(State b a -> BrokerMessage b (Job a) -> SomeException -> IO ())
Nothing State b a
_ BrokerMessage b (Job a)
_ SomeException
_ = () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
callWorkerJobErrorEvent (Just State b a -> BrokerMessage b (Job a) -> SomeException -> IO ()
event) State b a
state BrokerMessage b (Job a)
brokerMessage SomeException
err = State b a -> BrokerMessage b (Job a) -> SomeException -> IO ()
event State b a
state BrokerMessage b (Job a)
brokerMessage SomeException
err
callWorkerMJobEvent :: WorkerMJobEvent b a
-> State b a
-> Maybe (BrokerMessage b (Job a))
-> IO ()
callWorkerMJobEvent :: forall b a.
WorkerMJobEvent b a
-> State b a -> Maybe (BrokerMessage b (Job a)) -> IO ()
callWorkerMJobEvent Maybe (State b a -> Maybe (BrokerMessage b (Job a)) -> IO ())
Nothing State b a
_ Maybe (BrokerMessage b (Job a))
_ = () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
callWorkerMJobEvent (Just State b a -> Maybe (BrokerMessage b (Job a)) -> IO ()
event) State b a
state Maybe (BrokerMessage b (Job a))
mBrokerMessage = State b a -> Maybe (BrokerMessage b (Job a)) -> IO ()
event State b a
state Maybe (BrokerMessage b (Job a))
mBrokerMessage
handleTimeoutError :: (HasWorkerBroker b a) => State b a -> JobTimeout b a -> IO ()
handleTimeoutError :: forall b a.
HasWorkerBroker b a =>
State b a -> JobTimeout b a -> IO ()
handleTimeoutError _state :: State b a
_state@(State { String
WorkerMJobEvent b a
WorkerJobEvent b a
WorkerJobErrorEvent b a
Broker b (Job a)
Queue
PerformAction b a
$sel:broker:State :: forall b a. State b a -> Broker b (Job a)
$sel:queueName:State :: forall b a. State b a -> Queue
$sel:name:State :: forall b a. State b a -> String
$sel:performAction:State :: forall b a. State b a -> PerformAction b a
$sel:onMessageReceived:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobFinish:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobTimeout:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobError:State :: forall b a. State b a -> WorkerJobErrorEvent b a
$sel:onWorkerKilledSafely:State :: forall b a. State b a -> WorkerMJobEvent b a
broker :: Broker b (Job a)
queueName :: Queue
name :: String
performAction :: PerformAction b a
onMessageReceived :: WorkerJobEvent b a
onJobFinish :: WorkerJobEvent b a
onJobTimeout :: WorkerJobEvent b a
onJobError :: WorkerJobErrorEvent b a
onWorkerKilledSafely :: WorkerMJobEvent b a
.. }) _jt :: JobTimeout b a
_jt@(JobTimeout { Int
BrokerMessage b (Job a)
$sel:jtBMessage:JobTimeout :: forall b a. JobTimeout b a -> BrokerMessage b (Job a)
$sel:jtTimeout:JobTimeout :: forall b a. JobTimeout b a -> Int
jtBMessage :: BrokerMessage b (Job a)
jtTimeout :: Int
.. }) = do
let msgId :: MessageId b
msgId = BrokerMessage b (Job a) -> MessageId b
forall b a. MessageBroker b a => BrokerMessage b a -> MessageId b
messageId BrokerMessage b (Job a)
jtBMessage
let job :: Job a
job = Message b (Job a) -> Job a
forall b a. MessageBroker b a => Message b a -> a
toA (Message b (Job a) -> Job a) -> Message b (Job a) -> Job a
forall a b. (a -> b) -> a -> b
$ BrokerMessage b (Job a) -> Message b (Job a)
forall b a. MessageBroker b a => BrokerMessage b a -> Message b a
getMessage BrokerMessage b (Job a)
jtBMessage
let mdata :: JobMetadata
mdata = Job a -> JobMetadata
forall a. Job a -> JobMetadata
metadata Job a
job
case JobMetadata -> TimeoutStrategy
timeoutStrategy JobMetadata
mdata of
TimeoutStrategy
TSDelete -> Broker b (Job a) -> Queue -> MessageId b -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> IO ()
deleteMessage Broker b (Job a)
broker Queue
queueName MessageId b
msgId
TimeoutStrategy
TSArchive -> Broker b (Job a) -> Queue -> MessageId b -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> IO ()
archiveMessage Broker b (Job a)
broker Queue
queueName MessageId b
msgId
TimeoutStrategy
TSRepeat -> do
IO () -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Broker b (Job a) -> Queue -> MessageId b -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> IO ()
deleteMessage Broker b (Job a)
broker Queue
queueName MessageId b
msgId
IO (MessageId b) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (MessageId b) -> IO ()) -> IO (MessageId b) -> IO ()
forall a b. (a -> b) -> a -> b
$ Broker b (Job a) -> Queue -> Job a -> IO (MessageId b)
forall b a.
HasWorkerBroker b a =>
Broker b (Job a) -> Queue -> Job a -> IO (MessageId b)
sendJob Broker b (Job a)
broker Queue
queueName (Job a
job { metadata = mdata { readCount = readCount mdata + 1 } })
TSRepeatNElseArchive Int
n -> do
let readCt :: Int
readCt = JobMetadata -> Int
readCount JobMetadata
mdata
if Int
readCt Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n then
Broker b (Job a) -> Queue -> MessageId b -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> IO ()
archiveMessage Broker b (Job a)
broker Queue
queueName MessageId b
msgId
else do
Broker b (Job a) -> Queue -> MessageId b -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> IO ()
deleteMessage Broker b (Job a)
broker Queue
queueName MessageId b
msgId
IO (MessageId b) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (MessageId b) -> IO ()) -> IO (MessageId b) -> IO ()
forall a b. (a -> b) -> a -> b
$ Broker b (Job a) -> Queue -> Job a -> IO (MessageId b)
forall b a.
HasWorkerBroker b a =>
Broker b (Job a) -> Queue -> Job a -> IO (MessageId b)
sendJob Broker b (Job a)
broker Queue
queueName (Job a
job { metadata = mdata { readCount = readCt + 1 } })
TSRepeatNElseDelete Int
n -> do
let readCt :: Int
readCt = JobMetadata -> Int
readCount JobMetadata
mdata
if Int
readCt Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n then
Broker b (Job a) -> Queue -> MessageId b -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> IO ()
deleteMessage Broker b (Job a)
broker Queue
queueName MessageId b
msgId
else do
Broker b (Job a) -> Queue -> MessageId b -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> IO ()
deleteMessage Broker b (Job a)
broker Queue
queueName MessageId b
msgId
IO (MessageId b) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (MessageId b) -> IO ()) -> IO (MessageId b) -> IO ()
forall a b. (a -> b) -> a -> b
$ Broker b (Job a) -> Queue -> Job a -> IO (MessageId b)
forall b a.
HasWorkerBroker b a =>
Broker b (Job a) -> Queue -> Job a -> IO (MessageId b)
sendJob Broker b (Job a)
broker Queue
queueName (Job a
job { metadata = mdata { readCount = readCt + 1 } })
handleJobError :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO ()
handleJobError :: forall b a.
HasWorkerBroker b a =>
State b a -> BrokerMessage b (Job a) -> IO ()
handleJobError _state :: State b a
_state@(State { String
WorkerMJobEvent b a
WorkerJobEvent b a
WorkerJobErrorEvent b a
Broker b (Job a)
Queue
PerformAction b a
$sel:broker:State :: forall b a. State b a -> Broker b (Job a)
$sel:queueName:State :: forall b a. State b a -> Queue
$sel:name:State :: forall b a. State b a -> String
$sel:performAction:State :: forall b a. State b a -> PerformAction b a
$sel:onMessageReceived:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobFinish:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobTimeout:State :: forall b a. State b a -> WorkerJobEvent b a
$sel:onJobError:State :: forall b a. State b a -> WorkerJobErrorEvent b a
$sel:onWorkerKilledSafely:State :: forall b a. State b a -> WorkerMJobEvent b a
broker :: Broker b (Job a)
queueName :: Queue
name :: String
performAction :: PerformAction b a
onMessageReceived :: WorkerJobEvent b a
onJobFinish :: WorkerJobEvent b a
onJobTimeout :: WorkerJobEvent b a
onJobError :: WorkerJobErrorEvent b a
onWorkerKilledSafely :: WorkerMJobEvent b a
.. }) BrokerMessage b (Job a)
brokerMessage = do
let msgId :: MessageId b
msgId = BrokerMessage b (Job a) -> MessageId b
forall b a. MessageBroker b a => BrokerMessage b a -> MessageId b
messageId BrokerMessage b (Job a)
brokerMessage
let job :: Job a
job = Message b (Job a) -> Job a
forall b a. MessageBroker b a => Message b a -> a
toA (Message b (Job a) -> Job a) -> Message b (Job a) -> Job a
forall a b. (a -> b) -> a -> b
$ BrokerMessage b (Job a) -> Message b (Job a)
forall b a. MessageBroker b a => BrokerMessage b a -> Message b a
getMessage BrokerMessage b (Job a)
brokerMessage
let mdata :: JobMetadata
mdata = Job a -> JobMetadata
forall a. Job a -> JobMetadata
metadata Job a
job
case JobMetadata -> ErrorStrategy
errorStrategy JobMetadata
mdata of
ErrorStrategy
ESDelete -> Broker b (Job a) -> Queue -> MessageId b -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> IO ()
deleteMessage Broker b (Job a)
broker Queue
queueName MessageId b
msgId
ErrorStrategy
ESArchive -> Broker b (Job a) -> Queue -> MessageId b -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> IO ()
archiveMessage Broker b (Job a)
broker Queue
queueName MessageId b
msgId
ESRepeatNElseArchive Int
n -> do
let readCt :: Int
readCt = JobMetadata -> Int
readCount JobMetadata
mdata
if Int
readCt Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n then
Broker b (Job a) -> Queue -> MessageId b -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> IO ()
archiveMessage Broker b (Job a)
broker Queue
queueName MessageId b
msgId
else do
Broker b (Job a) -> Queue -> MessageId b -> IO ()
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> MessageId b -> IO ()
deleteMessage Broker b (Job a)
broker Queue
queueName MessageId b
msgId
IO (MessageId b) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (MessageId b) -> IO ()) -> IO (MessageId b) -> IO ()
forall a b. (a -> b) -> a -> b
$ Broker b (Job a) -> Queue -> Job a -> IO (MessageId b)
forall b a.
HasWorkerBroker b a =>
Broker b (Job a) -> Queue -> Job a -> IO (MessageId b)
sendJob Broker b (Job a)
broker Queue
queueName (Job a
job { metadata = mdata { readCount = readCt + 1 } })
handleUnknownError :: State b a -> SomeException -> IO ()
handleUnknownError :: forall b a. State b a -> SomeException -> IO ()
handleUnknownError State b a
state SomeException
err = String -> IO ()
putStrLn (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ State b a -> ShowS
forall b a. State b a -> ShowS
formatStr State b a
state ShowS -> ShowS
forall a b. (a -> b) -> a -> b
$ String
"unknown error: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> SomeException -> String
forall a. Show a => a -> String
show SomeException
err
sendJob :: (HasWorkerBroker b a) => Broker b (Job a) -> Queue -> Job a -> IO (MessageId b)
sendJob :: forall b a.
HasWorkerBroker b a =>
Broker b (Job a) -> Queue -> Job a -> IO (MessageId b)
sendJob Broker b (Job a)
broker Queue
queueName Job a
job =
Broker b (Job a) -> Queue -> Job a -> TimeoutS -> IO (MessageId b)
forall b a.
HasWorkerBroker b a =>
Broker b (Job a) -> Queue -> Job a -> TimeoutS -> IO (MessageId b)
sendJobDelayed Broker b (Job a)
broker Queue
queueName Job a
job TimeoutS
0
sendJobDelayed :: (HasWorkerBroker b a) => Broker b (Job a) -> Queue -> Job a -> TimeoutS -> IO (MessageId b)
sendJobDelayed :: forall b a.
HasWorkerBroker b a =>
Broker b (Job a) -> Queue -> Job a -> TimeoutS -> IO (MessageId b)
sendJobDelayed Broker b (Job a)
broker Queue
queueName Job a
job TimeoutS
timeoutS = do
Broker b (Job a)
-> Queue -> Message b (Job a) -> TimeoutS -> IO (MessageId b)
forall b a.
MessageBroker b a =>
Broker b a -> Queue -> Message b a -> TimeoutS -> IO (MessageId b)
sendMessageDelayed Broker b (Job a)
broker Queue
queueName (Job a -> Message b (Job a)
forall b a. MessageBroker b a => a -> Message b a
toMessage Job a
job) TimeoutS
timeoutS
microsecond :: Integer
microsecond :: Integer
microsecond = Integer
10Integer -> Integer -> Integer
forall a b. (Num a, Integral b) => a -> b -> a
^(Integer
6 :: Integer)
data SendJob b a =
SendJob { forall b a. SendJob b a -> Broker b (Job a)
broker :: Broker b (Job a)
, forall b a. SendJob b a -> Queue
queue :: Queue
, forall b a. SendJob b a -> a
msg :: a
, forall b a. SendJob b a -> TimeoutS
delay :: TimeoutS
, forall b a. SendJob b a -> ArchiveStrategy
archStrat :: ArchiveStrategy
, forall b a. SendJob b a -> ErrorStrategy
errStrat :: ErrorStrategy
, forall b a. SendJob b a -> TimeoutStrategy
toStrat :: TimeoutStrategy
, forall b a. SendJob b a -> Int
timeout :: Timeout
, forall b a. SendJob b a -> Bool
resendOnKill :: Bool}
mkDefaultSendJob :: Broker b (Job a)
-> Queue
-> a
-> Timeout
-> SendJob b a
mkDefaultSendJob :: forall b a. Broker b (Job a) -> Queue -> a -> Int -> SendJob b a
mkDefaultSendJob Broker b (Job a)
broker Queue
queue a
msg Int
timeout =
SendJob { Broker b (Job a)
$sel:broker:SendJob :: Broker b (Job a)
broker :: Broker b (Job a)
broker
, Queue
$sel:queue:SendJob :: Queue
queue :: Queue
queue
, a
$sel:msg:SendJob :: a
msg :: a
msg
, $sel:delay:SendJob :: TimeoutS
delay = TimeoutS
0
, $sel:archStrat:SendJob :: ArchiveStrategy
archStrat = ArchiveStrategy
ASDelete
, $sel:errStrat:SendJob :: ErrorStrategy
errStrat = ErrorStrategy
ESArchive
, $sel:toStrat:SendJob :: TimeoutStrategy
toStrat = TimeoutStrategy
TSRepeat
, Int
$sel:timeout:SendJob :: Int
timeout :: Int
timeout
, $sel:resendOnKill:SendJob :: Bool
resendOnKill = Bool
True }
mkDefaultSendJob' :: Broker b (Job a)
-> Queue
-> a
-> SendJob b a
mkDefaultSendJob' :: forall b a. Broker b (Job a) -> Queue -> a -> SendJob b a
mkDefaultSendJob' Broker b (Job a)
b Queue
q a
m = Broker b (Job a) -> Queue -> a -> Int -> SendJob b a
forall b a. Broker b (Job a) -> Queue -> a -> Int -> SendJob b a
mkDefaultSendJob Broker b (Job a)
b Queue
q a
m Int
defaultTimeout
where
defaultTimeout :: Int
defaultTimeout = Int
10
sendJob' :: (HasWorkerBroker b a) => SendJob b a -> IO (MessageId b)
sendJob' :: forall b a. HasWorkerBroker b a => SendJob b a -> IO (MessageId b)
sendJob' (SendJob { a
Bool
Int
Broker b (Job a)
TimeoutS
Queue
TimeoutStrategy
ErrorStrategy
ArchiveStrategy
$sel:broker:SendJob :: forall b a. SendJob b a -> Broker b (Job a)
$sel:queue:SendJob :: forall b a. SendJob b a -> Queue
$sel:msg:SendJob :: forall b a. SendJob b a -> a
$sel:delay:SendJob :: forall b a. SendJob b a -> TimeoutS
$sel:archStrat:SendJob :: forall b a. SendJob b a -> ArchiveStrategy
$sel:errStrat:SendJob :: forall b a. SendJob b a -> ErrorStrategy
$sel:toStrat:SendJob :: forall b a. SendJob b a -> TimeoutStrategy
$sel:timeout:SendJob :: forall b a. SendJob b a -> Int
$sel:resendOnKill:SendJob :: forall b a. SendJob b a -> Bool
broker :: Broker b (Job a)
queue :: Queue
msg :: a
delay :: TimeoutS
archStrat :: ArchiveStrategy
errStrat :: ErrorStrategy
toStrat :: TimeoutStrategy
timeout :: Int
resendOnKill :: Bool
.. }) = do
let metadata :: JobMetadata
metadata = JobMetadata
defaultMetadata { archiveStrategy = archStrat
, errorStrategy = errStrat
, timeoutStrategy = toStrat
, timeout = timeout
, resendWhenWorkerKilled = resendOnKill }
let job :: Job a
job = Job { $sel:job:Job :: a
job = a
msg, JobMetadata
$sel:metadata:Job :: JobMetadata
metadata :: JobMetadata
metadata }
Broker b (Job a) -> Queue -> Job a -> TimeoutS -> IO (MessageId b)
forall b a.
HasWorkerBroker b a =>
Broker b (Job a) -> Queue -> Job a -> TimeoutS -> IO (MessageId b)
sendJobDelayed Broker b (Job a)
broker Queue
queue Job a
job TimeoutS
delay