{-|
Module      : Async.Worker
Description : Abstract async worker implementation using the 'Queue' typeclass
Copyright   : (c) Gargantext, 2024-Present
License     : AGPL
Maintainer  : gargantext@iscpif.fr
Stability   : experimental
Portability : POSIX

Asynchronous worker.

-}

{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}


module Async.Worker
  ( KillWorkerSafely(..)
  -- * Running
  , run
  , runSingle
  , runSingle'
    -- * Sending jobs
  , sendJob
  -- ** 'SendJob' wrappers
  -- $sendJob
  , mkDefaultSendJob
  , mkDefaultSendJob'
  , sendJob'
  , SendJob(..) )
where


{- | 'Broker' class type for the underlying broker
-}
import Async.Worker.Broker
{- | Various worker types, in particular 'State'
-}
import Async.Worker.Types
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (readTVarIO, newTVarIO, writeTVar)
import Control.Concurrent.Timeout qualified as Timeout
import Control.Exception.Safe (catches, Handler(..), throwIO, SomeException, Exception)
import Control.Monad (forever, void, when)


-- | Exception used to stop a worker without losing its current job.
--
-- Throw it at the worker thread, e.g.
-- @throwTo workerThreadId KillWorkerSafely@. When 'runSingle'' catches
-- it, it looks at the 'resendWhenWorkerKilled' flag of the job that is
-- being processed: if the flag is 'True' the job's message is deleted
-- from the broker and the job is sent again; if it is 'False' nothing
-- is resent. In both cases the exception is rethrown afterwards, which
-- ends the worker loop.
data KillWorkerSafely = KillWorkerSafely
  deriving (Show)

instance Exception KillWorkerSafely


-- | Main entry point for starting a worker.
--
-- Creates the queue named by the state's 'queueName' on its 'broker'
-- and then loops forever over 'runSingle', i.e. it keeps reading the
-- next broker message, processing it and handling any errors that
-- might arise in the meantime.
--
-- Note that 'runSingle' itself calls 'createQueue' again before every
-- job, so the queue creation is repeated on each iteration.
--
-- The loop only ends when an exception escapes 'runSingle' (for
-- example 'KillWorkerSafely', which 'runSingle'' rethrows).
run :: (HasWorkerBroker b a) => State b a -> IO ()
run state@(State { .. }) = do
  createQueue broker queueName
  forever (runSingle state)

-- | Fetch a single job and run it, creating the queue first.
--
-- Calls 'createQueue' for the state's 'broker' and 'queueName' and then
-- hands over to 'runSingle'', which does the actual fetching and
-- processing.
runSingle :: (HasWorkerBroker b a) => State b a -> IO ()
runSingle state@(State { .. }) = do
  createQueue broker queueName
  runSingle' state

-- | Fetch a single job and run it. Assumes the queue already exists
-- (unlike 'runSingle', this does not call 'createQueue').
--
-- Waits for the next message with 'readMessageWaiting', passes it to
-- 'handleMessage' and, if that returns normally, fires the
-- 'onJobFinish' callback. Exceptions raised along the way are
-- dispatched as follows:
--
-- * 'KillWorkerSafely': if a message was already fetched and its job
--   has 'resendWhenWorkerKilled' set, the message is deleted and the
--   job is sent again with 'readCount' increased by one. Then
--   'onWorkerKilledSafely' is called and the exception is rethrown.
-- * 'JobTimeout': delegated to 'handleTimeoutError'.
-- * anything else: with a fetched message, 'onJobError' is called and
--   then 'handleJobError'; without one, 'handleUnknownError'.
runSingle' :: (HasWorkerBroker b a) => State b a -> IO ()
runSingle' state@(State { .. }) = do
  -- Holds the message currently being processed, so that the exception
  -- handlers below can tell which job (if any) was in flight.
  mBrokerMessageTVar <- newTVarIO Nothing

  let fetchAndProcess = do
        brokerMessage <- readMessageWaiting broker queueName
        atomically $ writeTVar mBrokerMessageTVar (Just brokerMessage)
        handleMessage state brokerMessage
        -- NOTE(review): this callback runs inside the guarded action, so
        -- an exception thrown by it is treated as a job error even though
        -- 'handleMessage' has already archived or deleted the message --
        -- confirm this is intended.
        callWorkerJobEvent onJobFinish state brokerMessage

  -- Handlers are tried in order, so the catch-all 'SomeException'
  -- handler has to stay last.
  fetchAndProcess `catches`
    [ Handler $ \(_ :: KillWorkerSafely) -> do
        mBrokerMessage <- readTVarIO mBrokerMessageTVar
        mapM_ resendIfRequested mBrokerMessage
        -- callback
        callWorkerMJobEvent onWorkerKilledSafely state mBrokerMessage
        -- kill worker
        throwIO KillWorkerSafely
    , Handler $ \(err :: JobTimeout b a) -> handleTimeoutError state err
    , Handler $ \(err :: SomeException) -> do
        mBrokerMessage <- readTVarIO mBrokerMessageTVar
        case mBrokerMessage of
          Just brokerMessage -> do
            callWorkerJobErrorEvent onJobError state brokerMessage err
            handleJobError state brokerMessage
          Nothing -> handleUnknownError state err
    ]
  where
    -- Put the in-flight job back on the queue, but only when its
    -- metadata asks for that.
    resendIfRequested brokerMessage = do
      let job = toA $ getMessage brokerMessage
          mdata = metadata job
      when (resendWhenWorkerKilled mdata) $ do
        -- NOTE: Delete first, then create job. It's a bit safer (same
        -- job won't be picked up twice, though we should be safe
        -- anyways, with the timeout)
        deleteMessage broker queueName $ messageId brokerMessage
        void $ sendJob broker queueName
          (job { metadata = mdata { readCount = readCount mdata + 1 } })
 
