{-|
Module      : Async.Worker.Types
Description : Types for the async worker
Copyright   : (c) Gargantext, 2024-Present
License     : AGPL
Maintainer  : gargantext@iscpif.fr
Stability   : experimental
Portability : POSIX

Types for worker.
-}

{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-}
    
module Async.Worker.Types
  (
    -- * Main worker state
    State(..)
  -- * Job wrapped in metadata
  , Job(..)
    -- ** Job metadata
  , JobMetadata(..)
  , defaultMetadata
    -- *** Strategies for handling finished, errored, timed-out jobs
  , ArchiveStrategy(..)
  , ErrorStrategy(..)
  , TimeoutStrategy(..)
  -- ** Job utility functions
  , getJob
  , jobTimeout
  , Timeout
  , runAction
  -- ** Actions that the worker will perform
  , PerformAction
  -- * Events emitted during job lifetime
  , WorkerJobEvent
  , WorkerJobErrorEvent
  , WorkerMJobEvent
  -- * Other useful types and functions
  , HasWorkerBroker
  , formatStr
  , JobTimeout(..) )
where

import Async.Worker.Broker.Types (Broker, BrokerMessage, MessageBroker, Queue)
import Control.Applicative ((<|>))
import Control.Exception.Safe (Exception, SomeException)
import Data.Aeson (FromJSON(..), ToJSON(..), object, (.=), (.:), withObject, withText)
import Data.Text qualified as T
import Data.Typeable (Typeable)


-- | Number of times a message has been read for processing (see the
-- 'readCount' field of 'JobMetadata').
type ReadCount = Int
-- | Job timeout value (documented on the 'timeout' field of
-- 'JobMetadata' as being in seconds).
type Timeout = Int

    

-- | Strategy for archiving finished jobs
data ArchiveStrategy =
    -- | Delete message when it's done
    ASDelete
    -- | Archive message when it's done
  | ASArchive
  deriving (Eq, Show)
-- | Each constructor is serialised as a JSON string holding the
-- constructor's name.
instance ToJSON ArchiveStrategy where
  toJSON ASDelete  = toJSON ("ASDelete" :: String)
  toJSON ASArchive = toJSON ("ASArchive" :: String)
-- | Accepts the JSON strings @"ASDelete"@ and @"ASArchive"@; any other
-- string makes the parser fail.
instance FromJSON ArchiveStrategy where
  parseJSON = withText "ArchiveStrategy" $ \s ->
    case s of
      "ASDelete"  -> pure ASDelete
      "ASArchive" -> pure ASArchive
      -- Name the type in the failure: the offending string on its own
      -- gives no hint about what was being parsed.
      other       -> fail $ "Unknown ArchiveStrategy: " <> T.unpack other
  
-- | Strategy for handling jobs with errors
data ErrorStrategy =
    -- | Delete job when it threw an error
    ESDelete
    -- | Archive job when it threw an error
  | ESArchive
    -- | Try to repeat the job when an error occurred (at most N
    -- times), otherwise archive the job.
  | ESRepeatNElseArchive Int
  -- TODO Repeat N times
  deriving (Eq, Show)
-- | Nullary constructors are serialised as a JSON string holding the
-- constructor's name; 'ESRepeatNElseArchive' becomes a single-key
-- object @{"ESRepeatNElseArchive": n}@.
instance ToJSON ErrorStrategy where
  toJSON ESDelete  = toJSON ("ESDelete" :: String)
  toJSON ESArchive = toJSON ("ESArchive" :: String)
  -- 'object' already yields a JSON value, no further 'toJSON' needed.
  toJSON (ESRepeatNElseArchive n) = object [ "ESRepeatNElseArchive" .= n ]
