| Copyright | (c) Gargantext 2024-Present |
|---|---|
| License | AGPL |
| Maintainer | gargantext@iscpif.fr |
| Stability | experimental |
| Portability | POSIX |
| Safe Haskell | Safe-Inferred |
| Language | Haskell2010 |
Async.Worker.Broker.Types
Description
Broker typeclass definition.
Synopsis
- newtype Queue = Queue {}
- renderQueue :: Queue -> String
- newtype TimeoutS = TimeoutS {}
- class (Eq (MessageId b), Show (MessageId b), Ord (MessageId b), ToJSON (MessageId b), FromJSON (MessageId b), Show (BrokerMessage b a)) => MessageBroker b a where
    - data Broker b a :: Type
    - data BrokerMessage b a :: Type
    - data Message b a :: Type
    - data MessageId b :: Type
    - data BrokerInitParams b a :: Type
    - messageId :: BrokerMessage b a -> MessageId b
    - getMessage :: BrokerMessage b a -> Message b a
    - toMessage :: a -> Message b a
    - toA :: Message b a -> a
    - initBroker :: BrokerInitParams b a -> IO (Broker b a)
    - deinitBroker :: Broker b a -> IO ()
    - createQueue :: Broker b a -> Queue -> IO ()
    - dropQueue :: Broker b a -> Queue -> IO ()
    - readMessageWaiting :: Broker b a -> Queue -> IO (BrokerMessage b a)
    - popMessageWaiting :: Broker b a -> Queue -> IO (BrokerMessage b a)
    - setMessageTimeout :: Broker b a -> Queue -> MessageId b -> TimeoutS -> IO ()
    - sendMessage :: Broker b a -> Queue -> Message b a -> IO (MessageId b)
    - sendMessageDelayed :: Broker b a -> Queue -> Message b a -> TimeoutS -> IO (MessageId b)
    - deleteMessage :: Broker b a -> Queue -> MessageId b -> IO ()
    - archiveMessage :: Broker b a -> Queue -> MessageId b -> IO ()
    - getQueueSize :: Broker b a -> Queue -> IO Int
    - getArchivedMessage :: Broker b a -> Queue -> MessageId b -> IO (Maybe (BrokerMessage b a))
    - listPendingMessageIds :: Broker b a -> Queue -> IO [MessageId b]
    - getMessageById :: Broker b a -> Queue -> MessageId b -> IO (Maybe (BrokerMessage b a))
- type SerializableMessage a = (FromJSON a, ToJSON a, Typeable a)
Documentation
renderQueue :: Queue -> String
newtype TimeoutS
TimeoutS is just a wrapper around Int, so that it is clear the value
is in seconds.
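The idea of wrapping a bare Int so the unit is visible in the type can be sketched as follows. This is a minimal illustration, not the library's definition: the names TimeoutSketch, timeoutSeconds and toMicroseconds are hypothetical, since the actual fields of TimeoutS are not shown on this page.

```haskell
-- Hypothetical stand-in for TimeoutS; constructor and field names are assumed.
newtype TimeoutSketch = TimeoutSketch { timeoutSeconds :: Int }
  deriving (Eq, Ord, Show)

-- threadDelay from Control.Concurrent expects microseconds, so the
-- conversion from seconds is made explicit in one place.
toMicroseconds :: TimeoutSketch -> Int
toMicroseconds (TimeoutSketch s) = s * 1000000
```

With such a wrapper, a function taking a timeout cannot be handed an unlabelled Int by accident.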
Main broker typeclass
NOTE There are 3 types of messages here:
- a the underlying, user-defined message
- Job a worker definition, containing message metadata
- BrokerMessage message m, i.e. for PGMQ, it returns things like: msgId, readCt, enqueuedAt, vt
Also:
- a is read-write
- Job a is read-write
- BrokerMessage is read-only, i.e. we can't save it to broker and it doesn't make sense to construct it on Haskell side. Instead, we save Job a and get BrokerMessage when reading. In this sense, read and send are not symmetrical (similarly, Opaleye has Read and Write tables).
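The asymmetry between sending and reading can be illustrated with a small pure model. Everything here is an assumption made for illustration: JobSketch, BrokerMessageSketch, their fields, and the list-backed queue are hypothetical and only loosely mirror the PGMQ fields named above.

```haskell
-- Hypothetical worker-side wrapper: user payload plus metadata (read-write).
data JobSketch a = JobSketch
  { jobPayload :: a
  , jobRetries :: Int
  } deriving (Eq, Show)

-- Hypothetical broker-side envelope (read-only from the caller's view):
-- only the broker model below ever constructs it.
data BrokerMessageSketch a = BrokerMessageSketch
  { bmMsgId  :: Int
  , bmReadCt :: Int
  , bmBody   :: a
  } deriving (Eq, Show)

type QueueSketch a = [BrokerMessageSketch (JobSketch a)]

-- Sending takes a Job and returns the assigned id with the updated queue.
sendSketch :: JobSketch a -> QueueSketch a -> (Int, QueueSketch a)
sendSketch job q = (newId, q ++ [BrokerMessageSketch newId 0 job])
  where
    newId = length q + 1

-- Reading returns the broker envelope, not the Job that was sent.
-- The message stays in the queue with its read counter incremented.
readSketch :: QueueSketch a
           -> Maybe (BrokerMessageSketch (JobSketch a), QueueSketch a)
readSketch []         = Nothing
readSketch (m : rest) = Just (m', m' : rest)
  where
    m' = m { bmReadCt = bmReadCt m + 1 }
```

Note that sendSketch accepts a JobSketch while readSketch yields a BrokerMessageSketch: the envelope is only ever produced by the broker side.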
class (Eq (MessageId b), Show (MessageId b), Ord (MessageId b), ToJSON (MessageId b), FromJSON (MessageId b), Show (BrokerMessage b a)) => MessageBroker b a where
This is an interface for basic broker functionality.
Associated Types
data Broker b a :: Type
Data representing the broker
data BrokerMessage b a :: Type
Data representing a message that is returned by the broker. You're
not supposed to construct this type yourself (in a similar spirit,
Opaleye uses selectTable:
https://hackage.haskell.org/package/opaleye-0.10.3.1/docs/Opaleye-Table.html#v:selectTable ).
data Message b a :: Type
Data that we serialize into the broker (the worker will wrap this into
Job a)
data MessageId b :: Type
The message id type (needed for delete/archive operations)
data BrokerInitParams b a :: Type
All the parameters needed for broker initialization
Methods
messageId :: BrokerMessage b a -> MessageId b
Operation for getting the MessageId from BrokerMessage
getMessage :: BrokerMessage b a -> Message b a
BrokerMessage contains Message inside, this is a
deconstructor for BrokerMessage
toMessage :: a -> Message b a
Convert a to Message b a
toA :: Message b a -> a
Convert Message b a to a
initBroker :: BrokerInitParams b a -> IO (Broker b a)
Initialize broker with given BrokerInitParams.
deinitBroker :: Broker b a -> IO ()
Deconstruct broker (e.g. close DB connection)
createQueue :: Broker b a -> Queue -> IO ()
Create new queue with given name. Optionally any other initializations can be added here.
dropQueue :: Broker b a -> Queue -> IO ()
Drop queue
readMessageWaiting :: Broker b a -> Queue -> IO (BrokerMessage b a)
Read a message from the queue, waiting for it if not present (NOTE: for
PGMQ, this leaves the message in the queue; you need to use
setMessageTimeout to prevent other workers from seeing this
message).
popMessageWaiting :: Broker b a -> Queue -> IO (BrokerMessage b a)
Pop a message from the queue, waiting for it if not present
setMessageTimeout :: Broker b a -> Queue -> MessageId b -> TimeoutS -> IO ()
We sometimes need a way to tell the broker that a message shouldn't
be visible for a given amount of time (e.g. the 'visibility timeout'
setting in PGMQ). The broker operates only at the level of a and isn't
aware of Job with its JobMetadata. Hence, it's the worker's
responsibility to properly set the timeout after a message is read.
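A worker-side loop body following this division of responsibility might look like the sketch below. It does not use the real MessageBroker class: BrokerOps is a hypothetical record of functions standing in for readMessageWaiting, setMessageTimeout and deleteMessage, with message ids and timeouts simplified to Int. Deleting the message after the handler succeeds is also an assumption about the worker's flow.

```haskell
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Hypothetical stand-ins for three broker operations, fixed to one queue.
data BrokerOps msg = BrokerOps
  { opRead       :: IO (Int, msg)        -- like readMessageWaiting
  , opSetTimeout :: Int -> Int -> IO ()  -- like setMessageTimeout (id, seconds)
  , opDelete     :: Int -> IO ()         -- like deleteMessage
  }

-- Read a message, hide it from other workers, handle it, then delete it.
processOne :: BrokerOps msg -> Int -> (msg -> IO ()) -> IO ()
processOne ops timeoutS handler = do
  (msgId, body) <- opRead ops
  opSetTimeout ops msgId timeoutS  -- the worker, not the broker, sets this
  handler body
  opDelete ops msgId

-- Usage example: record the order of calls made against a fake broker.
demo :: IO [String]
demo = do
  logRef <- newIORef []
  let record s = modifyIORef logRef (s :)
      ops = BrokerOps
        { opRead       = record "read" >> pure (7, "payload")
        , opSetTimeout = \i t -> record ("timeout " ++ show i ++ " " ++ show t)
        , opDelete     = \i -> record ("delete " ++ show i)
        }
  processOne ops 30 (\body -> record ("handle " ++ body))
  reverse <$> readIORef logRef
```

The timeout is set before the handler runs, so a slow handler does not let a second worker pick up the same message within that window.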
sendMessage :: Broker b a -> Queue -> Message b a -> IO (MessageId b)
Send message
sendMessageDelayed :: Broker b a -> Queue -> Message b a -> TimeoutS -> IO (MessageId b)
Send message with initial delay
deleteMessage :: Broker b a -> Queue -> MessageId b -> IO ()
Delete message
archiveMessage :: Broker b a -> Queue -> MessageId b -> IO ()
Archive message
getQueueSize :: Broker b a -> Queue -> IO Int
Queue size
getArchivedMessage :: Broker b a -> Queue -> MessageId b -> IO (Maybe (BrokerMessage b a))
Read archived message
listPendingMessageIds :: Broker b a -> Queue -> IO [MessageId b]
List all pending message ids
getMessageById :: Broker b a -> Queue -> MessageId b -> IO (Maybe (BrokerMessage b a))
Get message by its id
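To show how a backend might plug into a class of this shape, here is a reduced sketch using associated data families. It is not the real MessageBroker class: BrokerSketch keeps only a handful of methods, drops the JSON constraints and the Queue argument, and the InMemory backend with its IORef-backed list is hypothetical.

```haskell
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeFamilies #-}

import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import Data.Kind (Type)

-- Reduced, hypothetical analogue of MessageBroker with a single implicit queue.
class BrokerSketch b a where
  data SBroker b a :: Type
  data SMessage b a :: Type
  data SMessageId b :: Type
  sToMessage :: a -> SMessage b a
  sToA       :: SMessage b a -> a
  sInit      :: IO (SBroker b a)
  sSend      :: SBroker b a -> SMessage b a -> IO (SMessageId b)
  sQueueSize :: SBroker b a -> IO Int

-- Type-level tag selecting the in-memory backend.
data InMemory

instance BrokerSketch InMemory a where
  data SBroker InMemory a  = InMemBroker (IORef [(Int, a)])
  data SMessage InMemory a = InMemMessage a
  data SMessageId InMemory = InMemId Int deriving (Eq, Show)

  sToMessage = InMemMessage
  sToA (InMemMessage x) = x
  sInit = InMemBroker <$> newIORef []
  -- Append the payload and hand back a freshly assigned sequential id.
  sSend (InMemBroker ref) (InMemMessage x) =
    atomicModifyIORef' ref $ \msgs ->
      let newId = length msgs + 1
      in (msgs ++ [(newId, x)], InMemId newId)
  sQueueSize (InMemBroker ref) = length <$> readIORef ref
```

Because the broker, message and id types are data families indexed by the backend tag, each backend chooses its own representations while callers program against the class methods only.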