-- | Process one message that has already been fetched from the broker.
--
-- Steps, in order:
--
-- 1. fire the 'onMessageReceived' callback;
-- 2. call 'setMessageTimeout' with the job's 'jobTimeout';
-- 3. run the job via 'runAction' under 'Timeout.timeout';
-- 4. if the action finished in time, delete or archive the message
--    according to the job's 'archiveStrategy';
-- 5. otherwise fire 'onJobTimeout' and throw a 'JobTimeout' carrying
--    the message and the timeout value.
--
-- Nothing in this function catches exceptions; the handlers live in
-- 'runSingle''.
handleMessage :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO ()
handleMessage state@(State { .. }) brokerMessage = do
  callWorkerJobEvent onMessageReceived state brokerMessage
  -- Inform the broker how long a task could take. This way we prevent
  -- the broker from sending this task to another worker (e.g. 'vt' in
  -- PGMQ).
  setMessageTimeout broker queueName msgId (TimeoutS timeoutS)
  -- 'microsecond' is defined outside this function; presumably it is
  -- the factor that converts seconds to microseconds -- TODO confirm.
  mTimeout <- Timeout.timeout (fromIntegral timeoutS * microsecond)
                              (runAction state brokerMessage)
  case mTimeout of
    Just _ -> archiveHandler
    Nothing -> do
      callWorkerJobEvent onJobTimeout state brokerMessage
      throwIO $ JobTimeout { jtBMessage = brokerMessage
                           , jtTimeout = timeoutS }
  where
    msgId = messageId brokerMessage
    job' = toA $ getMessage brokerMessage
    mdata = metadata job'
    timeoutS = jobTimeout job'

    -- What to do with the message once its job completed in time.
    archiveHandler =
      case archiveStrategy mdata of
        ASDelete -> deleteMessage broker queueName msgId
        ASArchive -> archiveMessage broker queueName msgId
  -- onMessageFetched broker queue msg


-- -- | It's important to know if an exception occurred inside a job. This
-- -- way we can apply an error-recovery strategy and adjust this job in
-- -- the broker
-- wrapPerformActionInJobException :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO ()
-- wrapPerformActionInJobException state@(State { onJobError }) brokerMessage = do
--   catch (do
--             runAction state brokerMessage
--         )
--     (\(err :: SomeException) -> do
--         callWorkerJobEvent onJobError state brokerMessage
--         throwIO err
--         )


-- | Invoke an optional per-job callback.
--
-- Does nothing when the event is 'Nothing'; otherwise calls the wrapped
-- function with the given state and broker message.
callWorkerJobEvent :: WorkerJobEvent b a
                   -> State b a
                   -> BrokerMessage b (Job a)
                   -> IO ()
callWorkerJobEvent mEvent state brokerMessage =
  maybe (pure ()) (\event -> event state brokerMessage) mEvent

-- | Invoke an optional per-job error callback.
--
-- Does nothing when the event is 'Nothing'; otherwise calls the wrapped
-- function with the given state, broker message and exception.
callWorkerJobErrorEvent :: WorkerJobErrorEvent b a
                        -> State b a
                        -> BrokerMessage b (Job a)
                        -> SomeException
                        -> IO ()