-- | Accepts either one of the strings @"ESDelete"@ / @"ESArchive"@, or
-- an object of the form @{"ESRepeatNElseArchive": n}@.
instance FromJSON ErrorStrategy where
  parseJSON v = parseText v <|> parseESRepeatN v <|> unrecognised
    where
      -- Textual (nullary) constructors.
      parseText = withText "ErrorStrategy" $ \s ->
        case s of
          "ESDelete"  -> pure ESDelete
          "ESArchive" -> pure ESArchive
          other       -> fail $ "Unknown ErrorStrategy: " <> T.unpack other
      -- Object form carrying the repeat count.
      parseESRepeatN = withObject "ESRepeatN" $ \o ->
        ESRepeatNElseArchive <$> o .: "ESRepeatNElseArchive"
      -- '<|>' only reports the failure of the last alternative, which
      -- would otherwise be the object parser's message even for a bad
      -- string. End the chain with a message that names the type and
      -- shows the rejected value.
      unrecognised = fail $ "Unrecognised ErrorStrategy: " <> show v

-- | Strategy for handling job timeouts
data TimeoutStrategy =
    -- | Delete job when it timed out
    TSDelete
    -- | Archive job when it timed out
  | TSArchive
    -- | Repeat job when it timed out (infinitely)
  | TSRepeat
    -- | Repeat job when it timed out (at most N times), otherwise archive it
  | TSRepeatNElseArchive Int
    -- | Repeat job when it timed out (at most N times), otherwise delete it
  | TSRepeatNElseDelete Int
  deriving (Eq, Show)
-- | Nullary constructors are serialised as a JSON string holding the
-- constructor's name; the @TSRepeatN…@ constructors become a
-- single-key object mapping the constructor's name to the count.
instance ToJSON TimeoutStrategy where
  toJSON TSDelete  = toJSON ("TSDelete" :: String)
  toJSON TSArchive = toJSON ("TSArchive" :: String)
  toJSON TSRepeat  = toJSON ("TSRepeat" :: String)
  -- 'object' already yields a JSON value, no further 'toJSON' needed.
  toJSON (TSRepeatNElseArchive n) = object [ "TSRepeatNElseArchive" .= n ]
  toJSON (TSRepeatNElseDelete n)  = object [ "TSRepeatNElseDelete" .= n ]
-- | Accepts one of the strings @"TSDelete"@ / @"TSArchive"@ /
-- @"TSRepeat"@, or a single-key object @{"TSRepeatNElseArchive": n}@
-- or @{"TSRepeatNElseDelete": n}@.
instance FromJSON TimeoutStrategy where
  parseJSON v = parseText v
            <|> parseTSRepeatNElseArchive v
            <|> parseTSRepeatNElseDelete v
            <|> unrecognised
    where
      -- Parser for textual formats
      parseText = withText "TimeoutStrategy (text)" $ \s ->
        case s of
          "TSDelete"  -> pure TSDelete
          "TSArchive" -> pure TSArchive
          "TSRepeat"  -> pure TSRepeat
          other       -> fail $ "Unknown TimeoutStrategy: " <> T.unpack other
      -- Parser for 'TSRepeatNElseArchive' object
      parseTSRepeatNElseArchive =
        withObject "TimeoutStrategy (TSRepeatNElseArchive)" $ \o ->
          TSRepeatNElseArchive <$> o .: "TSRepeatNElseArchive"
      -- Parser for 'TSRepeatNElseDelete' object
      parseTSRepeatNElseDelete =
        withObject "TimeoutStrategy (TSRepeatNElseDelete)" $ \o ->
          TSRepeatNElseDelete <$> o .: "TSRepeatNElseDelete"
      -- '<|>' only reports the failure of the last alternative, which
      -- would otherwise always be the 'TSRepeatNElseDelete' parser's
      -- message. End the chain with a message that names the type and
      -- shows the rejected value.
      unrecognised = fail $ "Unrecognised TimeoutStrategy: " <> show v

    
