| Copyright | (c) 2023 Sean Hess |
|---|---|
| License | BSD3 |
| Maintainer | Sean Hess <seanhess@gmail.com> |
| Stability | experimental |
| Portability | portable |
| Safe Haskell | None |
| Language | Haskell2010 |
Network.AMQP.Worker
Description
Type safe and simplified message queues with AMQP
Synopsis
- newtype Key a msg = Key [Bind]
- data Bind
- data Route
- word :: Text -> Key a msg -> Key a msg
- key :: Text -> Key Route msg
- any1 :: Key a msg -> Key Bind msg
- many :: Key a msg -> Key Bind msg
- connect :: MonadIO m => ConnectionOpts -> m Connection
- fromURI :: String -> Either String ConnectionOpts
- data Connection
- publish :: (RequireRoute a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m ()
- queue :: MonadIO m => Connection -> QueuePrefix -> Key a msg -> m (Queue msg)
- queueNamed :: MonadIO m => Connection -> QueueName -> Key a msg -> m (Queue msg)
- data Queue msg = Queue (Key Bind msg) QueueName
- queueName :: QueuePrefix -> Key a msg -> QueueName
- type QueueName = Text
- newtype QueuePrefix = QueuePrefix Text
- data ParseError = ParseError String ByteString
- data Message a = Message {
- body :: ByteString
- value :: a
- takeMessage :: (MonadIO m, FromJSON a) => Connection -> Queue a -> m (Message a)
- worker :: (FromJSON a, MonadIO m) => Connection -> Queue a -> (Message a -> m ()) -> m ()
- def :: Default a => a
How to use this library
Define keys to identify how messages will be published and what the message type is
import Network.AMQP.Worker as Worker
data Greeting = Greeting
{ message :: Text }
deriving (Generic, Show, Eq)
instance FromJSON Greeting
instance ToJSON Greeting
newGreetings :: Key Routing Greeting
newGreetings = key "greetings" & word "new"Connect to AMQP and publish a message
conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672") Worker.publish conn newGreetings $ Greeting "hello"
Create a queue to receive messages. You can bind direclty to the Routing Key to ensure it is delivered once
q <- Worker.queue conn "new" newMessages :: IO (Queue Greeting) m <- Worker.takeMessage conn q print (value m)
Define dynamic Routing Keys to receive many kinds of messages
let newMessages = key "messages" & any1 & word "new" q <- Worker.queue conn def newMessages :: IO (Queue Greeting) m <- Worker.takeMessage conn q print (value m)
Create a worker to conintually process messages
forkIO $ Worker.worker conn q $ \m -> do
print (value m)Binding and Routing Keys
Messages are published with a specific identifier called a Routing key. Queues can use Binding Keys to control which messages are delivered to them.
Routing keys have no dynamic component and can be used to publish messages
commentsKey :: Key Route Comment commentsKey = key "posts" & word "new"
Binding keys can contain wildcards, only used for matching messages
commentsKey :: Key Bind Comment commentsKey = key "posts" & any1 & word "comments" & many
word :: Text -> Key a msg -> Key a msg Source #
A specific word. Can be used to chain Routing keys or Binding keys
any1 :: Key a msg -> Key Bind msg Source #
Match any one word. Equivalent to *. Converts to a Binding key and can no longer be used to publish messaages
many :: Key a msg -> Key Bind msg Source #
Match zero or more words. Equivalient to #. Converts to a Binding key and can no longer be used to publish messages
Connecting
connect :: MonadIO m => ConnectionOpts -> m Connection Source #
Connect to the AMQP server.
conn <- connect (fromURI "amqp://guest:guest@localhost:5672")
fromURI :: String -> Either String ConnectionOpts #
Parses an AMQP standard URI of the form amqp://user:password@host:port/vhost and returns a ConnectionOpts for use with openConnection''.
To pass multiple servers, separate them by comma, like: amqp://user:password@host:port,host2:port2/vhost
Any of these fields may be empty and will be replaced with defaults from amqp://guest:guest@localhost:5672/
When parsing fails, a Left String will be returned with a human-readable error-message.
data Connection Source #
Sending Messages
publish :: (RequireRoute a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m () Source #
send a message to a queue. Enforces that the message type and queue name are correct at the type level
let newUsers = key "users" & word "new":: Key Route User publish conn newUsers (User "username")
Publishing to a Binding Key results in an error
-- Compiler error! This doesn't make sense let users = key "users" & many :: Key Binding User publish conn users (User "username")
Initializing queues
queue :: MonadIO m => Connection -> QueuePrefix -> Key a msg -> m (Queue msg) Source #
queueNamed :: MonadIO m => Connection -> QueueName -> Key a msg -> m (Queue msg) Source #
Create a queue to receive messages matching the binding key. Each queue with a unique name will be delivered a separate copy of the messsage. Workers operating on the same queue, or on queues with the same name will load balance
A queue is an inbox for messages to be delivered
queueName :: QueuePrefix -> Key a msg -> QueueName Source #
Name a queue with a prefix and the binding key name. Useful for seeing at a glance which queues are receiving which messages
-- "main messages.new" queueName "main" (key "messages" & word "new")
newtype QueuePrefix Source #
Constructors
| QueuePrefix Text |
Instances
| Default QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue Methods def :: QueuePrefix # | |
| IsString QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue Methods fromString :: String -> QueuePrefix # | |
| Show QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue Methods showsPrec :: Int -> QueuePrefix -> ShowS # show :: QueuePrefix -> String # showList :: [QueuePrefix] -> ShowS # | |
| Eq QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue | |
Messages
data ParseError Source #
Constructors
| ParseError String ByteString |
Instances
| Exception ParseError Source # | |
Defined in Network.AMQP.Worker.Message Methods toException :: ParseError -> SomeException # fromException :: SomeException -> Maybe ParseError # displayException :: ParseError -> String # backtraceDesired :: ParseError -> Bool # | |
| Show ParseError Source # | |
Defined in Network.AMQP.Worker.Message Methods showsPrec :: Int -> ParseError -> ShowS # show :: ParseError -> String # showList :: [ParseError] -> ShowS # | |
a parsed message from the queue
Constructors
| Message | |
Fields
| |
takeMessage :: (MonadIO m, FromJSON a) => Connection -> Queue a -> m (Message a) Source #