callWorkerJobErrorEvent mEvent state brokerMessage err =
  maybe (pure ()) (\event -> event state brokerMessage err) mEvent

-- | Invoke an optional callback whose broker message may be absent.
--
-- Does nothing when the event is 'Nothing'; otherwise calls the wrapped
-- function with the given state and the (possibly missing) broker
-- message.
callWorkerMJobEvent :: WorkerMJobEvent b a
                    -> State b a
                    -> Maybe (BrokerMessage b (Job a))
                    -> IO ()
callWorkerMJobEvent mEvent state mBrokerMessage =
  maybe (pure ()) (\event -> event state mBrokerMessage) mEvent

-- | Apply a job's 'timeoutStrategy' after the job has timed out.
--
-- The job is taken from the message stored in the 'JobTimeout':
--
-- * 'TSDelete': delete the message.
-- * 'TSArchive': archive the message.
-- * 'TSRepeat': delete the message and send the job again.
-- * 'TSRepeatNElseArchive' @n@: archive once 'readCount' has reached
--   @n@, otherwise delete and send again.
-- * 'TSRepeatNElseDelete' @n@: delete once 'readCount' has reached
--   @n@, otherwise delete and send again.
--
-- Every resend increases the job's 'readCount' by one.
handleTimeoutError :: (HasWorkerBroker b a) => State b a -> JobTimeout b a -> IO ()
handleTimeoutError (State { .. }) (JobTimeout { .. }) =
  case timeoutStrategy mdata of
    TSDelete -> deleteMessage broker queueName msgId
    TSArchive -> archiveMessage broker queueName msgId
    TSRepeat -> requeue
    -- The job can be repeated at most 'n' times: compare 'readCt'
    -- with 'n'.
    TSRepeatNElseArchive n
      | readCt >= n -> archiveMessage broker queueName msgId
      | otherwise -> requeue
    TSRepeatNElseDelete n
      | readCt >= n -> deleteMessage broker queueName msgId
      | otherwise -> requeue
  where
    msgId = messageId jtBMessage
    job = toA $ getMessage jtBMessage
    mdata = metadata job
    readCt = readCount mdata

    -- NOTE In rare cases, when worker hangs, we might lose a job
    -- here? (i.e. delete, then resend)
    -- Also, be aware that message id will change with resend
    requeue = do
      -- Delete this job first, otherwise we'll be duplicating jobs.
      deleteMessage broker queueName msgId
      -- Send this job again, with increased 'readCount'
      void $ sendJob broker queueName
        (job { metadata = mdata { readCount = readCt + 1 } })

-- | Apply a job's 'errorStrategy' after the job has failed with an
-- exception.
--
-- * 'ESDelete': delete the message.
-- * 'ESArchive': archive the message.
-- * 'ESRepeatNElseArchive' @n@: archive once 'readCount' has reached
--   @n@; otherwise delete the message and send the job again with
--   'readCount' increased by one.
handleJobError :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO ()
handleJobError (State { .. }) brokerMessage =
  case errorStrategy mdata of
    ESDelete -> deleteMessage broker queueName msgId
    ESArchive -> archiveMessage broker queueName msgId
    ESRepeatNElseArchive n
      | readCt >= n -> archiveMessage broker queueName msgId
      | otherwise -> do
          -- Delete this job first, otherwise we'll be duplicating jobs.
          deleteMessage broker queueName msgId
          void $ sendJob broker queueName
            (job { metadata = mdata { readCount = readCt + 1 } })
  where
    msgId = messageId brokerMessage
    job = toA $ getMessage brokerMessage
    mdata = metadata job
    readCt = readCount mdata

-- | Report an exception of unknown kind.
--
-- The exception is rendered with 'show', prefixed with
-- @"unknown error: "@, decorated by 'formatStr' for the given worker
-- 'State' and printed to stdout. Nothing is rethrown and no broker
-- operation is performed here.
handleUnknownError :: State b a -> SomeException -> IO ()
handleUnknownError state err =
  putStrLn $ formatStr state $ "unknown error: " <> show err

-- | Send a job to the given queue without any delay.
--
-- This is 'sendJobDelayed' with a delay of @0@; the result is the
-- 'MessageId' produced by that call.
sendJob :: (HasWorkerBroker b a) => Broker b (Job a) -> Queue -> Job a -> IO (MessageId b)
sendJob broker queueName job =
  sendJobDelayed broker queueName job 0