-- | Metadata associated with a job.
data JobMetadata =
  JobMetadata
    { -- | What to do after job is finished successfully
      archiveStrategy        :: ArchiveStrategy
      -- | What to do when a job ends with error
    , errorStrategy          :: ErrorStrategy
      -- | What to do when a job ends with timeout
    , timeoutStrategy        :: TimeoutStrategy
      -- | Time after which the job is considered to time-out
      -- (in seconds).
    , timeout                :: Timeout
      -- | Read count so we know how many times this message
      -- was processed
    , readCount              :: ReadCount
      -- | A worker might have processed a task and be
      -- killed. If 'resendWhenWorkerKilled' is 'True', this
      -- job will be resent to broker and picked up
      -- later. Otherwise it will be discarded.
    , resendWhenWorkerKilled :: Bool
    }
  deriving (Eq, Show)
-- | Serialised as a JSON object. Note the abbreviated keys used for
-- the three strategies (@astrat@, @estrat@, @tstrat@); the remaining
-- keys match the field names.
instance ToJSON JobMetadata where
  toJSON (JobMetadata { .. }) =
    -- 'object' already yields a JSON value, no further 'toJSON' needed.
    object
      [ "astrat"                 .= archiveStrategy
      , "estrat"                 .= errorStrategy
      , "tstrat"                 .= timeoutStrategy
      , "timeout"                .= timeout
      , "readCount"              .= readCount
      , "resendWhenWorkerKilled" .= resendWhenWorkerKilled
      ]
-- | Reads the object layout written by the 'ToJSON' instance. All six
-- keys are required.
instance FromJSON JobMetadata where
  parseJSON = withObject "JobMetadata" $ \o -> do
    archiveStrategy        <- o .: "astrat"
    errorStrategy          <- o .: "estrat"
    timeoutStrategy        <- o .: "tstrat"
    timeout                <- o .: "timeout"
    readCount              <- o .: "readCount"
    resendWhenWorkerKilled <- o .: "resendWhenWorkerKilled"
    -- The record wildcard picks up the bindings above by field name.
    pure $ JobMetadata { .. }

-- | For a typical 'Job' it's probably sane to just archive it no
-- matter how it finished.
--
-- Defaults: every strategy archives, 'timeout' is 10, 'readCount'
-- starts at 0 and 'resendWhenWorkerKilled' is 'True'.
defaultMetadata :: JobMetadata
defaultMetadata =
  JobMetadata { archiveStrategy = ASArchive
              , errorStrategy = ESArchive
              , timeoutStrategy = TSArchive
              , timeout = 10
              , readCount = 0
              , resendWhenWorkerKilled = True }
    
-- | Worker 'Job' is 'a' (defining action to call via 'performAction')
-- together with associated 'JobMetadata'.
data Job a =
  Job { -- | The wrapped payload
        job :: a
        -- | Metadata travelling with the payload
      , metadata :: JobMetadata }
  deriving (Eq, Show)

-- | Given worker 'Job' 'a', return the 'a' part
getJob :: Job a -> a
getJob (Job { job }) = job
-- | Serialised as a JSON object with the keys @metadata@ and @job@.
instance ToJSON a => ToJSON (Job a) where
  toJSON (Job { .. }) =
    -- 'object' already yields a JSON value, no further 'toJSON' needed.
    object
      [ "metadata" .= metadata
      , "job"      .= job
      ]
-- | Reads the object layout written by the 'ToJSON' instance; both the
-- @metadata@ and the @job@ key are required.
instance FromJSON a => FromJSON (Job a) where
  parseJSON = withObject "Job" $ \o -> do
    metadata <- o .: "metadata"
    job      <- o .: "job"
    -- The record wildcard picks up the bindings above by field name.
    pure $ Job { .. }

-- | For a given 'Job', return its 'timeout' value from
-- 'JobMetadata'.
jobTimeout :: Job a -> Timeout
jobTimeout (Job { metadata }) = timeout metadata
    

