hedis
Safe Haskell: None
Language: Haskell2010

Database.Redis.PubSub

Documentation

publish Source #

Arguments

:: RedisCtx m f 
=> ByteString

channel

-> ByteString

message

-> m (f Integer) 

Post a message to a channel (http://redis.io/commands/publish).

Subscribing to channels

There are two Pub/Sub implementations. First, there is a single-threaded implementation pubSub which is simpler to use but has the restriction that subscription changes can only be made in response to a message. Secondly, there is a more complicated Pub/Sub controller pubSubForever that uses concurrency to support changing subscriptions at any time but requires more setup. You should only use one or the other. In addition, no types or utility functions (that are part of the public API) are shared, so functions or types in one of the following sections cannot be used for the other. In particular, be aware that they use different utility functions to subscribe and unsubscribe to channels.

Single-thread Pub/Sub

pubSub Source #

Arguments

:: PubSub

Initial subscriptions.

-> (Message -> IO PubSub)

Callback function.

-> Redis () 

Listens to published messages on subscribed channels and channels matching the subscribed patterns. For documentation on the semantics of Redis Pub/Sub see http://redis.io/topics/pubsub.

The given callback function is called for each received message. Subscription changes are triggered by the returned PubSub. To keep subscriptions unchanged, the callback can return mempty.

Example: Subscribe to the "news" channel indefinitely.

 pubSub (subscribe ["news"]) $ \msg -> do
     putStrLn $ "Message from " ++ show (msgChannel msg)
     return mempty
 

Example: Receive a single message from the "chat" channel.

 pubSub (subscribe ["chat"]) $ \msg -> do
     putStrLn $ "Message from " ++ show (msgChannel msg)
     return $ unsubscribe ["chat"]
 

Note that Redis Pub/Sub is asynchronous by nature, so returning unsubscribe does not guarantee that the callback will receive no further messages. To guarantee that you do not process messages after unsubscribing, and that you do not unsubscribe from the same channel more than once, track the subscription state yourself, for example in an IORef.
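One way to track that state is a small "first call only" gate built on an IORef. The sketch below uses only base; newFirstCallGate is a hypothetical helper name, not part of hedis.

```haskell
import Data.IORef (atomicModifyIORef', newIORef)

-- | Hypothetical helper (not part of hedis): create a gate action that
-- returns True the first time it is run and False on every later run.
-- The update is atomic, so the gate is safe to share between threads.
newFirstCallGate :: IO (IO Bool)
newFirstCallGate = do
  used <- newIORef False
  -- Always store True; report whether the gate was still unused.
  return $ atomicModifyIORef' used (\wasUsed -> (True, not wasUsed))
```

In the "chat" example above, the callback could run the gate first: when it returns True, handle the message and return unsubscribe ["chat"]; when it returns False, ignore the message and return mempty.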

data Message Source #

Instances

Show Message Source #

Defined in Database.Redis.PubSub

data PubSub Source #

Encapsulates subscription changes. Use subscribe, unsubscribe, psubscribe, punsubscribe or mempty to construct a value. Combine values by using the Monoid interface, i.e. mappend and mconcat.

Instances

Monoid PubSub Source #

Defined in Database.Redis.PubSub

Semigroup PubSub Source #

Defined in Database.Redis.PubSub

Eq PubSub Source #

Defined in Database.Redis.PubSub

Methods

(==) :: PubSub -> PubSub -> Bool #

(/=) :: PubSub -> PubSub -> Bool #

subscribe Source #

Arguments

:: [ByteString]

channel

-> PubSub 

Listen for messages published to the given channels (http://redis.io/commands/subscribe).

unsubscribe Source #

Arguments

:: [ByteString]

channel

-> PubSub 

Stop listening for messages posted to the given channels (http://redis.io/commands/unsubscribe).

psubscribe Source #

Arguments

:: [ByteString]

pattern

-> PubSub 

Listen for messages published to channels matching the given patterns (http://redis.io/commands/psubscribe).

punsubscribe Source #

Arguments

:: [ByteString]

pattern

-> PubSub 

Stop listening for messages posted to channels matching the given patterns (http://redis.io/commands/punsubscribe).

Continuous Pub/Sub message controller

pubSubForever Source #

Arguments

:: Connection

The connection pool

-> PubSubController

The controller which keeps track of all subscriptions and handlers

-> IO ()

This action is executed once Redis acknowledges that all the subscriptions in the controller are now subscribed. You can use this after an exception (such as ConnectionLost) to signal that all subscriptions are now reactivated.

-> IO () 

Open a connection to the Redis server, subscribe to all channels in the PubSubController, and process messages and subscription change requests forever. This only exits if the network code throws an exception or a MessageCallback or PMessageCallback throws an unhandled exception. For example, if the network connection to Redis dies, pubSubForever will throw a ConnectionLost. When such an exception is thrown, you can call pubSubForever again with the same PubSubController; it will open a new connection and resubscribe to all the channels tracked in the PubSubController.

The general pattern is therefore to create a PubSubController during program startup and fork a thread which calls pubSubForever in a loop (with exponential backoff, for example via the retry package, so as not to hammer the Redis server if it does die). For example,

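{-# LANGUAGE OverloadedStrings, ScopedTypeVariables #-}
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (concurrently_)
import Control.Exception (SomeException, catch)
import Control.Monad (forever)
import Data.ByteString (ByteString)
import Data.Text (unpack)
import Data.Text.Encoding (decodeUtf8)
import Database.Redis

myhandler :: ByteString -> IO ()
myhandler msg = putStrLn $ unpack $ decodeUtf8 msg

onInitialComplete :: IO ()
onInitialComplete = putStrLn "Redis acknowledged that mychannel is now subscribed"

main :: IO ()
main = do
  conn <- connect defaultConnectInfo
  pubSubCtrl <- newPubSubController [("mychannel", myhandler)] []
  -- restOfYourProgram is a placeholder for your application's main action
  concurrently_ ( forever $
      pubSubForever conn pubSubCtrl onInitialComplete
        `catch` (\(e :: SomeException) -> do
          putStrLn $ "Got error: " ++ show e
          threadDelay $ 50*1000) -- TODO: use exponential backoff
       ) $ restOfYourProgram


  {- elsewhere in your program, use pubSubCtrl to change subscriptions -}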
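The fixed 50 ms delay in the example can be replaced by exponential backoff. The following is a minimal sketch using only base, as an alternative to the retry package; backoffDelay, isAsync, retryWithBackoff and the chosen constants are illustrative names and values, not part of hedis.

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception
  (SomeAsyncException (..), SomeException, fromException, throwIO, try)

-- | Delay in microseconds before retry number 'attempt' (0-based): the base
-- delay doubled per attempt, capped at 'maxUs'. The exponent is clamped to 20
-- so the multiplication cannot overflow for reasonable base delays.
backoffDelay :: Int -> Int -> Int -> Int
backoffDelay baseUs maxUs attempt = min maxUs (baseUs * 2 ^ min attempt 20)

-- | True for asynchronous exceptions (for example from killThread or cancel).
isAsync :: SomeException -> Bool
isAsync e = case fromException e of
  Just (SomeAsyncException _) -> True
  Nothing -> False

-- | Run an action that is expected to end only by throwing. After each
-- synchronous failure, log it, sleep for the backoff delay, and run it again.
-- Asynchronous exceptions are rethrown so the loop can still be cancelled.
retryWithBackoff :: Int -> Int -> IO () -> IO a
retryWithBackoff baseUs maxUs action = go 0
  where
    go attempt = do
      result <- try action
      case result of
        Right () -> go 0
        Left e
          | isAsync e -> throwIO e
          | otherwise -> do
              putStrLn $ "Got error: " ++ show e
              threadDelay (backoffDelay baseUs maxUs attempt)
              go (attempt + 1)
```

With this helper, the forever/catch loop in the example would become something like retryWithBackoff 50000 5000000 (pubSubForever conn pubSubCtrl onInitialComplete). One limitation of this sketch is that the attempt counter never resets after a successful reconnect, because pubSubForever only ends by throwing; a production version would probably reset it once the connection has been stable for a while.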

At most one active pubSubForever can be running against a single PubSubController at any time. If two active calls to pubSubForever share a single PubSubController there will be deadlocks. If you do want to process messages using multiple connections to Redis, you can create more than one PubSubController. For example, create one PubSubController for each getNumCapabilities and then create a Haskell thread bound to each capability each calling pubSubForever in a loop. This will create one network connection per controller/capability and allow you to register separate channels and callbacks for each controller, spreading the load across the capabilities.

type RedisChannel = ByteString Source #

A Redis channel name

type RedisPChannel = ByteString Source #

A Redis pattern channel name

type MessageCallback = ByteString -> IO () Source #

A handler for a message from a subscribed channel. The callback is passed the message content.

Messages are processed synchronously in the receiving thread, so if the callback takes a long time it will block other callbacks and other messages from being received. If you need to move long-running work to a different thread, we suggest you use TBQueue with a reasonable bound, so that if messages are arriving faster than you can process them, you do eventually block.
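The TBQueue suggestion can be sketched as follows, using only base and stm. newQueuedCallback is a hypothetical helper, not part of hedis; it is polymorphic in the message type, so with ByteString messages the returned function has the shape of a MessageCallback.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically, newTBQueueIO, readTBQueue, writeTBQueue)
import Control.Monad (forever)

-- | Hypothetical helper (not part of hedis): start a worker thread that runs
-- the slow handler, and return a cheap callback that only enqueues messages.
-- When the queue already holds 'bound' messages the callback blocks, which
-- gives the back-pressure described above.
newQueuedCallback :: Int -> (a -> IO ()) -> IO (a -> IO ())
newQueuedCallback bound slowHandler = do
  queue <- newTBQueueIO (fromIntegral bound)
  -- Worker loop: take one message at a time and process it off-thread.
  _ <- forkIO $ forever $ atomically (readTBQueue queue) >>= slowHandler
  return $ \msg -> atomically (writeTBQueue queue msg)
```

Note that in this sketch an exception in slowHandler ends the worker thread, after which the queue fills up and the callback blocks; a real program would likely catch exceptions inside the handler or supervise the worker.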

If the callback throws an exception, the exception will be thrown from pubSubForever which will cause the entire Redis connection for all subscriptions to be closed. As long as you call pubSubForever in a loop you will reconnect to your subscribed channels, but you should probably add an exception handler to each callback to prevent this.
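A per-callback exception handler might look like the sketch below, which uses only base. safeCallback is a hypothetical wrapper, not part of hedis, and logging to stderr is just one possible reaction to a failure.

```haskell
import Control.Exception
  (SomeAsyncException (..), SomeException, fromException, throwIO, try)
import System.IO (hPutStrLn, stderr)

-- | Hypothetical wrapper (not part of hedis): run a callback and log any
-- synchronous exception instead of letting it escape. Asynchronous
-- exceptions are rethrown so the calling thread can still be cancelled.
safeCallback :: (a -> IO ()) -> a -> IO ()
safeCallback handler msg = do
  result <- try (handler msg)
  case result of
    Right () -> return ()
    Left e -> case fromException e of
      Just (SomeAsyncException _) -> throwIO e
      Nothing ->
        hPutStrLn stderr $ "callback failed: " ++ show (e :: SomeException)
```

A handler would then be registered as safeCallback myhandler rather than myhandler, so a failure in one callback does not close the connection for all subscriptions.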

type PMessageCallback = RedisChannel -> ByteString -> IO () Source #

A handler for a message from a psubscribed channel. The callback is passed the channel the message was sent on plus the message content.

Similar to MessageCallback, callbacks are executed synchronously and any exceptions are rethrown from pubSubForever.

data PubSubController Source #

A controller that stores a set of channels, pattern channels, and callbacks. It allows you to manage Pub/Sub subscriptions and pattern subscriptions and alter them at any time throughout the life of your program. You should typically create the controller at the start of your program and then store it through the life of your program, using addChannels and removeChannels to update the current subscriptions.

newPubSubController Source #

Arguments

:: MonadIO m 
=> [(RedisChannel, MessageCallback)]

the initial subscriptions

-> [(RedisPChannel, PMessageCallback)]

the initial pattern subscriptions

-> m PubSubController 

Create a new PubSubController. Note that this does not subscribe to any channels, it just creates the controller. The subscriptions will happen once pubSubForever is called.

currentChannels :: FunctorMonadIO m => PubSubController -> m [RedisChannel] Source #

Get the list of current channels in the PubSubController. WARNING! This might not exactly reflect the subscribed channels in the Redis server, because there is a delay between adding or removing a channel in the PubSubController and when Redis receives and processes the subscription change request.

currentPChannels :: FunctorMonadIO m => PubSubController -> m [RedisPChannel] Source #

Get the list of current pattern channels in the PubSubController. WARNING! This might not exactly reflect the subscribed channels in the Redis server, because there is a delay between adding or removing a channel in the PubSubController and when Redis receives and processes the subscription change request.

addChannels Source #

Arguments

:: MonadIO m 
=> PubSubController 
-> [(RedisChannel, MessageCallback)]

the channels to subscribe to

-> [(RedisPChannel, PMessageCallback)]

the channels to pattern subscribe to

-> m UnregisterCallbacksAction 

Add channels into the PubSubController, and if there is an active pubSubForever, send the subscribe and psubscribe commands to Redis. The addChannels function is thread-safe. This function does not wait for Redis to acknowledge that the channels have actually been subscribed; use addChannelsAndWait for that.

You can subscribe to the same channel or pattern channel multiple times; the PubSubController keeps a list of callbacks and executes each callback in response to a message.

The return value is an UnregisterCallbacksAction, an action which will unregister the callbacks. It should typically be used with bracket.

addChannelsAndWait Source #

Arguments

:: MonadIO m 
=> PubSubController 
-> [(RedisChannel, MessageCallback)]

the channels to subscribe to

-> [(RedisPChannel, PMessageCallback)]

the channels to psubscribe to

-> m UnregisterCallbacksAction 

Call addChannels and then wait for Redis to acknowledge that the channels are actually subscribed.

Note that this function waits for all pending subscription change requests, so if you, for example, call addChannelsAndWait from multiple threads simultaneously, they will all wait for their pending subscription changes to be acknowledged by Redis.

This also correctly waits if the network connection dies during the subscription change. Say that the network connection dies right after we send a subscription change to Redis. pubSubForever will throw ConnectionLost and addChannelsAndWait will continue to wait. Once you call pubSubForever again with the same PubSubController, pubSubForever will open a new connection, send subscription commands for all channels in the PubSubController (which include the ones we are waiting for), and wait for the responses from Redis. Only once we receive the response from Redis that it has subscribed to all channels in the PubSubController will addChannelsAndWait unblock and return.

removeChannels :: MonadIO m => PubSubController -> [RedisChannel] -> [RedisPChannel] -> m () Source #

Remove channels from the PubSubController, and if there is an active pubSubForever, send the unsubscribe commands to Redis. Note that as soon as this function returns, no more callbacks will be executed even if more messages arrive during the period when we request to unsubscribe from the channel and Redis actually processes the unsubscribe request. This function is thread-safe.

If you remove all channels, the connection to Redis held by pubSubForever will stay open, waiting for new channels from a call to addChannels. If you really want to close the connection, use killThread or cancel to kill the thread running pubSubForever.

removeChannelsAndWait :: MonadIO m => PubSubController -> [RedisChannel] -> [RedisPChannel] -> m () Source #

Call removeChannels and then wait for all pending subscription change requests to be acknowledged by Redis. This uses the same waiting logic as addChannelsAndWait. Since removeChannels immediately notifies the PubSubController to start discarding messages, you likely don't need this function and can just use removeChannels.

type UnregisterCallbacksAction = IO () Source #

An action that when executed will unregister the callbacks. It is returned from addChannels or addChannelsAndWait and typically you would use it in bracket to guarantee that you unsubscribe from channels. For example, if you are using websockets to distribute messages to clients, you could use something such as:

websocketConn <- Network.WebSockets.acceptRequest pending
let mycallback msg = Network.WebSockets.sendTextData websocketConn msg
bracket (addChannelsAndWait ctrl [("hello", mycallback)] []) id $ const $ do
  {- loop here calling Network.WebSockets.receiveData -}
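The bracket idiom above works because the acquired value is itself the release action, so id can serve as the release function. The sketch below shows the same shape with base only; registerMock is a stand-in for addChannels that records names in an IORef, and every name in it is hypothetical.

```haskell
import Control.Exception (bracket)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | Stand-in for addChannels (hypothetical): record a name in the registry
-- and return the action that removes it again.
registerMock :: IORef [String] -> String -> IO (IO ())
registerMock registry name = do
  modifyIORef' registry (name :)
  return $ modifyIORef' registry (filter (/= name))

-- | Run 'body' while the name is registered. bracket passes the acquired
-- unregister action to 'id', which runs it even if 'body' throws.
withRegistration :: IORef [String] -> String -> IO r -> IO r
withRegistration registry name body =
  bracket (registerMock registry name) id (const body)

-- | Registry contents observed during and after the bracketed region.
demo :: IO ([String], [String])
demo = do
  registry <- newIORef []
  during <- withRegistration registry "hello" (readIORef registry)
  after <- readIORef registry
  return (during, after)
```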

Short lived connections

Another approach to PubSub that allows creating a short-lived PubSub connection is to use withPubSub, which takes a callback that receives messages and returns when the callback returns. This is simpler than pubSubForever but does not support changing subscriptions while it is running, so it is only useful for short-lived Pub/Sub connections. For example, you could use withPubSub to subscribe to a channel, consume a stream of messages, and then return. This approach is worth using when you want a few short-lived subscriptions. However, each call to withPubSub consumes a connection from the pool, so if you have a lot of short-lived subscriptions, it is more

withPubSub :: Connection -> [ByteString] -> [ByteString] -> (STM Message -> IO r) -> IO r Source #

Creates a subscription and automatically unsubscribes when the callback returns. This function keeps flow control in the callback, so it is useful for short-lived subscriptions where the callback knows when to exit. The function is quite simple and makes no attempt to handle connection loss.

Note that this function does not support changing subscriptions while it is running, so it is only useful for short-lived Pub/Sub connections.

An example that is hard to implement with pubSubForever is to subscribe to a channel and wait at most one second for a message:

withPubSub conn ["mychannel"] [] $ \waitMsg -> do
   d <- registerDelay 1000000 -- 1 second (requires -threaded runtime)
   atomically $ asum [ readTVar d >>= guard >> return Nothing
                     , Just <$> waitMsg
                     ]
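The timeout pattern can be factored into a reusable helper. The sketch below is a variant that uses forkIO and threadDelay instead of registerDelay, so it does not depend on the -threaded runtime; waitWithTimeout is a hypothetical name, not part of hedis.

```haskell
import Control.Applicative ((<|>))
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
  (STM, atomically, check, newTVarIO, readTVar, writeTVar)

-- | Hypothetical helper (not part of hedis): wait for the STM action to
-- produce a value, giving up with Nothing after 'micros' microseconds.
-- If a value is already available when the timeout fires, the value wins.
waitWithTimeout :: Int -> STM a -> IO (Maybe a)
waitWithTimeout micros waitMsg = do
  expired <- newTVarIO False
  -- Timer thread: flips the flag once the delay has elapsed.
  _ <- forkIO $ threadDelay micros >> atomically (writeTVar expired True)
  atomically $ (Just <$> waitMsg) <|> (Nothing <$ (readTVar expired >>= check))
```

Since the callback passed to withPubSub has type STM Message -> IO r, this helper could be passed directly, as in withPubSub conn ["mychannel"] [] (waitWithTimeout 1000000). The timer thread in this sketch lingers until its delay elapses even when a message arrives early.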

If the connection is lost, the user callback will receive a BlockedIndefinitelyOnSTM exception.