-- | Send a job to the given queue with an initial delay.
--
-- The job is wrapped into a broker message with 'toMessage' and handed
-- to 'sendMessageDelayed' together with the delay; the 'MessageId'
-- returned by the broker is passed back to the caller.
--
-- NOTE(review): the delay is a 'TimeoutS', presumably seconds -- confirm
-- against the 'MessageBroker' instance in use.
sendJobDelayed :: (HasWorkerBroker b a) => Broker b (Job a) -> Queue -> Job a -> TimeoutS -> IO (MessageId b)
sendJobDelayed broker queueName job timeoutS =
  sendMessageDelayed broker queueName (toMessage job) timeoutS

-- | The constant @10^6@.
--
-- NOTE(review): presumably the number of microseconds in one second,
-- used to scale second-based timeouts -- confirm against the call sites.
microsecond :: Integer
microsecond = 10 ^ (6 :: Integer)


{- $sendJob
 A worker job carries quite a lot of metadata. Here are some utilities
 for constructing jobs more easily.
-}

-- | Wraps the parameters needed to send a job; consumed by @sendJob'@,
-- which turns the strategy/timeout fields into the job's 'JobMetadata'.
data SendJob b a =
  SendJob { broker       :: Broker b (Job a)  -- ^ broker the job is sent through
          , queue        :: Queue             -- ^ queue the job is sent to
          , msg          :: a                 -- ^ payload, stored as the 'Job' body
          , delay        :: TimeoutS          -- ^ initial delay for the message
          , archStrat    :: ArchiveStrategy   -- ^ becomes 'archiveStrategy' of the metadata
          , errStrat     :: ErrorStrategy     -- ^ becomes 'errorStrategy' of the metadata
          , toStrat      :: TimeoutStrategy   -- ^ becomes 'timeoutStrategy' of the metadata
          , timeout      :: Timeout           -- ^ becomes 'timeout' of the metadata
          , resendOnKill :: Bool              -- ^ becomes 'resendWhenWorkerKilled' of the metadata
          }

-- | Create a 'SendJob' with some defaults.
--
-- The caller supplies the broker, queue, payload and timeout; the
-- remaining fields are filled in as follows:
--
-- * 'delay' is @0@
-- * 'archStrat' is 'ASDelete'
-- * 'errStrat' is 'ESArchive'
-- * 'toStrat' is 'TSRepeat'
-- * 'resendOnKill' is 'True'
mkDefaultSendJob :: Broker b (Job a)
                 -> Queue
                 -> a
                 -> Timeout
                 -> SendJob b a
mkDefaultSendJob broker queue msg timeout =
  SendJob { broker
          , queue
          , msg
          , delay = 0
          -- remove finished jobs
          , archStrat = ASDelete
          -- archive errored jobs (for inspection later)
          , errStrat = ESArchive
          -- repeat timed out jobs
          , toStrat = TSRepeat
          , timeout
          , resendOnKill = True }


-- | Like 'mkDefaultSendJob' but with a default timeout of @10@.
--
-- NOTE(review): the unit of 'Timeout' is not visible here -- confirm
-- before relying on the default.
mkDefaultSendJob' :: Broker b (Job a)
                  -> Queue
                  -> a
                  -> SendJob b a
mkDefaultSendJob' b q m = mkDefaultSendJob b q m defaultTimeout
  where
    -- Timeout applied when the caller does not choose one.
    defaultTimeout :: Timeout
    defaultTimeout = 10

    
-- | Send the job described by a 'SendJob' record.
--
-- The job's 'JobMetadata' is 'defaultMetadata' with the archive, error
-- and timeout strategies, the timeout and the resend-on-kill flag
-- overridden from the record. The payload 'msg' is wrapped into a 'Job'
-- with that metadata and sent through 'sendJobDelayed' using the
-- record's 'broker', 'queue' and 'delay'. Returns the 'MessageId'
-- produced by that call.
sendJob' :: (HasWorkerBroker b a) => SendJob b a -> IO (MessageId b)
sendJob' (SendJob { .. }) = do
  -- Fields not listed here keep their 'defaultMetadata' values.
  let metadata = defaultMetadata { archiveStrategy = archStrat
                                 , errorStrategy = errStrat
                                 , timeoutStrategy = toStrat
                                 , timeout = timeout
                                 , resendWhenWorkerKilled = resendOnKill }
  let job = Job { job = msg, metadata }
  sendJobDelayed broker queue job delay