-- | Main state for a running worker ('b' is 'Broker', 'a' is the
-- underlying message).
--
-- 'a' is the underlying message specification and is
-- implementation-dependent, e.g. can be of JSON form
--
-- @
-- {"function": ..., "arguments": [...]}
-- @
--
-- 'Job' 'a' is worker's wrapper around that message with metadata
-- (corresponds to broker 'Async.Worker.Broker.Types.Message' 'b'
-- 'a'). 'BrokerMessage' 'b' 'a', on the other hand, is what we get
-- when the broker reads that message.
--
-- Note that our underlying 'Broker' handles messages of type 'Job'
-- 'a'.
data State b a =
  State { broker            :: Broker b (Job a)
        -- | Queue associated with this worker. If you want to support
        -- more queues, spawn more workers.
        , queueName         :: Queue
        -- | Name of this worker (useful for debugging).
        , name              :: String
        -- | Actions that will be performed when message 'a' arrives.
        , performAction     :: PerformAction b a

        -- | Event emitted after job was received from broker
        , onMessageReceived :: WorkerJobEvent b a
        -- | Event emitted after job was finished successfully
        , onJobFinish       :: WorkerJobEvent b a
        -- | Event emitted after job timed out
        , onJobTimeout      :: WorkerJobEvent b a
        -- | Event emitted after job ended with error
        , onJobError        :: WorkerJobErrorEvent b a
        -- | Event emitted when worker is safely killed (don't overuse it)
        , onWorkerKilledSafely :: WorkerMJobEvent b a }

-- | Helper function to call an action for given worker, for a
-- 'BrokerMessage'.
--
-- The worker's own 'performAction' callback is looked up in the
-- 'State' and then invoked with that same 'State' and the message.
runAction :: State b a -> BrokerMessage b (Job a) -> IO ()
runAction state brokerMessage = performAction state state brokerMessage

-- | Optional callback taking the worker 'State' and the
-- 'BrokerMessage' of the job concerned.
type WorkerJobEvent b a = Maybe (State b a -> BrokerMessage b (Job a) -> IO ())
-- | Like 'WorkerJobEvent', but the callback additionally receives a
-- 'SomeException'.
type WorkerJobErrorEvent b a = Maybe (State b a -> BrokerMessage b (Job a) -> SomeException -> IO ())
-- | Like 'WorkerJobEvent', but the 'BrokerMessage' argument is itself
-- optional.
type WorkerMJobEvent b a = Maybe (State b a -> Maybe (BrokerMessage b (Job a)) -> IO ())

-- | Callback definition (what to execute when a message arrives)
type PerformAction b a =
  State b a -> BrokerMessage b (Job a) -> IO ()


-- | Constraint bundle for a worker with broker 'b' and message payload
-- 'a': a 'MessageBroker' instance for 'Job' 'a' messages, 'Typeable'
-- for both parameters, and 'Show' for the payload.
--
-- /TODO/ 'Show' 'a' could be removed. Any logging can be done as
-- part of 'on' events in 'State'.
type HasWorkerBroker b a = ( MessageBroker b (Job a), Typeable a, Typeable b, Show a )

-- | Helper function to format a string with worker name (for logging)
--
-- The result has the shape @[\<worker name\>] \<message\>@.
formatStr :: State b a -> String -> String
formatStr (State { name }) msg =
  "[" <> name <> "] " <> msg

-- | An exception, thrown when job times out
data JobTimeout b a =
  JobTimeout { -- | Broker message carried by the exception (presumably
               -- the job that timed out — confirm against the thrower)
               jtBMessage  :: BrokerMessage b (Job a)
               -- | Timeout value carried by the exception
             , jtTimeout   :: Timeout }
-- | Derived standalone because the instance needs a 'Show' instance
-- for 'BrokerMessage' 'b' ('Job' 'a') in its context.
deriving instance (Show (BrokerMessage b (Job a))) => Show (JobTimeout b a)
-- | Makes 'JobTimeout' usable as an exception.
instance (Show (BrokerMessage b (Job a)), Typeable a, Typeable b) => Exception (JobTimeout b a)