haskell-bee-0.1.0.0: A lightweight library for asynchronous job workers with multiple broker backends
Copyright: (c) Gargantext 2024-Present
License: AGPL
Maintainer: gargantext@iscpif.fr
Stability: experimental
Portability: POSIX
Safe Haskell: Safe-Inferred
Language: Haskell2010

Async.Worker.Broker.Types

Description

Broker typeclass definition.

Documentation

newtype Queue Source #

Constructors

Queue 

Fields

Instances

IsString Queue Source # 
Instance details

Defined in Async.Worker.Broker.Types

Methods

fromString :: String -> Queue #

Semigroup Queue Source # 
Instance details

Defined in Async.Worker.Broker.Types

Methods

(<>) :: Queue -> Queue -> Queue #

sconcat :: NonEmpty Queue -> Queue #

stimes :: Integral b => b -> Queue -> Queue #

Show Queue Source # 
Instance details

Defined in Async.Worker.Broker.Types

Methods

showsPrec :: Int -> Queue -> ShowS #

show :: Queue -> String #

showList :: [Queue] -> ShowS #

Eq Queue Source # 
Instance details

Defined in Async.Worker.Broker.Types

Methods

(==) :: Queue -> Queue -> Bool #

(/=) :: Queue -> Queue -> Bool #

Ord Queue Source # 
Instance details

Defined in Async.Worker.Broker.Types

Methods

compare :: Queue -> Queue -> Ordering #

(<) :: Queue -> Queue -> Bool #

(<=) :: Queue -> Queue -> Bool #

(>) :: Queue -> Queue -> Bool #

(>=) :: Queue -> Queue -> Bool #

max :: Queue -> Queue -> Queue #

min :: Queue -> Queue -> Queue #
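
The IsString and Semigroup instances suggest that queue names can be written as string literals and combined with (<>). The following is a minimal sketch of that usage. It defines a local stand-in for Queue so that it compiles on its own: the String field and the concatenating Semigroup instance are assumptions, since this page does not show the constructor's fields or state how (<>) behaves. The names baseQueue and priorityQueue are illustrative only.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main where

import Data.String (IsString (..))

-- Stand-in for the library's Queue. ASSUMPTION: it wraps a String.
newtype Queue = Queue String
  deriving (Eq, Ord, Show)

-- Lets string literals be used wherever a Queue is expected.
instance IsString Queue where
  fromString = Queue

-- ASSUMPTION: combining two queues concatenates their names.
instance Semigroup Queue where
  Queue a <> Queue b = Queue (a <> b)

-- Hypothetical queue names, written as plain literals.
baseQueue :: Queue
baseQueue = "emails"

priorityQueue :: Queue
priorityQueue = baseQueue <> "-high"

main :: IO ()
main = do
  print priorityQueue
  -- Ord allows queues to be compared, e.g. for use as Map keys.
  print (baseQueue < priorityQueue)
```

With the real library, the same literals should work after importing Queue from Async.Worker.Broker.Types and enabling OverloadedStrings.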

newtype TimeoutS Source #

A wrapper around Int that makes it explicit that the value is a timeout in seconds.

Constructors

TimeoutS 

Fields
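
To illustrate why a unit-carrying wrapper is useful, here is a small sketch. It uses a local stand-in for TimeoutS (an Int of seconds, as the description says) and two hypothetical helpers, minutes and toMicros, which are not part of the library.

```haskell
module Main where

-- Stand-in for the library's TimeoutS: an Int counted in seconds.
newtype TimeoutS = TimeoutS Int
  deriving (Eq, Ord, Show)

-- Hypothetical helper: build a timeout from whole minutes.
minutes :: Int -> TimeoutS
minutes m = TimeoutS (m * 60)

-- Hypothetical helper: convert to microseconds, the unit expected by
-- functions such as Control.Concurrent.threadDelay.
toMicros :: TimeoutS -> Int
toMicros (TimeoutS s) = s * 1000000

main :: IO ()
main = do
  print (minutes 2)
  print (toMicros (TimeoutS 5))
```

Because TimeoutS is a distinct type, a bare Int (which might be milliseconds or microseconds) cannot be passed by accident where a timeout in seconds is expected.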

Main broker typeclass

NOTE: There are 3 types of messages here: a, Job a and BrokerMessage. They differ in whether they can be written to the broker:

  • a is read-write
  • Job a is read-write
  • BrokerMessage is read-only: it cannot be saved to the broker, and it makes no sense to construct it on the Haskell side. Instead, we save Job a and get a BrokerMessage back when reading. In this sense, read and send are not symmetrical (similarly, Opaleye has separate Read and Write tables).
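
The asymmetry between sending and reading can be sketched with a toy, self-contained model. Everything below is an assumption made for illustration: MiniBroker is a cut-down stand-in for the real class (no Queue argument, no JSON constraints, no Job wrapper), InMem is a hypothetical in-memory backend, and popMessage is a non-blocking simplification of the documented popMessageWaiting. The point is only that callers send a Message but receive a BrokerMessage, which carries broker-assigned data such as the message id.

```haskell
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
module Main where

import Data.IORef (IORef, atomicModifyIORef', newIORef)
import Data.Kind (Type)

-- Cut-down stand-in for MessageBroker (NOT the library's class).
class MiniBroker b a where
  data Broker b a :: Type
  data BrokerMessage b a :: Type   -- read side only
  data Message b a :: Type         -- write side
  data MessageId b :: Type
  messageId   :: BrokerMessage b a -> MessageId b
  getMessage  :: BrokerMessage b a -> Message b a
  toMessage   :: a -> Message b a
  toA         :: Message b a -> a
  sendMessage :: Broker b a -> Message b a -> IO (MessageId b)
  popMessage  :: Broker b a -> IO (Maybe (BrokerMessage b a))

-- Hypothetical backend tag: a single in-memory FIFO queue.
data InMem

instance MiniBroker InMem a where
  -- State: next id to assign, plus the pending (id, payload) pairs.
  data Broker InMem a = InMemBroker (IORef (Int, [(Int, a)]))
  data BrokerMessage InMem a = InMemBM Int a
  data Message InMem a = InMemMsg a
  data MessageId InMem = InMemId Int deriving (Eq, Show)
  messageId (InMemBM i _) = InMemId i
  getMessage (InMemBM _ x) = InMemMsg x
  toMessage = InMemMsg
  toA (InMemMsg x) = x
  -- The broker, not the caller, assigns the id.
  sendMessage (InMemBroker ref) (InMemMsg x) =
    atomicModifyIORef' ref $ \(next, q) ->
      ((next + 1, q ++ [(next, x)]), InMemId next)
  popMessage (InMemBroker ref) =
    atomicModifyIORef' ref $ \st -> case st of
      (n, [])            -> ((n, []), Nothing)
      (n, (i, x) : rest) -> ((n, rest), Just (InMemBM i x))

main :: IO ()
main = do
  ref <- newIORef (0, [])
  let broker = InMemBroker ref :: Broker InMem String
  sentId <- sendMessage broker (toMessage "hello")
  received <- popMessage broker
  case received of
    Nothing -> putStrLn "queue empty"
    Just bm -> do
      -- The id on the read side matches the one returned by send.
      print (messageId bm == sentId)
      putStrLn (toA (getMessage bm))
```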

class (Eq (MessageId b), Show (MessageId b), Ord (MessageId b), ToJSON (MessageId b), FromJSON (MessageId b), Show (BrokerMessage b a)) => MessageBroker b a where Source #

This is an interface for basic broker functionality.

Associated Types

data Broker b a :: Type Source #

Data representing the broker

data BrokerMessage b a :: Type Source #

Data representing a message that is returned by the broker. You're not supposed to construct this type yourself (in a similar spirit, Opaleye uses selectTable https://hackage.haskell.org/package/opaleye-0.10.3.1/docs/Opaleye-Table.html#v:selectTable)

data Message b a :: Type Source #

Data that we serialize into broker (worker will wrap this into Job a)

data MessageId b :: Type Source #

The message id type (needed for delete/archive operations)

data BrokerInitParams b a :: Type Source #

All the parameters needed for broker initialization

Methods

messageId :: BrokerMessage b a -> MessageId b Source #

Operation for getting the MessageId from BrokerMessage

getMessage :: BrokerMessage b a -> Message b a Source #

BrokerMessage contains a Message inside; this is a deconstructor for BrokerMessage

toMessage :: a -> Message b a Source #

Convert a to Message b a

toA :: Message b a -> a Source #

Convert Message b a to a

initBroker :: BrokerInitParams b a -> IO (Broker b a) Source #

Initialize broker with given BrokerInitParams.

deinitBroker :: Broker b a -> IO () Source #

Shut down the broker and release its resources (e.g. close the DB connection)

createQueue :: Broker b a -> Queue -> IO () Source #

Create new queue with given name. Optionally any other initializations can be added here.

dropQueue :: Broker b a -> Queue -> IO () Source #

Drop queue

readMessageWaiting :: Broker b a -> Queue -> IO (BrokerMessage b a) Source #

Read a message from the queue, waiting for one if none is present (NOTE: for pgmq, this leaves the message in the queue; use setMessageTimeout to prevent other workers from seeing it).

popMessageWaiting :: Broker b a -> Queue -> IO (BrokerMessage b a) Source #

Pop message from queue, waiting for it if not present

setMessageTimeout :: Broker b a -> Queue -> MessageId b -> TimeoutS -> IO () Source #

We sometimes need a way to tell the broker that a message shouldn't be visible for a given amount of time (e.g. the 'visibility timeout' setting in PGMQ). The broker operates only at the level of a and isn't aware of Job and its JobMetadata. Hence, it's the worker's responsibility to set the timeout properly after a message is read.
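
The read-then-hide sequence described here could look roughly like the sketch below. It is written against the method signatures listed on this page and needs the haskell-bee package to compile. processOne is a hypothetical helper, not part of the library; it handles the raw payload a rather than a Job a, and it does no error handling, so a failing handler simply leaves the message to reappear once the timeout expires (on backends that behave like pgmq).

```haskell
{-# LANGUAGE FlexibleContexts #-}
module WorkerStep (processOne) where

import Async.Worker.Broker.Types

-- | Hypothetical helper: read one message, hide it, handle it, delete it.
processOne
  :: MessageBroker b a
  => Broker b a
  -> Queue
  -> TimeoutS        -- how long to hide the message while it is handled
  -> (a -> IO ())    -- user-supplied handler
  -> IO ()
processOne broker queue timeout handler = do
  -- Blocks until a message is available.
  brokerMsg <- readMessageWaiting broker queue
  let msgId = messageId brokerMsg
  -- Ask the broker to hide the message from other workers for now.
  setMessageTimeout broker queue msgId timeout
  -- Unwrap BrokerMessage -> Message -> a and run the handler.
  handler (toA (getMessage brokerMsg))
  -- Remove the message once handled; archiveMessage is the documented
  -- alternative when the message should be archived instead.
  deleteMessage broker queue msgId
```

A caller would pass a concrete broker obtained from initBroker, for example `processOne broker "emails" (TimeoutS 30) print`, assuming OverloadedStrings for the queue name and a Show instance for the payload.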

sendMessage :: Broker b a -> Queue -> Message b a -> IO (MessageId b) Source #

Send message

sendMessageDelayed :: Broker b a -> Queue -> Message b a -> TimeoutS -> IO (MessageId b) Source #

Send message with initial delay

deleteMessage :: Broker b a -> Queue -> MessageId b -> IO () Source #

Delete message

archiveMessage :: Broker b a -> Queue -> MessageId b -> IO () Source #

Archive message

getQueueSize :: Broker b a -> Queue -> IO Int Source #

Queue size

getArchivedMessage :: Broker b a -> Queue -> MessageId b -> IO (Maybe (BrokerMessage b a)) Source #

Read archived message

listPendingMessageIds :: Broker b a -> Queue -> IO [MessageId b] Source #

List all pending message ids

getMessageById :: Broker b a -> Queue -> MessageId b -> IO (Maybe (BrokerMessage b a)) Source #

Get message by its id

type SerializableMessage a = (FromJSON a, ToJSON a, Typeable a) Source #

A constraint synonym requiring that a message can be serialized. JSON is assumed here. This isn't strictly broker-related, but it is useful nevertheless.
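
As an illustration of how such a constraint synonym can be used, the sketch below redefines SerializableMessage locally (copied from the signature above) and applies it to a hypothetical payload type. The Email type and the roundTrip and describe helpers are assumptions made for this example; only the aeson and base functions are real.

```haskell
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
module Main where

import Data.Aeson (FromJSON, ToJSON, decode, encode)
import Data.Typeable (Typeable, typeOf)
import GHC.Generics (Generic)

-- Local copy of the synonym documented above.
type SerializableMessage a = (FromJSON a, ToJSON a, Typeable a)

-- Hypothetical payload type; JSON instances come from Generic.
data Email = Email
  { recipient :: String
  , body      :: String
  } deriving (Eq, Show, Generic, FromJSON, ToJSON)

-- Encode to JSON and decode again, as a broker round trip would.
roundTrip :: SerializableMessage a => a -> Maybe a
roundTrip = decode . encode

-- Typeable gives access to the payload's type name, e.g. for logging.
describe :: SerializableMessage a => a -> String
describe x = show (typeOf x)

main :: IO ()
main = do
  let msg = Email "user@example.org" "hello"
  print (roundTrip msg == Just msg)
  putStrLn (describe msg)
```

A single SerializableMessage a constraint keeps signatures shorter than repeating the three underlying constraints on every function that handles payloads.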