| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
Database.Redis.PubSub
Synopsis
- publish :: RedisCtx m f => ByteString -> ByteString -> m (f Integer)
- pubSub :: PubSub -> (Message -> IO PubSub) -> Redis ()
- data Message
- data PubSub
- subscribe :: [ByteString] -> PubSub
- unsubscribe :: [ByteString] -> PubSub
- psubscribe :: [ByteString] -> PubSub
- punsubscribe :: [ByteString] -> PubSub
- pubSubForever :: Connection -> PubSubController -> IO () -> IO ()
- type RedisChannel = ByteString
- type RedisPChannel = ByteString
- type MessageCallback = ByteString -> IO ()
- type PMessageCallback = RedisChannel -> ByteString -> IO ()
- data PubSubController
- newPubSubController :: MonadIO m => [(RedisChannel, MessageCallback)] -> [(RedisPChannel, PMessageCallback)] -> m PubSubController
- currentChannels :: FunctorMonadIO m => PubSubController -> m [RedisChannel]
- currentPChannels :: FunctorMonadIO m => PubSubController -> m [RedisPChannel]
- addChannels :: MonadIO m => PubSubController -> [(RedisChannel, MessageCallback)] -> [(RedisPChannel, PMessageCallback)] -> m UnregisterCallbacksAction
- addChannelsAndWait :: MonadIO m => PubSubController -> [(RedisChannel, MessageCallback)] -> [(RedisPChannel, PMessageCallback)] -> m UnregisterCallbacksAction
- removeChannels :: MonadIO m => PubSubController -> [RedisChannel] -> [RedisPChannel] -> m ()
- removeChannelsAndWait :: MonadIO m => PubSubController -> [RedisChannel] -> [RedisPChannel] -> m ()
- type UnregisterCallbacksAction = IO ()
- pendingChannels :: MonadIO m => PubSubController -> m (HashSet RedisChannel)
- pendingPatternChannels :: MonadIO m => PubSubController -> m (HashSet RedisPChannel)
- withPubSub :: Connection -> [ByteString] -> [ByteString] -> (STM Message -> IO r) -> IO r
Documentation
Arguments
| :: RedisCtx m f | |
| => ByteString | channel |
| -> ByteString | message |
| -> m (f Integer) |
Post a message to a channel (http://redis.io/commands/publish).
Subscribing to channels
There are two Pub/Sub implementations. First, there is a single-threaded implementation pubSub
which is simpler to use but has the restriction that subscription changes can only be made in
response to a message. Secondly, there is a more complicated Pub/Sub controller pubSubForever
that uses concurrency to support changing subscriptions at any time but requires more setup.
You should only use one or the other. In addition, no types or utility functions (that are part
of the public API) are shared, so functions or types in one of the following sections cannot
be used for the other. In particular, be aware that they use different utility functions to subscribe
and unsubscribe to channels.
Single-thread Pub/Sub
Listens to published messages on subscribed channels and channels matching the subscribed patterns. For documentation on the semantics of Redis Pub/Sub see http://redis.io/topics/pubsub.
The given callback function is called for each received message.
Subscription changes are triggered by the returned PubSub. To keep
subscriptions unchanged, the callback can return mempty.
Example: Subscribe to the "news" channel indefinitely.
pubSub (subscribe ["news"]) $ \msg -> do
putStrLn $ "Message from " ++ show (msgChannel msg)
return mempty
Example: Receive a single message from the "chat" channel.
pubSub (subscribe ["chat"]) $ \msg -> do
putStrLn $ "Message from " ++ show (msgChannel msg)
return $ unsubscribe ["chat"]
It should be noted that Redis Pub/Sub by its nature is asynchronous
so returning unsubscribe does not mean that callback won't be able
to receive any further messages. And to guarantee that you won't
won't process messages after unsubscription and won't unsubscribe
from the same channel more than once you need to use IORef or
something similar
Encapsulates subscription changes. Use subscribe, unsubscribe,
psubscribe, punsubscribe or mempty to construct a value. Combine
values by using the Monoid interface, i.e. mappend and mconcat.
Arguments
| :: [ByteString] | channel |
| -> PubSub |
Listen for messages published to the given channels (http://redis.io/commands/subscribe).
Arguments
| :: [ByteString] | channel |
| -> PubSub |
Stop listening for messages posted to the given channels (http://redis.io/commands/unsubscribe).
Arguments
| :: [ByteString] | pattern |
| -> PubSub |
Listen for messages published to channels matching the given patterns (http://redis.io/commands/psubscribe).
Arguments
| :: [ByteString] | pattern |
| -> PubSub |
Stop listening for messages posted to channels matching the given patterns (http://redis.io/commands/punsubscribe).
Continuous Pub/Sub message controller
Arguments
| :: Connection | The connection pool |
| -> PubSubController | The controller which keeps track of all subscriptions and handlers |
| -> IO () | This action is executed once Redis acknowledges that all the subscriptions in
the controller are now subscribed. You can use this after an exception (such as
|
| -> IO () |
Open a connection to the Redis server, register to all channels in the PubSubController,
and process messages and subscription change requests forever. The only way this will ever
exit is if there is an exception from the network code or an unhandled exception
in a MessageCallback or PMessageCallback. For example, if the network connection to Redis
dies, pubSubForever will throw a ConnectionLost. When such an exception is
thrown, you can recall pubSubForever with the same PubSubController which will open a
new connection and resubscribe to all the channels which are tracked in the PubSubController.
The general pattern is therefore during program startup create a PubSubController and fork
a thread which calls pubSubForever in a loop (using an exponential backoff algorithm
such as the retry package to not hammer the Redis
server if it does die). For example,
myhandler :: ByteString -> IO ()
myhandler msg = putStrLn $ unpack $ decodeUtf8 msg
onInitialComplete :: IO ()
onInitialComplete = putStrLn "Redis acknowledged that mychannel is now subscribed"
main :: IO ()
main = do
conn <- connect defaultConnectInfo
pubSubCtrl <- newPubSubController [("mychannel", myhandler)] []
concurrently ( forever $
pubSubForever conn pubSubCtrl onInitialComplete
`catch` (\(e :: SomeException) -> do
putStrLn $ "Got error: " ++ show e
threadDelay $ 50*1000) -- TODO: use exponential backoff
) $ restOfYourProgram
{- elsewhere in your program, use pubSubCtrl to change subscriptions -}
At most one active pubSubForever can be running against a single PubSubController at any time. If
two active calls to pubSubForever share a single PubSubController there will be deadlocks. If
you do want to process messages using multiple connections to Redis, you can create more than one
PubSubController. For example, create one PubSubController for each getNumCapabilities
and then create a Haskell thread bound to each capability each calling pubSubForever in a loop.
This will create one network connection per controller/capability and allow you to
register separate channels and callbacks for each controller, spreading the load across the capabilities.
type RedisChannel = ByteString Source #
A Redis channel name
type RedisPChannel = ByteString Source #
A Redis pattern channel name
type MessageCallback = ByteString -> IO () Source #
A handler for a message from a subscribed channel. The callback is passed the message content.
Messages are processed synchronously in the receiving thread, so if the callback
takes a long time it will block other callbacks and other messages from being
received. If you need to move long-running work to a different thread, we suggest
you use TBQueue with a reasonable bound, so that if messages are arriving faster
than you can process them, you do eventually block.
If the callback throws an exception, the exception will be thrown from pubSubForever
which will cause the entire Redis connection for all subscriptions to be closed.
As long as you call pubSubForever in a loop you will reconnect to your subscribed
channels, but you should probably add an exception handler to each callback to
prevent this.
type PMessageCallback = RedisChannel -> ByteString -> IO () Source #
A handler for a message from a psubscribed channel. The callback is passed the channel the message was sent on plus the message content.
Similar to MessageCallback, callbacks are executed synchronously and any exceptions
are rethrown from pubSubForever.
data PubSubController Source #
A controller that stores a set of channels, pattern channels, and callbacks.
It allows you to manage Pub/Sub subscriptions and pattern subscriptions and alter them at
any time throughout the life of your program.
You should typically create the controller at the start of your program and then store it
through the life of your program, using addChannels and removeChannels to update the
current subscriptions.
Arguments
| :: MonadIO m | |
| => [(RedisChannel, MessageCallback)] | the initial subscriptions |
| -> [(RedisPChannel, PMessageCallback)] | the initial pattern subscriptions |
| -> m PubSubController |
Create a new PubSubController. Note that this does not subscribe to any channels, it just
creates the controller. The subscriptions will happen once pubSubForever is called.
currentChannels :: FunctorMonadIO m => PubSubController -> m [RedisChannel] Source #
Get the list of current channels in the PubSubController. WARNING! This might not
exactly reflect the subscribed channels in the Redis server, because there is a delay
between adding or removing a channel in the PubSubController and when Redis receives
and processes the subscription change request.
currentPChannels :: FunctorMonadIO m => PubSubController -> m [RedisPChannel] Source #
Get the list of current pattern channels in the PubSubController. WARNING! This might not
exactly reflect the subscribed channels in the Redis server, because there is a delay
between adding or removing a channel in the PubSubController and when Redis receives
and processes the subscription change request.
Arguments
| :: MonadIO m | |
| => PubSubController | |
| -> [(RedisChannel, MessageCallback)] | the channels to subscribe to |
| -> [(RedisPChannel, PMessageCallback)] | the channels to pattern subscribe to |
| -> m UnregisterCallbacksAction |
Add channels into the PubSubController, and if there is an active pubSubForever, send the subscribe
and psubscribe commands to Redis. The addChannels function is thread-safe. This function
does not wait for Redis to acknowledge that the channels have actually been subscribed; use
addChannelsAndWait for that.
You can subscribe to the same channel or pattern channel multiple times; the PubSubController keeps
a list of callbacks and executes each callback in response to a message.
The return value is an action UnregisterCallbacksAction which will unregister the callbacks,
which should typically used with bracket.
Arguments
| :: MonadIO m | |
| => PubSubController | |
| -> [(RedisChannel, MessageCallback)] | the channels to subscribe to |
| -> [(RedisPChannel, PMessageCallback)] | the channels to psubscribe to |
| -> m UnregisterCallbacksAction |
Call addChannels and then wait for Redis to acknowledge that the channels are actually subscribed.
Note that this function waits for requested subscription change requests, so if you for example call
addChannelsAndWait from multiple threads simultaneously, they will all wait their pending
subscription changes to be acknowledged by Redis.
This also correctly waits if the network connection dies during the subscription change. Say that the
network connection dies right after we send a subscription change to Redis. pubSubForever will throw
ConnectionLost and addChannelsAndWait will continue to wait. Once you recall pubSubForever
with the same PubSubController, pubSubForever will open a new connection, send subscription commands
for all channels in the PubSubController (which include the ones we are waiting for),
and wait for the responses from Redis. Only once we receive the response from Redis that it has subscribed
to all channels in PubSubController will addChannelsAndWait unblock and return.
removeChannels :: MonadIO m => PubSubController -> [RedisChannel] -> [RedisPChannel] -> m () Source #
Remove channels from the PubSubController, and if there is an active pubSubForever, send the
unsubscribe commands to Redis. Note that as soon as this function returns, no more callbacks will be
executed even if more messages arrive during the period when we request to unsubscribe from the channel
and Redis actually processes the unsubscribe request. This function is thread-safe.
If you remove all channels, the connection in pubSubForever to redis will stay open and waiting for
any new channels from a call to addChannels. If you really want to close the connection,
use killThread or cancel to kill the thread running
pubSubForever.
removeChannelsAndWait :: MonadIO m => PubSubController -> [RedisChannel] -> [RedisPChannel] -> m () Source #
Call removeChannels and then wait for all pending subscription change requests to be acknowledged
by Redis. This uses the same waiting logic as addChannelsAndWait. Since removeChannels immediately
notifies the PubSubController to start discarding messages, you likely don't need this function and
can just use removeChannels.
type UnregisterCallbacksAction = IO () Source #
An action that when executed will unregister the callbacks. It is returned from addChannels
or addChannelsAndWait and typically you would use it in bracket to guarantee that you
unsubscribe from channels. For example, if you are using websockets to distribute messages to
clients, you could use something such as:
websocketConn <- Network.WebSockets.acceptRequest pending
let mycallback msg = Network.WebSockets.sendTextData websocketConn msg
bracket (addChannelsAndWait ctrl [("hello", mycallback)] []) id $ const $ do
{- loop here calling Network.WebSockets.receiveData -}pendingChannels :: MonadIO m => PubSubController -> m (HashSet RedisChannel) Source #
pendingPatternChannels :: MonadIO m => PubSubController -> m (HashSet RedisPChannel) Source #
Short lived connections
Another approach to PubSub that allows creating a short-lived PubSub connection is to use withPubSub, which takes a callback that receives messages and returns when the callback returns. This is simpler than pubSubForever but does not support changing subscriptions while it is running, so it is only useful for short-lived Pub/Sub connections. For example, you could use withPubSub
to subscribe to a channel, consume a stream of messages, and then return. This approach is worth using when you want a few short-lived
subscriptions. However, each call to withPubSub consumes a connection from the pool, so if you have a lot of short-lived subscriptions, it is more
withPubSub :: Connection -> [ByteString] -> [ByteString] -> (STM Message -> IO r) -> IO r Source #
Creates a subscription and automatically unsubscribes when callback returns, this function keeps flow control in the callback, so it is useful for short-lived subscriptions, when the callback knows when to exit. The function is quite simple and does not make any attempts to handle connection loss.
Note that this function does not support changing subscriptions while it is running, so it is only useful for short-lived Pub/Sub connections.
An example of usage, that is hard to implement with pubSubForever is to subscribe to a channel:
withPubSub conn ["mychannel"] [] $ \waitMsg -> do
d <- registerDelay 1000000 -- 1 second (requires -threaded runtime)
atomically $ asum [ readTVar >>= guard >> return Nothing
, Just $ waitMsg
]
In case if connection is lost, user callback will receive BlockedIndefinitelyOnSTM exception.