{-# LANGUAGE CPP, OverloadedStrings, RecordWildCards, EmptyDataDecls,
    FlexibleInstances, FlexibleContexts, GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ScopedTypeVariables, TupleSections, ConstraintKinds #-}
{-# LANGUAGE BlockArguments #-}

module Database.Redis.PubSub (
    publish,

    -- ** Subscribing to channels
    -- $pubsubexpl

    -- *** Single-thread Pub/Sub
    pubSub,
    Message(..),
    PubSub(),
    subscribe, unsubscribe, psubscribe, punsubscribe,
    -- *** Continuous Pub/Sub message controller
    pubSubForever,
    RedisChannel, RedisPChannel, MessageCallback, PMessageCallback,
    PubSubController, newPubSubController, currentChannels, currentPChannels,
    addChannels, addChannelsAndWait, removeChannels, removeChannelsAndWait,
    UnregisterCallbacksAction,
    pendingChannels, pendingPatternChannels,
    -- ** Short lived connections
    -- $shortlivedexpl
    withPubSub
) where

#if __GLASGOW_HASKELL__ < 710
import Control.Applicative
import Data.Monoid hiding (<>)
#endif
import Control.Arrow (second)
import Control.Concurrent.Async (withAsync, waitEitherCatch, waitEitherCatchSTM, concurrently)
import Control.Concurrent.STM
import Control.Exception (throwIO, finally)
import qualified Database.Redis.ProtocolPipelining as PP
import Control.Monad
import Control.Monad.Reader (asks)
import Control.Monad.State
import Data.ByteString.Char8 (ByteString)
import Data.Function (fix)
import qualified Data.List as L
import qualified Data.List.NonEmpty as NE
import Data.Maybe (isJust)
import Data.Pool
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Data.Hashable (Hashable)
import qualified Data.HashMap.Strict as HM
import qualified Data.HashSet as HS
import qualified Database.Redis.Cluster as Cluster
import qualified Database.Redis.Core as Core
import qualified Database.Redis.Connection as Connection
import Database.Redis.Protocol (Reply(..), renderRequest)
import Database.Redis.Types
import Control.Monad.IO.Unlift (MonadUnliftIO(withRunInIO))
import Data.Functor (($>))

-- |While in PubSub mode, we keep track of the number of current subscriptions
--  (as reported by Redis replies) and the number of messages we expect to
--  receive after a SUBSCRIBE or PSUBSCRIBE command. We can safely leave the
--  PubSub mode when both these numbers are zero.
data PubSubState = PubSubState { PubSubState -> Int
subCnt, PubSubState -> Int
pending :: Int }

modifyPending :: (MonadState PubSubState m) => (Int -> Int) -> m ()
modifyPending :: forall (m :: * -> *).
MonadState PubSubState m =>
(Int -> Int) -> m ()
modifyPending Int -> Int
f = (PubSubState -> PubSubState) -> m ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify ((PubSubState -> PubSubState) -> m ())
-> (PubSubState -> PubSubState) -> m ()
forall a b. (a -> b) -> a -> b
$ \PubSubState
s -> PubSubState
s{ pending = f (pending s) }

putSubCnt :: (MonadState PubSubState m) => Int -> m ()
putSubCnt :: forall (m :: * -> *). MonadState PubSubState m => Int -> m ()
putSubCnt Int
n = (PubSubState -> PubSubState) -> m ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify ((PubSubState -> PubSubState) -> m ())
-> (PubSubState -> PubSubState) -> m ()
forall a b. (a -> b) -> a -> b
$ \PubSubState
s -> PubSubState
s{ subCnt = n }

data Subscribe
data Unsubscribe
data Channel
data Pattern

-- |Encapsulates subscription changes. Use 'subscribe', 'unsubscribe',
--  'psubscribe', 'punsubscribe' or 'mempty' to construct a value. Combine
--  values by using the 'Monoid' interface, i.e. 'mappend' and 'mconcat'.
data PubSub = PubSub
    { PubSub -> Cmd Subscribe Channel
subs    :: Cmd Subscribe Channel
    , PubSub -> Cmd Unsubscribe Channel
unsubs  :: Cmd Unsubscribe Channel
    , PubSub -> Cmd Subscribe Pattern
psubs   :: Cmd Subscribe Pattern
    , PubSub -> Cmd Unsubscribe Pattern
punsubs :: Cmd Unsubscribe Pattern
    } deriving (PubSub -> PubSub -> Bool
(PubSub -> PubSub -> Bool)
-> (PubSub -> PubSub -> Bool) -> Eq PubSub
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: PubSub -> PubSub -> Bool
== :: PubSub -> PubSub -> Bool
$c/= :: PubSub -> PubSub -> Bool
/= :: PubSub -> PubSub -> Bool
Eq)

instance Semigroup PubSub where
    <> :: PubSub -> PubSub -> PubSub
(<>) PubSub
p1 PubSub
p2 = PubSub { subs :: Cmd Subscribe Channel
subs    = PubSub -> Cmd Subscribe Channel
subs PubSub
p1 Cmd Subscribe Channel
-> Cmd Subscribe Channel -> Cmd Subscribe Channel
forall a. Monoid a => a -> a -> a
`mappend` PubSub -> Cmd Subscribe Channel
subs PubSub
p2
                           , unsubs :: Cmd Unsubscribe Channel
unsubs  = PubSub -> Cmd Unsubscribe Channel
unsubs PubSub
p1 Cmd Unsubscribe Channel
-> Cmd Unsubscribe Channel -> Cmd Unsubscribe Channel
forall a. Monoid a => a -> a -> a
`mappend` PubSub -> Cmd Unsubscribe Channel
unsubs PubSub
p2
                           , psubs :: Cmd Subscribe Pattern
psubs   = PubSub -> Cmd Subscribe Pattern
psubs PubSub
p1 Cmd Subscribe Pattern
-> Cmd Subscribe Pattern -> Cmd Subscribe Pattern
forall a. Monoid a => a -> a -> a
`mappend` PubSub -> Cmd Subscribe Pattern
psubs PubSub
p2
                           , punsubs :: Cmd Unsubscribe Pattern
punsubs = PubSub -> Cmd Unsubscribe Pattern
punsubs PubSub
p1 Cmd Unsubscribe Pattern
-> Cmd Unsubscribe Pattern -> Cmd Unsubscribe Pattern
forall a. Monoid a => a -> a -> a
`mappend` PubSub -> Cmd Unsubscribe Pattern
punsubs PubSub
p2
                           }

instance Monoid PubSub where
    mempty :: PubSub
mempty        = Cmd Subscribe Channel
-> Cmd Unsubscribe Channel
-> Cmd Subscribe Pattern
-> Cmd Unsubscribe Pattern
-> PubSub
PubSub Cmd Subscribe Channel
forall a. Monoid a => a
mempty Cmd Unsubscribe Channel
forall a. Monoid a => a
mempty Cmd Subscribe Pattern
forall a. Monoid a => a
mempty Cmd Unsubscribe Pattern
forall a. Monoid a => a
mempty
    mappend :: PubSub -> PubSub -> PubSub
mappend = PubSub -> PubSub -> PubSub
forall a. Semigroup a => a -> a -> a
(<>)

data Cmd a b = DoNothing | Cmd { forall a b. Cmd a b -> [ByteString]
changes :: [ByteString] } deriving (Cmd a b -> Cmd a b -> Bool
(Cmd a b -> Cmd a b -> Bool)
-> (Cmd a b -> Cmd a b -> Bool) -> Eq (Cmd a b)
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
forall a b. Cmd a b -> Cmd a b -> Bool
$c== :: forall a b. Cmd a b -> Cmd a b -> Bool
== :: Cmd a b -> Cmd a b -> Bool
$c/= :: forall a b. Cmd a b -> Cmd a b -> Bool
/= :: Cmd a b -> Cmd a b -> Bool
Eq)

instance Semigroup (Cmd Subscribe a) where
  <> :: Cmd Subscribe a -> Cmd Subscribe a -> Cmd Subscribe a
(<>) Cmd Subscribe a
DoNothing Cmd Subscribe a
x = Cmd Subscribe a
x
  (<>) Cmd Subscribe a
x Cmd Subscribe a
DoNothing = Cmd Subscribe a
x
  (<>) (Cmd [ByteString]
xs) (Cmd [ByteString]
ys) = [ByteString] -> Cmd Subscribe a
forall a b. [ByteString] -> Cmd a b
Cmd ([ByteString]
xs [ByteString] -> [ByteString] -> [ByteString]
forall a. [a] -> [a] -> [a]
++ [ByteString]
ys)

instance Monoid (Cmd Subscribe a) where
  mempty :: Cmd Subscribe a
mempty = Cmd Subscribe a
forall a b. Cmd a b
DoNothing
  mappend :: Cmd Subscribe a -> Cmd Subscribe a -> Cmd Subscribe a
mappend = Cmd Subscribe a -> Cmd Subscribe a -> Cmd Subscribe a
forall a. Semigroup a => a -> a -> a
(<>)

instance Semigroup (Cmd Unsubscribe a) where
  <> :: Cmd Unsubscribe a -> Cmd Unsubscribe a -> Cmd Unsubscribe a
(<>) Cmd Unsubscribe a
DoNothing Cmd Unsubscribe a
x = Cmd Unsubscribe a
x
  (<>) Cmd Unsubscribe a
x Cmd Unsubscribe a
DoNothing = Cmd Unsubscribe a
x
  -- empty subscription list => unsubscribe all channels and patterns
  (<>) (Cmd []) Cmd Unsubscribe a
_ = [ByteString] -> Cmd Unsubscribe a
forall a b. [ByteString] -> Cmd a b
Cmd []
  (<>) Cmd Unsubscribe a
_ (Cmd []) = [ByteString] -> Cmd Unsubscribe a
forall a b. [ByteString] -> Cmd a b
Cmd []
  (<>) (Cmd [ByteString]
xs) (Cmd [ByteString]
ys) = [ByteString] -> Cmd Unsubscribe a
forall a b. [ByteString] -> Cmd a b
Cmd ([ByteString]
xs [ByteString] -> [ByteString] -> [ByteString]
forall a. [a] -> [a] -> [a]
++ [ByteString]
ys)

instance Monoid (Cmd Unsubscribe a) where
  mempty :: Cmd Unsubscribe a
mempty = Cmd Unsubscribe a
forall a b. Cmd a b
DoNothing
  mappend :: Cmd Unsubscribe a -> Cmd Unsubscribe a -> Cmd Unsubscribe a
mappend = Cmd Unsubscribe a -> Cmd Unsubscribe a -> Cmd Unsubscribe a
forall a. Semigroup a => a -> a -> a
(<>)

class Command a where
    redisCmd      :: a -> ByteString
    updatePending :: a -> Int -> Int

sendCmd :: (Command (Cmd a b)) => Cmd a b -> StateT PubSubState Core.Redis ()
sendCmd :: forall a b.
Command (Cmd a b) =>
Cmd a b -> StateT PubSubState Redis ()
sendCmd Cmd a b
DoNothing = () -> StateT PubSubState Redis ()
forall a. a -> StateT PubSubState Redis a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
sendCmd Cmd a b
cmd       = do
  conn <- Redis Connection -> StateT PubSubState Redis Connection
forall (m :: * -> *) a. Monad m => m a -> StateT PubSubState m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Redis Connection -> StateT PubSubState Redis Connection)
-> Redis Connection -> StateT PubSubState Redis Connection
forall a b. (a -> b) -> a -> b
$ ReaderT RedisEnv IO Connection -> Redis Connection
forall a. ReaderT RedisEnv IO a -> Redis a
Core.reRedis (ReaderT RedisEnv IO Connection -> Redis Connection)
-> ReaderT RedisEnv IO Connection -> Redis Connection
forall a b. (a -> b) -> a -> b
$ (RedisEnv -> Connection) -> ReaderT RedisEnv IO Connection
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks RedisEnv -> Connection
Core.envConn
  let hook = Hooks -> SendPubSubHook
Core.sendPubSubHook (Hooks -> SendPubSubHook) -> Hooks -> SendPubSubHook
forall a b. (a -> b) -> a -> b
$ Connection -> Hooks
PP.hooks Connection
conn
  lift $ withRunInIO $ \forall a. Redis a -> IO a
runInIO -> SendPubSubHook
hook (Redis () -> IO ()
forall a. Redis a -> IO a
runInIO (Redis () -> IO ())
-> ([ByteString] -> Redis ()) -> [ByteString] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [ByteString] -> Redis ()
forall (m :: * -> *). MonadRedis m => [ByteString] -> m ()
Core.send) (Cmd a b -> ByteString
forall a. Command a => a -> ByteString
redisCmd Cmd a b
cmd ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
: Cmd a b -> [ByteString]
forall a b. Cmd a b -> [ByteString]
changes Cmd a b
cmd)
  modifyPending (updatePending cmd)

rawSendCmd :: (Command (Cmd a b)) => PP.Connection -> Cmd a b -> IO ()
rawSendCmd :: forall a b. Command (Cmd a b) => Connection -> Cmd a b -> IO ()
rawSendCmd Connection
_ Cmd a b
DoNothing = () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
rawSendCmd Connection
conn Cmd a b
cmd    =
  let hook :: SendPubSubHook
hook = Hooks -> SendPubSubHook
Core.sendPubSubHook (Hooks -> SendPubSubHook) -> Hooks -> SendPubSubHook
forall a b. (a -> b) -> a -> b
$ Connection -> Hooks
PP.hooks Connection
conn
      msg :: [ByteString]
msg = Cmd a b -> ByteString
forall a. Command a => a -> ByteString
redisCmd Cmd a b
cmd ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
: Cmd a b -> [ByteString]
forall a b. Cmd a b -> [ByteString]
changes Cmd a b
cmd
  in SendPubSubHook
hook (Connection -> ByteString -> IO ()
PP.send Connection
conn (ByteString -> IO ())
-> ([ByteString] -> ByteString) -> [ByteString] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [ByteString] -> ByteString
renderRequest) [ByteString]
msg

plusChangeCnt :: Cmd a b -> Int -> Int
plusChangeCnt :: forall a b. Cmd a b -> Int -> Int
plusChangeCnt Cmd a b
DoNothing = Int -> Int
forall a. a -> a
id
plusChangeCnt (Cmd [ByteString]
cs)  = (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ [ByteString] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [ByteString]
cs)

instance Command (Cmd Subscribe Channel) where
    redisCmd :: Cmd Subscribe Channel -> ByteString
redisCmd      = ByteString -> Cmd Subscribe Channel -> ByteString
forall a b. a -> b -> a
const ByteString
"SUBSCRIBE"
    updatePending :: Cmd Subscribe Channel -> Int -> Int
updatePending = Cmd Subscribe Channel -> Int -> Int
forall a b. Cmd a b -> Int -> Int
plusChangeCnt

instance Command (Cmd Subscribe Pattern) where
    redisCmd :: Cmd Subscribe Pattern -> ByteString
redisCmd      = ByteString -> Cmd Subscribe Pattern -> ByteString
forall a b. a -> b -> a
const ByteString
"PSUBSCRIBE"
    updatePending :: Cmd Subscribe Pattern -> Int -> Int
updatePending = Cmd Subscribe Pattern -> Int -> Int
forall a b. Cmd a b -> Int -> Int
plusChangeCnt

instance Command (Cmd Unsubscribe Channel) where
    redisCmd :: Cmd Unsubscribe Channel -> ByteString
redisCmd      = ByteString -> Cmd Unsubscribe Channel -> ByteString
forall a b. a -> b -> a
const ByteString
"UNSUBSCRIBE"
    updatePending :: Cmd Unsubscribe Channel -> Int -> Int
updatePending = (Int -> Int) -> Cmd Unsubscribe Channel -> Int -> Int
forall a b. a -> b -> a
const Int -> Int
forall a. a -> a
id

instance Command (Cmd Unsubscribe Pattern) where
    redisCmd :: Cmd Unsubscribe Pattern -> ByteString
redisCmd      = ByteString -> Cmd Unsubscribe Pattern -> ByteString
forall a b. a -> b -> a
const ByteString
"PUNSUBSCRIBE"
    updatePending :: Cmd Unsubscribe Pattern -> Int -> Int
updatePending = (Int -> Int) -> Cmd Unsubscribe Pattern -> Int -> Int
forall a b. a -> b -> a
const Int -> Int
forall a. a -> a
id


data Message = Message  { Message -> ByteString
msgChannel, Message -> ByteString
msgMessage :: ByteString}
             | PMessage { Message -> ByteString
msgPattern, msgChannel, msgMessage :: ByteString}
    deriving (Int -> Message -> ShowS
[Message] -> ShowS
Message -> String
(Int -> Message -> ShowS)
-> (Message -> String) -> ([Message] -> ShowS) -> Show Message
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Message -> ShowS
showsPrec :: Int -> Message -> ShowS
$cshow :: Message -> String
show :: Message -> String
$cshowList :: [Message] -> ShowS
showList :: [Message] -> ShowS
Show)

data PubSubReply
    = Subscribed RedisChannel
    | PSubscribed RedisPChannel
    | Unsubscribed RedisChannel Int
    | PUnsubscribed RedisPChannel Int
    | Msg Message


------------------------------------------------------------------------------
-- Public Interface
--

-- |Post a message to a channel (<http://redis.io/commands/publish>).
publish
    :: (Core.RedisCtx m f)
    => ByteString -- ^ channel
    -> ByteString -- ^ message
    -> m (f Integer)
publish :: forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
ByteString -> ByteString -> m (f Integer)
publish ByteString
channel ByteString
message =
    [ByteString] -> m (f Integer)
forall (m :: * -> *) (f :: * -> *) a.
(RedisCtx m f, RedisResult a) =>
[ByteString] -> m (f a)
Core.sendRequest [ByteString
"PUBLISH", ByteString
channel, ByteString
message]

-- |Listen for messages published to the given channels
--  (<http://redis.io/commands/subscribe>).
subscribe
    :: [ByteString] -- ^ channel
    -> PubSub
subscribe :: [ByteString] -> PubSub
subscribe [] = PubSub
forall a. Monoid a => a
mempty
subscribe [ByteString]
cs = PubSub
forall a. Monoid a => a
mempty{ subs = Cmd cs }

-- |Stop listening for messages posted to the given channels
--  (<http://redis.io/commands/unsubscribe>).
unsubscribe
    :: [ByteString] -- ^ channel
    -> PubSub
unsubscribe :: [ByteString] -> PubSub
unsubscribe [ByteString]
cs = PubSub
forall a. Monoid a => a
mempty{ unsubs = Cmd cs }

-- |Stop listening for messages posted to the given channels.
-- It works like 'unsubscribe', except it does not unsubscribe from
-- all channels when no channel is passed
unsubscribe1
    :: [ByteString] -- ^ channel
    -> PubSub
unsubscribe1 :: [ByteString] -> PubSub
unsubscribe1 [] = PubSub
forall a. Monoid a => a
mempty
unsubscribe1 [ByteString]
cs = PubSub
forall a. Monoid a => a
mempty{ unsubs = Cmd cs }

-- |Listen for messages published to channels matching the given patterns
--  (<http://redis.io/commands/psubscribe>).
psubscribe
    :: [ByteString] -- ^ pattern
    -> PubSub
psubscribe :: [ByteString] -> PubSub
psubscribe [] = PubSub
forall a. Monoid a => a
mempty
psubscribe [ByteString]
ps = PubSub
forall a. Monoid a => a
mempty{ psubs = Cmd ps }

-- |Stop listening for messages posted to channels matching the given patterns
--  (<http://redis.io/commands/punsubscribe>).
punsubscribe
    :: [ByteString] -- ^ pattern
    -> PubSub
punsubscribe :: [ByteString] -> PubSub
punsubscribe [ByteString]
ps = PubSub
forall a. Monoid a => a
mempty{ punsubs = Cmd ps }

-- |Stop listening for messages posted to channels matching the given patterns.
-- It works like 'punsubscribe', except it does not unsubscribe from all channels
-- in case when the list is empty.
punsubscribe1
    :: [ByteString] -- ^ pattern
    -> PubSub
punsubscribe1 :: [ByteString] -> PubSub
punsubscribe1 [] = PubSub
forall a. Monoid a => a
mempty
punsubscribe1 [ByteString]
ps = PubSub
forall a. Monoid a => a
mempty{ punsubs = Cmd ps }

-- |Listens to published messages on subscribed channels and channels matching
--  the subscribed patterns. For documentation on the semantics of Redis
--  Pub\/Sub see <http://redis.io/topics/pubsub>.
--
--  The given callback function is called for each received message.
--  Subscription changes are triggered by the returned 'PubSub'. To keep
--  subscriptions unchanged, the callback can return 'mempty'.
--
--  Example: Subscribe to the \"news\" channel indefinitely.
--
--  @
--  pubSub (subscribe [\"news\"]) $ \\msg -> do
--      putStrLn $ \"Message from \" ++ show (msgChannel msg)
--      return mempty
--  @
--
--  Example: Receive a single message from the \"chat\" channel.
--
--  @
--  pubSub (subscribe [\"chat\"]) $ \\msg -> do
--      putStrLn $ \"Message from \" ++ show (msgChannel msg)
--      return $ unsubscribe [\"chat\"]
--  @
--
-- It should be noted that Redis Pub\/Sub by its nature is asynchronous
-- so returning `unsubscribe` does not mean that callback won't be able
-- to receive any further messages. And to guarantee that you won't
-- won't process messages after unsubscription and won't unsubscribe
-- from the same channel more than once you need to use `IORef` or
-- something similar
--
pubSub
    :: PubSub                 -- ^ Initial subscriptions.
    -> (Message -> IO PubSub) -- ^ Callback function.
    -> Core.Redis ()
pubSub :: PubSub -> (Message -> IO PubSub) -> Redis ()
pubSub PubSub
initial Message -> IO PubSub
callback
    | PubSub
initial PubSub -> PubSub -> Bool
forall a. Eq a => a -> a -> Bool
== PubSub
forall a. Monoid a => a
mempty = () -> Redis ()
forall a. a -> Redis a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    | Bool
otherwise         = StateT PubSubState Redis () -> PubSubState -> Redis ()
forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m a
evalStateT (PubSub -> StateT PubSubState Redis ()
send PubSub
initial) (Int -> Int -> PubSubState
PubSubState Int
0 Int
0)
  where
    send :: PubSub -> StateT PubSubState Core.Redis ()
    send :: PubSub -> StateT PubSubState Redis ()
send PubSub{Cmd Unsubscribe Pattern
Cmd Unsubscribe Channel
Cmd Subscribe Pattern
Cmd Subscribe Channel
subs :: PubSub -> Cmd Subscribe Channel
unsubs :: PubSub -> Cmd Unsubscribe Channel
psubs :: PubSub -> Cmd Subscribe Pattern
punsubs :: PubSub -> Cmd Unsubscribe Pattern
subs :: Cmd Subscribe Channel
unsubs :: Cmd Unsubscribe Channel
psubs :: Cmd Subscribe Pattern
punsubs :: Cmd Unsubscribe Pattern
..} = do
        Cmd Subscribe Channel -> StateT PubSubState Redis ()
forall a b.
Command (Cmd a b) =>
Cmd a b -> StateT PubSubState Redis ()
sendCmd Cmd Subscribe Channel
subs
        Cmd Unsubscribe Channel -> StateT PubSubState Redis ()
forall a b.
Command (Cmd a b) =>
Cmd a b -> StateT PubSubState Redis ()
sendCmd Cmd Unsubscribe Channel
unsubs
        Cmd Subscribe Pattern -> StateT PubSubState Redis ()
forall a b.
Command (Cmd a b) =>
Cmd a b -> StateT PubSubState Redis ()
sendCmd Cmd Subscribe Pattern
psubs
        Cmd Unsubscribe Pattern -> StateT PubSubState Redis ()
forall a b.
Command (Cmd a b) =>
Cmd a b -> StateT PubSubState Redis ()
sendCmd Cmd Unsubscribe Pattern
punsubs
        StateT PubSubState Redis ()
recv

    recv :: StateT PubSubState Core.Redis ()
    recv :: StateT PubSubState Redis ()
recv = do
        hook <- Redis CallbackHook -> StateT PubSubState Redis CallbackHook
forall (m :: * -> *) a. Monad m => m a -> StateT PubSubState m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Redis CallbackHook -> StateT PubSubState Redis CallbackHook)
-> Redis CallbackHook -> StateT PubSubState Redis CallbackHook
forall a b. (a -> b) -> a -> b
$ ReaderT RedisEnv IO CallbackHook -> Redis CallbackHook
forall a. ReaderT RedisEnv IO a -> Redis a
Core.reRedis (ReaderT RedisEnv IO CallbackHook -> Redis CallbackHook)
-> ReaderT RedisEnv IO CallbackHook -> Redis CallbackHook
forall a b. (a -> b) -> a -> b
$ (RedisEnv -> CallbackHook) -> ReaderT RedisEnv IO CallbackHook
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks ((RedisEnv -> CallbackHook) -> ReaderT RedisEnv IO CallbackHook)
-> (RedisEnv -> CallbackHook) -> ReaderT RedisEnv IO CallbackHook
forall a b. (a -> b) -> a -> b
$ Hooks -> CallbackHook
Core.callbackHook (Hooks -> CallbackHook)
-> (RedisEnv -> Hooks) -> RedisEnv -> CallbackHook
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> Hooks
PP.hooks (Connection -> Hooks)
-> (RedisEnv -> Connection) -> RedisEnv -> Hooks
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RedisEnv -> Connection
Core.envConn
        reply <- lift Core.recv
        case decodeMsg reply of
            Msg Message
msg           -> IO PubSub -> StateT PubSubState Redis PubSub
forall a. IO a -> StateT PubSubState Redis a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (CallbackHook
hook Message -> IO PubSub
callback Message
msg) StateT PubSubState Redis PubSub
-> (PubSub -> StateT PubSubState Redis ())
-> StateT PubSubState Redis ()
forall a b.
StateT PubSubState Redis a
-> (a -> StateT PubSubState Redis b) -> StateT PubSubState Redis b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= PubSub -> StateT PubSubState Redis ()
send
            Subscribed ByteString
_      -> (Int -> Int) -> StateT PubSubState Redis ()
forall (m :: * -> *).
MonadState PubSubState m =>
(Int -> Int) -> m ()
modifyPending (Int -> Int -> Int
forall a. Num a => a -> a -> a
subtract Int
1) StateT PubSubState Redis ()
-> StateT PubSubState Redis () -> StateT PubSubState Redis ()
forall a b.
StateT PubSubState Redis a
-> StateT PubSubState Redis b -> StateT PubSubState Redis b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> StateT PubSubState Redis ()
recv
            PSubscribed ByteString
_     -> (Int -> Int) -> StateT PubSubState Redis ()
forall (m :: * -> *).
MonadState PubSubState m =>
(Int -> Int) -> m ()
modifyPending (Int -> Int -> Int
forall a. Num a => a -> a -> a
subtract Int
1) StateT PubSubState Redis ()
-> StateT PubSubState Redis () -> StateT PubSubState Redis ()
forall a b.
StateT PubSubState Redis a
-> StateT PubSubState Redis b -> StateT PubSubState Redis b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> StateT PubSubState Redis ()
recv
            PUnsubscribed ByteString
_ Int
n -> Int -> StateT PubSubState Redis ()
onUnsubscribe Int
n
            Unsubscribed ByteString
_ Int
n  -> Int -> StateT PubSubState Redis ()
onUnsubscribe Int
n

    onUnsubscribe :: Int -> StateT PubSubState Core.Redis ()
    onUnsubscribe :: Int -> StateT PubSubState Redis ()
onUnsubscribe Int
n = do
        Int -> StateT PubSubState Redis ()
forall (m :: * -> *). MonadState PubSubState m => Int -> m ()
putSubCnt Int
n
        PubSubState{..} <- StateT PubSubState Redis PubSubState
forall s (m :: * -> *). MonadState s m => m s
get
        unless (subCnt == 0 && pending == 0) recv

-- | A Redis channel name
type RedisChannel = ByteString

-- | A Redis pattern channel name
type RedisPChannel = ByteString

-- | A handler for a message from a subscribed channel.
-- The callback is passed the message content.
--
-- Messages are processed synchronously in the receiving thread, so if the callback
-- takes a long time it will block other callbacks and other messages from being
-- received.  If you need to move long-running work to a different thread, we suggest
-- you use 'TBQueue' with a reasonable bound, so that if messages are arriving faster
-- than you can process them, you do eventually block.
--
-- If the callback throws an exception, the exception will be thrown from 'pubSubForever'
-- which will cause the entire Redis connection for all subscriptions to be closed.
-- As long as you call 'pubSubForever' in a loop you will reconnect to your subscribed
-- channels, but you should probably add an exception handler to each callback to
-- prevent this.
type MessageCallback = ByteString -> IO ()

-- | A handler for a message from a psubscribed channel.
-- The callback is passed the channel the message was sent on plus the message content.
--
-- Similar to 'MessageCallback', callbacks are executed synchronously and any exceptions
-- are rethrown from 'pubSubForever'.
type PMessageCallback = RedisChannel -> ByteString -> IO ()

-- | An action that when executed will unregister the callbacks.  It is returned from 'addChannels'
-- or 'addChannelsAndWait' and typically you would use it in 'bracket' to guarantee that you
-- unsubscribe from channels.  For example, if you are using websockets to distribute messages to
-- clients, you could use something such as:
--
-- > websocketConn <- Network.WebSockets.acceptRequest pending
-- > let mycallback msg = Network.WebSockets.sendTextData websocketConn msg
-- > bracket (addChannelsAndWait ctrl [("hello", mycallback)] []) id $ const $ do
-- >   {- loop here calling Network.WebSockets.receiveData -}
type UnregisterCallbacksAction = IO ()

newtype UnregisterHandle = UnregisterHandle Integer
  deriving (UnregisterHandle -> UnregisterHandle -> Bool
(UnregisterHandle -> UnregisterHandle -> Bool)
-> (UnregisterHandle -> UnregisterHandle -> Bool)
-> Eq UnregisterHandle
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: UnregisterHandle -> UnregisterHandle -> Bool
== :: UnregisterHandle -> UnregisterHandle -> Bool
$c/= :: UnregisterHandle -> UnregisterHandle -> Bool
/= :: UnregisterHandle -> UnregisterHandle -> Bool
Eq, Int -> UnregisterHandle -> ShowS
[UnregisterHandle] -> ShowS
UnregisterHandle -> String
(Int -> UnregisterHandle -> ShowS)
-> (UnregisterHandle -> String)
-> ([UnregisterHandle] -> ShowS)
-> Show UnregisterHandle
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> UnregisterHandle -> ShowS
showsPrec :: Int -> UnregisterHandle -> ShowS
$cshow :: UnregisterHandle -> String
show :: UnregisterHandle -> String
$cshowList :: [UnregisterHandle] -> ShowS
showList :: [UnregisterHandle] -> ShowS
Show, Integer -> UnregisterHandle
UnregisterHandle -> UnregisterHandle
UnregisterHandle -> UnregisterHandle -> UnregisterHandle
(UnregisterHandle -> UnregisterHandle -> UnregisterHandle)
-> (UnregisterHandle -> UnregisterHandle -> UnregisterHandle)
-> (UnregisterHandle -> UnregisterHandle -> UnregisterHandle)
-> (UnregisterHandle -> UnregisterHandle)
-> (UnregisterHandle -> UnregisterHandle)
-> (UnregisterHandle -> UnregisterHandle)
-> (Integer -> UnregisterHandle)
-> Num UnregisterHandle
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
$c+ :: UnregisterHandle -> UnregisterHandle -> UnregisterHandle
+ :: UnregisterHandle -> UnregisterHandle -> UnregisterHandle
$c- :: UnregisterHandle -> UnregisterHandle -> UnregisterHandle
- :: UnregisterHandle -> UnregisterHandle -> UnregisterHandle
$c* :: UnregisterHandle -> UnregisterHandle -> UnregisterHandle
* :: UnregisterHandle -> UnregisterHandle -> UnregisterHandle
$cnegate :: UnregisterHandle -> UnregisterHandle
negate :: UnregisterHandle -> UnregisterHandle
$cabs :: UnregisterHandle -> UnregisterHandle
abs :: UnregisterHandle -> UnregisterHandle
$csignum :: UnregisterHandle -> UnregisterHandle
signum :: UnregisterHandle -> UnregisterHandle
$cfromInteger :: Integer -> UnregisterHandle
fromInteger :: Integer -> UnregisterHandle
Num)

-- | Stores channels subscribed, pending subscription, and pending removal
-- by type, where type can be a normal channel, or a pattern channel.
data ChannelData channel callback
    = ChannelData
    { forall channel callback.
ChannelData channel callback
-> TVar (HashMap channel [(UnregisterHandle, callback)])
cdSubscribedChannels :: !(TVar (HM.HashMap channel [(UnregisterHandle, callback)]))
    , forall channel callback.
ChannelData channel callback -> TVar (HashSet channel)
cdChannelsPendingSubscription :: !(TVar (HS.HashSet channel))
    , forall channel callback.
ChannelData channel callback -> TVar (HashSet channel)
cdChannelsPendingRemoval :: !(TVar (HS.HashSet channel))
    }

-- | A controller that stores a set of channels, pattern channels, and callbacks.
-- It allows you to manage Pub/Sub subscriptions and pattern subscriptions and alter them at
-- any time throughout the life of your program.
-- You should typically create the controller at the start of your program and then store it
-- through the life of your program, using 'addChannels' and 'removeChannels' to update the
-- current subscriptions.
data PubSubController = PubSubController
  { PubSubController -> TBQueue PubSub
sendChanges :: TBQueue PubSub
  , PubSubController -> ChannelData ByteString (ByteString -> IO ())
pscChannelData :: ChannelData RedisChannel MessageCallback
  , PubSubController -> ChannelData ByteString PMessageCallback
pscPChannelData :: ChannelData RedisPChannel PMessageCallback
  , PubSubController -> TVar UnregisterHandle
lastUsedCallbackId :: TVar UnregisterHandle
  }

newChannelData :: Hashable channel => [(channel, callback)] -> STM (ChannelData channel callback)
newChannelData :: forall channel callback.
Hashable channel =>
[(channel, callback)] -> STM (ChannelData channel callback)
newChannelData [(channel, callback)]
initialSubs
    = TVar (HashMap channel [(UnregisterHandle, callback)])
-> TVar (HashSet channel)
-> TVar (HashSet channel)
-> ChannelData channel callback
forall channel callback.
TVar (HashMap channel [(UnregisterHandle, callback)])
-> TVar (HashSet channel)
-> TVar (HashSet channel)
-> ChannelData channel callback
ChannelData
    (TVar (HashMap channel [(UnregisterHandle, callback)])
 -> TVar (HashSet channel)
 -> TVar (HashSet channel)
 -> ChannelData channel callback)
-> STM (TVar (HashMap channel [(UnregisterHandle, callback)]))
-> STM
     (TVar (HashSet channel)
      -> TVar (HashSet channel) -> ChannelData channel callback)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> HashMap channel [(UnregisterHandle, callback)]
-> STM (TVar (HashMap channel [(UnregisterHandle, callback)]))
forall a. a -> STM (TVar a)
newTVar (([(UnregisterHandle, callback)]
 -> [(UnregisterHandle, callback)]
 -> [(UnregisterHandle, callback)])
-> [(channel, [(UnregisterHandle, callback)])]
-> HashMap channel [(UnregisterHandle, callback)]
forall k v. Hashable k => (v -> v -> v) -> [(k, v)] -> HashMap k v
HM.fromListWith [(UnregisterHandle, callback)]
-> [(UnregisterHandle, callback)] -> [(UnregisterHandle, callback)]
forall a. [a] -> [a] -> [a]
(++) ([(channel, [(UnregisterHandle, callback)])]
 -> HashMap channel [(UnregisterHandle, callback)])
-> [(channel, [(UnregisterHandle, callback)])]
-> HashMap channel [(UnregisterHandle, callback)]
forall a b. (a -> b) -> a -> b
$ ((channel, callback) -> (channel, [(UnregisterHandle, callback)]))
-> [(channel, callback)]
-> [(channel, [(UnregisterHandle, callback)])]
forall a b. (a -> b) -> [a] -> [b]
map ((callback -> [(UnregisterHandle, callback)])
-> (channel, callback) -> (channel, [(UnregisterHandle, callback)])
forall b c d. (b -> c) -> (d, b) -> (d, c)
forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (d, b) (d, c)
second ((callback -> [(UnregisterHandle, callback)])
 -> (channel, callback)
 -> (channel, [(UnregisterHandle, callback)]))
-> (callback -> [(UnregisterHandle, callback)])
-> (channel, callback)
-> (channel, [(UnregisterHandle, callback)])
forall a b. (a -> b) -> a -> b
$ (UnregisterHandle, callback) -> [(UnregisterHandle, callback)]
forall a. a -> [a]
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((UnregisterHandle, callback) -> [(UnregisterHandle, callback)])
-> (callback -> (UnregisterHandle, callback))
-> callback
-> [(UnregisterHandle, callback)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (UnregisterHandle
0,)) [(channel, callback)]
initialSubs)
    STM
  (TVar (HashSet channel)
   -> TVar (HashSet channel) -> ChannelData channel callback)
-> STM (TVar (HashSet channel))
-> STM (TVar (HashSet channel) -> ChannelData channel callback)
forall a b. STM (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> HashSet channel -> STM (TVar (HashSet channel))
forall a. a -> STM (TVar a)
newTVar HashSet channel
forall a. Monoid a => a
mempty
    STM (TVar (HashSet channel) -> ChannelData channel callback)
-> STM (TVar (HashSet channel))
-> STM (ChannelData channel callback)
forall a b. STM (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> HashSet channel -> STM (TVar (HashSet channel))
forall a. a -> STM (TVar a)
newTVar HashSet channel
forall a. Monoid a => a
mempty

-- | Create a new 'PubSubController'.  Note that this does not subscribe to any channels, it just
-- creates the controller.  The subscriptions will happen once 'pubSubForever' is called.
newPubSubController :: MonadIO m => [(RedisChannel, MessageCallback)] -- ^ the initial subscriptions
                                 -> [(RedisPChannel, PMessageCallback)] -- ^ the initial pattern subscriptions
                                 -> m PubSubController
newPubSubController :: forall (m :: * -> *).
MonadIO m =>
[(ByteString, ByteString -> IO ())]
-> [(ByteString, PMessageCallback)] -> m PubSubController
newPubSubController [(ByteString, ByteString -> IO ())]
initialSubs [(ByteString, PMessageCallback)]
initialPSubs = IO PubSubController -> m PubSubController
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO PubSubController -> m PubSubController)
-> IO PubSubController -> m PubSubController
forall a b. (a -> b) -> a -> b
$ STM PubSubController -> IO PubSubController
forall a. STM a -> IO a
atomically (STM PubSubController -> IO PubSubController)
-> STM PubSubController -> IO PubSubController
forall a b. (a -> b) -> a -> b
$ do
    c <- Natural -> STM (TBQueue PubSub)
forall a. Natural -> STM (TBQueue a)
newTBQueue Natural
10
    lastId <- newTVar 0
    channelData' <- newChannelData initialSubs
    pchannelData' <- newChannelData initialPSubs
    return $ PubSubController c channelData' pchannelData' lastId

#if __GLASGOW_HASKELL__ < 710
type FunctorMonadIO m = (MonadIO m, Functor m)
#else
type FunctorMonadIO m = MonadIO m
#endif

-- | Get the list of current channels in the 'PubSubController'.  WARNING! This might not
-- exactly reflect the subscribed channels in the Redis server, because there is a delay
-- between adding or removing a channel in the 'PubSubController' and when Redis receives
-- and processes the subscription change request.
currentChannels :: FunctorMonadIO m => PubSubController -> m [RedisChannel]
currentChannels :: forall (m :: * -> *).
FunctorMonadIO m =>
PubSubController -> m [ByteString]
currentChannels PubSubController
ctrl = HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> [ByteString]
forall k v. HashMap k v -> [k]
HM.keys (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
 -> [ByteString])
-> m (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> m [ByteString]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (IO (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> m (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
 -> m (HashMap
         ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> IO
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> m (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ STM (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> IO
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. STM a -> IO a
atomically (STM (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
 -> IO
      (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> STM
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> IO
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. TVar a -> STM a
readTVar (TVar
   (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
 -> STM
      (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> TVar
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ ChannelData ByteString (ByteString -> IO ())
-> TVar
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall channel callback.
ChannelData channel callback
-> TVar (HashMap channel [(UnregisterHandle, callback)])
cdSubscribedChannels (ChannelData ByteString (ByteString -> IO ())
 -> TVar
      (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> ChannelData ByteString (ByteString -> IO ())
-> TVar
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString (ByteString -> IO ())
pscChannelData PubSubController
ctrl)

-- | Get the list of current pattern channels in the 'PubSubController'.  WARNING! This might not
-- exactly reflect the subscribed channels in the Redis server, because there is a delay
-- between adding or removing a channel in the 'PubSubController' and when Redis receives
-- and processes the subscription change request.
currentPChannels :: FunctorMonadIO m => PubSubController -> m [RedisPChannel]
currentPChannels :: forall (m :: * -> *).
FunctorMonadIO m =>
PubSubController -> m [ByteString]
currentPChannels PubSubController
ctrl = HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> [ByteString]
forall k v. HashMap k v -> [k]
HM.keys (HashMap ByteString [(UnregisterHandle, PMessageCallback)]
 -> [ByteString])
-> m (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> m [ByteString]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> m (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
 -> m (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> m (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. STM a -> IO a
atomically (STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
 -> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. TVar a -> STM a
readTVar (TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
 -> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ ChannelData ByteString PMessageCallback
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall channel callback.
ChannelData channel callback
-> TVar (HashMap channel [(UnregisterHandle, callback)])
cdSubscribedChannels (ChannelData ByteString PMessageCallback
 -> TVar
      (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> ChannelData ByteString PMessageCallback
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString PMessageCallback
pscPChannelData PubSubController
ctrl)

pendingChannels :: MonadIO m => PubSubController -> m (HS.HashSet RedisChannel)
pendingChannels :: forall (m :: * -> *).
MonadIO m =>
PubSubController -> m (HashSet ByteString)
pendingChannels PubSubController
ctrl = IO (HashSet ByteString) -> m (HashSet ByteString)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HashSet ByteString) -> m (HashSet ByteString))
-> IO (HashSet ByteString) -> m (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ TVar (HashSet ByteString) -> IO (HashSet ByteString)
forall a. TVar a -> IO a
readTVarIO (TVar (HashSet ByteString) -> IO (HashSet ByteString))
-> TVar (HashSet ByteString) -> IO (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ ChannelData ByteString (ByteString -> IO ())
-> TVar (HashSet ByteString)
forall channel callback.
ChannelData channel callback -> TVar (HashSet channel)
cdChannelsPendingSubscription (ChannelData ByteString (ByteString -> IO ())
 -> TVar (HashSet ByteString))
-> ChannelData ByteString (ByteString -> IO ())
-> TVar (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString (ByteString -> IO ())
pscChannelData PubSubController
ctrl

pendingPatternChannels :: MonadIO m => PubSubController -> m (HS.HashSet RedisPChannel)
pendingPatternChannels :: forall (m :: * -> *).
MonadIO m =>
PubSubController -> m (HashSet ByteString)
pendingPatternChannels PubSubController
ctrl = IO (HashSet ByteString) -> m (HashSet ByteString)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HashSet ByteString) -> m (HashSet ByteString))
-> IO (HashSet ByteString) -> m (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ TVar (HashSet ByteString) -> IO (HashSet ByteString)
forall a. TVar a -> IO a
readTVarIO (TVar (HashSet ByteString) -> IO (HashSet ByteString))
-> TVar (HashSet ByteString) -> IO (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ ChannelData ByteString PMessageCallback
-> TVar (HashSet ByteString)
forall channel callback.
ChannelData channel callback -> TVar (HashSet channel)
cdChannelsPendingSubscription (ChannelData ByteString PMessageCallback
 -> TVar (HashSet ByteString))
-> ChannelData ByteString PMessageCallback
-> TVar (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString PMessageCallback
pscPChannelData PubSubController
ctrl

-- type CallbackMap a = HM.HashMap ByteString [(UnregisterHandle, a)]

-- | Helper for `addChannels`. Can take either normal or pattern channels.
addChannelsOfType
    :: Hashable channel
    => UnregisterHandle
    -> [(channel, callback)]
    -> ChannelData channel callback
    -> STM [channel]
addChannelsOfType :: forall channel callback.
Hashable channel =>
UnregisterHandle
-> [(channel, callback)]
-> ChannelData channel callback
-> STM [channel]
addChannelsOfType UnregisterHandle
ident [(channel, callback)]
newChans ChannelData channel callback
channelData = do
    callbacks <- TVar (HashMap channel [(UnregisterHandle, callback)])
-> STM (HashMap channel [(UnregisterHandle, callback)])
forall a. TVar a -> STM a
readTVar (TVar (HashMap channel [(UnregisterHandle, callback)])
 -> STM (HashMap channel [(UnregisterHandle, callback)]))
-> TVar (HashMap channel [(UnregisterHandle, callback)])
-> STM (HashMap channel [(UnregisterHandle, callback)])
forall a b. (a -> b) -> a -> b
$ ChannelData channel callback
-> TVar (HashMap channel [(UnregisterHandle, callback)])
forall channel callback.
ChannelData channel callback
-> TVar (HashMap channel [(UnregisterHandle, callback)])
cdSubscribedChannels ChannelData channel callback
channelData
    pendingCallbacks <- readTVar $ cdChannelsPendingSubscription channelData
    let newChans' = (channel -> Bool) -> [channel] -> [channel]
forall a. (a -> Bool) -> [a] -> [a]
filter (Bool -> Bool
not (Bool -> Bool) -> (channel -> Bool) -> channel -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HashMap channel [(UnregisterHandle, callback)]
-> HashSet channel -> channel -> Bool
forall k v1. Hashable k => HashMap k v1 -> HashSet k -> k -> Bool
memberMapOrSet HashMap channel [(UnregisterHandle, callback)]
callbacks HashSet channel
pendingCallbacks) ([channel] -> [channel]) -> [channel] -> [channel]
forall a b. (a -> b) -> a -> b
$ (channel, callback) -> channel
forall a b. (a, b) -> a
fst ((channel, callback) -> channel)
-> [(channel, callback)] -> [channel]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(channel, callback)]
newChans
    writeTVar (cdSubscribedChannels channelData) (HM.unionWith (++) callbacks $ (\callback
z -> [(UnregisterHandle
ident,callback
z)]) <$> HM.fromList newChans)
    writeTVar (cdChannelsPendingSubscription channelData) $ HS.union pendingCallbacks $ HS.fromList newChans'
    pure newChans'

-- | Add channels into the 'PubSubController', and if there is an active 'pubSubForever', send the subscribe
-- and psubscribe commands to Redis.  The 'addChannels' function is thread-safe.  This function
-- does not wait for Redis to acknowledge that the channels have actually been subscribed; use
-- 'addChannelsAndWait' for that.
--
-- You can subscribe to the same channel or pattern channel multiple times; the 'PubSubController' keeps
-- a list of callbacks and executes each callback in response to a message.
--
-- The return value is an action 'UnregisterCallbacksAction' which will unregister the callbacks,
-- which should typically used with 'bracket'.
addChannels :: MonadIO m => PubSubController
                         -> [(RedisChannel, MessageCallback)] -- ^ the channels to subscribe to
                         -> [(RedisPChannel, PMessageCallback)] -- ^ the channels to pattern subscribe to
                         -> m UnregisterCallbacksAction
addChannels :: forall (m :: * -> *).
MonadIO m =>
PubSubController
-> [(ByteString, ByteString -> IO ())]
-> [(ByteString, PMessageCallback)]
-> m (IO ())
addChannels PubSubController
_ [] [] = IO () -> m (IO ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (IO () -> m (IO ())) -> IO () -> m (IO ())
forall a b. (a -> b) -> a -> b
$ () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
addChannels PubSubController
ctrl [(ByteString, ByteString -> IO ())]
newChans [(ByteString, PMessageCallback)]
newPChans = IO (IO ()) -> m (IO ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IO ()) -> m (IO ())) -> IO (IO ()) -> m (IO ())
forall a b. (a -> b) -> a -> b
$ do
    ident <- STM UnregisterHandle -> IO UnregisterHandle
forall a. STM a -> IO a
atomically (STM UnregisterHandle -> IO UnregisterHandle)
-> STM UnregisterHandle -> IO UnregisterHandle
forall a b. (a -> b) -> a -> b
$ do
      TVar UnregisterHandle
-> (UnregisterHandle -> UnregisterHandle) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (PubSubController -> TVar UnregisterHandle
lastUsedCallbackId PubSubController
ctrl) (UnregisterHandle -> UnregisterHandle -> UnregisterHandle
forall a. Num a => a -> a -> a
+UnregisterHandle
1)
      ident <- TVar UnregisterHandle -> STM UnregisterHandle
forall a. TVar a -> STM a
readTVar (TVar UnregisterHandle -> STM UnregisterHandle)
-> TVar UnregisterHandle -> STM UnregisterHandle
forall a b. (a -> b) -> a -> b
$ PubSubController -> TVar UnregisterHandle
lastUsedCallbackId PubSubController
ctrl
      newChannels <- addChannelsOfType ident newChans $ pscChannelData ctrl
      newPChannels <- addChannelsOfType ident newPChans $ pscPChannelData ctrl
      let ps = [ByteString] -> PubSub
subscribe [ByteString]
newChannels PubSub -> PubSub -> PubSub
forall a. Monoid a => a -> a -> a
`mappend` [ByteString] -> PubSub
psubscribe [ByteString]
newPChannels
      writeTBQueue (sendChanges ctrl) ps
      return ident
    return $ unsubChannels ctrl (map fst newChans) (map fst newPChans) ident


-- | Call 'addChannels' and then wait for Redis to acknowledge that the channels are actually subscribed.
--
-- Note that this function waits for requested subscription change requests, so if you for example call
-- 'addChannelsAndWait' from multiple threads simultaneously, they will all wait their pending
-- subscription changes to be acknowledged by Redis.
--
-- This also correctly waits if the network connection dies during the subscription change.  Say that the
-- network connection dies right after we send a subscription change to Redis.  'pubSubForever' will throw
-- 'ConnectionLost' and 'addChannelsAndWait' will continue to wait.  Once you recall 'pubSubForever'
-- with the same 'PubSubController', 'pubSubForever' will open a new connection, send subscription commands
-- for all channels in the 'PubSubController' (which include the ones we are waiting for),
-- and wait for the responses from Redis.  Only once we receive the response from Redis that it has subscribed
-- to all channels in 'PubSubController' will 'addChannelsAndWait' unblock and return.
addChannelsAndWait :: MonadIO m => PubSubController
                                -> [(RedisChannel, MessageCallback)] -- ^ the channels to subscribe to
                                -> [(RedisPChannel, PMessageCallback)] -- ^ the channels to psubscribe to
                                -> m UnregisterCallbacksAction
addChannelsAndWait :: forall (m :: * -> *).
MonadIO m =>
PubSubController
-> [(ByteString, ByteString -> IO ())]
-> [(ByteString, PMessageCallback)]
-> m (IO ())
addChannelsAndWait PubSubController
_ [] [] = IO () -> m (IO ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (IO () -> m (IO ())) -> IO () -> m (IO ())
forall a b. (a -> b) -> a -> b
$ () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
addChannelsAndWait PubSubController
ctrl [(ByteString, ByteString -> IO ())]
newChans [(ByteString, PMessageCallback)]
newPChans = do
  unreg <- PubSubController
-> [(ByteString, ByteString -> IO ())]
-> [(ByteString, PMessageCallback)]
-> m (IO ())
forall (m :: * -> *).
MonadIO m =>
PubSubController
-> [(ByteString, ByteString -> IO ())]
-> [(ByteString, PMessageCallback)]
-> m (IO ())
addChannels PubSubController
ctrl [(ByteString, ByteString -> IO ())]
newChans [(ByteString, PMessageCallback)]
newPChans
  liftIO $
    waitUntilAbsent
      [ (cdChannelsPendingSubscription $ pscChannelData ctrl, fst <$> newChans)
      , (cdChannelsPendingSubscription $ pscPChannelData ctrl, fst <$> newPChans)
      ]
  return unreg

-- | Wait until all interesting channels are instantiated.
waitUntilAbsent :: Hashable channel => [(TVar (HS.HashSet channel), [channel])] -> IO ()
waitUntilAbsent :: forall channel.
Hashable channel =>
[(TVar (HashSet channel), [channel])] -> IO ()
waitUntilAbsent [(TVar (HashSet channel), [channel])]
pending  = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
  [(TVar (HashSet channel), [channel])]
-> ((TVar (HashSet channel), [channel]) -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(TVar (HashSet channel), [channel])]
pending (((TVar (HashSet channel), [channel]) -> STM ()) -> STM ())
-> ((TVar (HashSet channel), [channel]) -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \(TVar (HashSet channel)
tPendingChannels, [channel]
channels) -> do
    Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([channel] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [channel]
channels) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ do
      pendingChannels' <- TVar (HashSet channel) -> STM (HashSet channel)
forall a. TVar a -> STM a
readTVar TVar (HashSet channel)
tPendingChannels
      when (any (\channel
ch -> channel -> HashSet channel -> Bool
forall a. Hashable a => a -> HashSet a -> Bool
HS.member channel
ch HashSet channel
pendingChannels') channels) retry

-- | Remove channels from the 'PubSubController', and if there is an active 'pubSubForever', send the
-- unsubscribe commands to Redis.  Note that as soon as this function returns, no more callbacks will be
-- executed even if more messages arrive during the period when we request to unsubscribe from the channel
-- and Redis actually processes the unsubscribe request.  This function is thread-safe.
--
-- If you remove all channels, the connection in 'pubSubForever' to redis will stay open and waiting for
-- any new channels from a call to 'addChannels'.  If you really want to close the connection,
-- use 'Control.Concurrent.killThread' or 'Control.Concurrent.Async.cancel' to kill the thread running
-- 'pubSubForever'.
removeChannels :: MonadIO m => PubSubController
                            -> [RedisChannel]
                            -> [RedisPChannel]
                            -> m ()
removeChannels :: forall (m :: * -> *).
MonadIO m =>
PubSubController -> [ByteString] -> [ByteString] -> m ()
removeChannels PubSubController
_ [] [] = () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
removeChannels PubSubController
ctrl [ByteString]
remChans [ByteString]
remPChans = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    remChans' <- ChannelData ByteString (ByteString -> IO ())
-> [ByteString] -> STM [ByteString]
forall channel callback.
Hashable channel =>
ChannelData channel callback -> [channel] -> STM [channel]
removeChannels' (PubSubController -> ChannelData ByteString (ByteString -> IO ())
pscChannelData PubSubController
ctrl) [ByteString]
remChans
    remPChans' <- removeChannels' (pscPChannelData ctrl) remPChans
    writeTBQueue (sendChanges ctrl) $ unsubscribe1 remChans' `mappend` punsubscribe1 remPChans'

#if !(MIN_VERSION_stm(2,3,0))
-- | Strict version of 'modifyTVar'.
--
-- @since 2.3
modifyTVar' :: TVar a -> (a -> a) -> STM ()
modifyTVar' var f = do
    x <- readTVar var
    writeTVar var $! f x
{-# INLINE modifyTVar' #-}
#endif

-- Helper for `removeChannels` that works on normal or pattern channels
removeChannels' :: (Hashable channel) => ChannelData channel callback -> [channel] -> STM [channel]
removeChannels' :: forall channel callback.
Hashable channel =>
ChannelData channel callback -> [channel] -> STM [channel]
removeChannels' ChannelData channel callback
channelData [channel]
remChannels = do
    subbedChannels <- TVar (HashMap channel [(UnregisterHandle, callback)])
-> STM (HashMap channel [(UnregisterHandle, callback)])
forall a. TVar a -> STM a
readTVar (TVar (HashMap channel [(UnregisterHandle, callback)])
 -> STM (HashMap channel [(UnregisterHandle, callback)]))
-> TVar (HashMap channel [(UnregisterHandle, callback)])
-> STM (HashMap channel [(UnregisterHandle, callback)])
forall a b. (a -> b) -> a -> b
$ ChannelData channel callback
-> TVar (HashMap channel [(UnregisterHandle, callback)])
forall channel callback.
ChannelData channel callback
-> TVar (HashMap channel [(UnregisterHandle, callback)])
cdSubscribedChannels ChannelData channel callback
channelData
    pendingChannelSubs <- readTVar $ cdChannelsPendingSubscription channelData
    let remChannels' = (channel -> Bool) -> [channel] -> [channel]
forall a. (a -> Bool) -> [a] -> [a]
filter (HashMap channel [(UnregisterHandle, callback)]
-> HashSet channel -> channel -> Bool
forall k v1. Hashable k => HashMap k v1 -> HashSet k -> k -> Bool
memberMapOrSet HashMap channel [(UnregisterHandle, callback)]
subbedChannels HashSet channel
pendingChannelSubs) [channel]
remChannels
    writeTVar (cdSubscribedChannels channelData) (L.foldl' (flip HM.delete) subbedChannels remChannels')
    writeTVar (cdChannelsPendingSubscription channelData) (L.foldl' (flip HS.delete) pendingChannelSubs remChannels')
    modifyTVar' (cdChannelsPendingRemoval channelData) $ flip (L.foldl' $ flip HS.insert) remChannels'
    pure remChannels'

memberMapOrSet :: Hashable k => HM.HashMap k v1 -> HS.HashSet k -> k -> Bool
memberMapOrSet :: forall k v1. Hashable k => HashMap k v1 -> HashSet k -> k -> Bool
memberMapOrSet HashMap k v1
m HashSet k
s k
k = k -> HashMap k v1 -> Bool
forall k a. Hashable k => k -> HashMap k a -> Bool
HM.member k
k HashMap k v1
m Bool -> Bool -> Bool
|| k -> HashSet k -> Bool
forall a. Hashable a => a -> HashSet a -> Bool
HS.member k
k HashSet k
s

unregisterHandles
    :: forall channel callback. Hashable channel => ChannelData channel callback
    -> [channel]
    -> UnregisterHandle
    -> STM [channel]
unregisterHandles :: forall channel callback.
Hashable channel =>
ChannelData channel callback
-> [channel] -> UnregisterHandle -> STM [channel]
unregisterHandles ChannelData channel callback
channelData [channel]
remChansParam UnregisterHandle
h = do
    callbacks <- TVar (HashMap channel [(UnregisterHandle, callback)])
-> STM (HashMap channel [(UnregisterHandle, callback)])
forall a. TVar a -> STM a
readTVar (TVar (HashMap channel [(UnregisterHandle, callback)])
 -> STM (HashMap channel [(UnregisterHandle, callback)]))
-> TVar (HashMap channel [(UnregisterHandle, callback)])
-> STM (HashMap channel [(UnregisterHandle, callback)])
forall a b. (a -> b) -> a -> b
$ ChannelData channel callback
-> TVar (HashMap channel [(UnregisterHandle, callback)])
forall channel callback.
ChannelData channel callback
-> TVar (HashMap channel [(UnregisterHandle, callback)])
cdSubscribedChannels ChannelData channel callback
channelData
    let remChans = (channel -> Bool) -> [channel] -> [channel]
forall a. (a -> Bool) -> [a] -> [a]
filter (channel -> HashMap channel [(UnregisterHandle, callback)] -> Bool
forall k a. Hashable k => k -> HashMap k a -> Bool
`HM.member` HashMap channel [(UnregisterHandle, callback)]
callbacks) [channel]
remChansParam

    -- helper functions to filter out handlers that match
    -- returns number of removals, and remaining subscriptions
    -- maps after taking out channels matching the handle
    let callbacks' = (HashMap channel [(UnregisterHandle, callback)]
 -> channel -> HashMap channel [(UnregisterHandle, callback)])
-> HashMap channel [(UnregisterHandle, callback)]
-> [channel]
-> HashMap channel [(UnregisterHandle, callback)]
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
L.foldl' HashMap channel [(UnregisterHandle, callback)]
-> channel -> HashMap channel [(UnregisterHandle, callback)]
forall a.
HashMap channel [(UnregisterHandle, a)]
-> channel -> HashMap channel [(UnregisterHandle, a)]
removeHandles HashMap channel [(UnregisterHandle, callback)]
callbacks [channel]
remChans
        remChans' = (channel -> Bool) -> [channel] -> [channel]
forall a. (a -> Bool) -> [a] -> [a]
filter (\channel
chan -> channel -> HashMap channel [(UnregisterHandle, callback)] -> Bool
forall k a. Hashable k => k -> HashMap k a -> Bool
HM.member channel
chan HashMap channel [(UnregisterHandle, callback)]
callbacks Bool -> Bool -> Bool
&& Bool -> Bool
not (channel -> HashMap channel [(UnregisterHandle, callback)] -> Bool
forall k a. Hashable k => k -> HashMap k a -> Bool
HM.member channel
chan HashMap channel [(UnregisterHandle, callback)]
callbacks')) [channel]
remChans

    writeTVar (cdSubscribedChannels channelData) callbacks'
    unless (null remChans') $ modifyTVar (cdChannelsPendingSubscription channelData) (`HS.difference` HS.fromList remChans')
    pure remChans'

    where
        filterHandle :: Maybe [(UnregisterHandle,a)] -> Maybe [(UnregisterHandle,a)]
        filterHandle :: forall a.
Maybe [(UnregisterHandle, a)] -> Maybe [(UnregisterHandle, a)]
filterHandle Maybe [(UnregisterHandle, a)]
Nothing = Maybe [(UnregisterHandle, a)]
forall a. Maybe a
Nothing
        filterHandle (Just [(UnregisterHandle, a)]
lst) = case ((UnregisterHandle, a) -> Bool)
-> [(UnregisterHandle, a)] -> [(UnregisterHandle, a)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(UnregisterHandle, a)
x -> (UnregisterHandle, a) -> UnregisterHandle
forall a b. (a, b) -> a
fst (UnregisterHandle, a)
x UnregisterHandle -> UnregisterHandle -> Bool
forall a. Eq a => a -> a -> Bool
/= UnregisterHandle
h) [(UnregisterHandle, a)]
lst of
                                    [] -> Maybe [(UnregisterHandle, a)]
forall a. Maybe a
Nothing
                                    [(UnregisterHandle, a)]
xs -> [(UnregisterHandle, a)] -> Maybe [(UnregisterHandle, a)]
forall a. a -> Maybe a
Just [(UnregisterHandle, a)]
xs

        removeHandles :: HM.HashMap channel [(UnregisterHandle,a)]
                      -> channel
                      -> HM.HashMap channel [(UnregisterHandle,a)]
        removeHandles :: forall a.
HashMap channel [(UnregisterHandle, a)]
-> channel -> HashMap channel [(UnregisterHandle, a)]
removeHandles HashMap channel [(UnregisterHandle, a)]
m channel
k = case Maybe [(UnregisterHandle, a)] -> Maybe [(UnregisterHandle, a)]
forall a.
Maybe [(UnregisterHandle, a)] -> Maybe [(UnregisterHandle, a)]
filterHandle (channel
-> HashMap channel [(UnregisterHandle, a)]
-> Maybe [(UnregisterHandle, a)]
forall k v. Hashable k => k -> HashMap k v -> Maybe v
HM.lookup channel
k HashMap channel [(UnregisterHandle, a)]
m) of -- recent versions of unordered-containers have alter
            Maybe [(UnregisterHandle, a)]
Nothing -> channel
-> HashMap channel [(UnregisterHandle, a)]
-> HashMap channel [(UnregisterHandle, a)]
forall k v. Hashable k => k -> HashMap k v -> HashMap k v
HM.delete channel
k HashMap channel [(UnregisterHandle, a)]
m
            Just [(UnregisterHandle, a)]
v  -> channel
-> [(UnregisterHandle, a)]
-> HashMap channel [(UnregisterHandle, a)]
-> HashMap channel [(UnregisterHandle, a)]
forall k v. Hashable k => k -> v -> HashMap k v -> HashMap k v
HM.insert channel
k [(UnregisterHandle, a)]
v HashMap channel [(UnregisterHandle, a)]
m

-- | Internal function to unsubscribe only from those channels matching the given handle.
unsubChannels :: PubSubController -> [RedisChannel] -> [RedisPChannel] -> UnregisterHandle -> IO ()
unsubChannels :: PubSubController
-> [ByteString] -> [ByteString] -> UnregisterHandle -> IO ()
unsubChannels PubSubController
ctrl [ByteString]
chans [ByteString]
pchans UnregisterHandle
h = IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    channelsToDrop <- ChannelData ByteString (ByteString -> IO ())
-> [ByteString] -> UnregisterHandle -> STM [ByteString]
forall channel callback.
Hashable channel =>
ChannelData channel callback
-> [channel] -> UnregisterHandle -> STM [channel]
unregisterHandles (PubSubController -> ChannelData ByteString (ByteString -> IO ())
pscChannelData PubSubController
ctrl) [ByteString]
chans UnregisterHandle
h
    pChannelsToDrop <- unregisterHandles (pscPChannelData ctrl) pchans h

    let commands = [ByteString] -> PubSub
unsubscribe1 [ByteString]
channelsToDrop PubSub -> PubSub -> PubSub
forall a. Monoid a => a -> a -> a
`mappend` [ByteString] -> PubSub
punsubscribe1 [ByteString]
pChannelsToDrop

    -- do the unsubscribe
    writeTBQueue (sendChanges ctrl) commands
    return ()

-- | Call 'removeChannels' and then wait for all pending subscription change requests to be acknowledged
-- by Redis.  This uses the same waiting logic as 'addChannelsAndWait'.  Since 'removeChannels' immediately
-- notifies the 'PubSubController' to start discarding messages, you likely don't need this function and
-- can just use 'removeChannels'.
removeChannelsAndWait :: MonadIO m => PubSubController
                                   -> [RedisChannel]
                                   -> [RedisPChannel]
                                   -> m ()
removeChannelsAndWait :: forall (m :: * -> *).
MonadIO m =>
PubSubController -> [ByteString] -> [ByteString] -> m ()
removeChannelsAndWait PubSubController
ctrl [ByteString]
remChannels [ByteString]
remPChannels = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    (remChans', remPChans') <- STM ([ByteString], [ByteString]) -> IO ([ByteString], [ByteString])
forall a. STM a -> IO a
atomically (STM ([ByteString], [ByteString])
 -> IO ([ByteString], [ByteString]))
-> STM ([ByteString], [ByteString])
-> IO ([ByteString], [ByteString])
forall a b. (a -> b) -> a -> b
$ do
        remChans' <- ChannelData ByteString (ByteString -> IO ())
-> [ByteString] -> STM [ByteString]
forall channel callback.
Hashable channel =>
ChannelData channel callback -> [channel] -> STM [channel]
removeChannels' (PubSubController -> ChannelData ByteString (ByteString -> IO ())
pscChannelData PubSubController
ctrl) [ByteString]
remChannels
        remPChans' <- removeChannels' (pscPChannelData ctrl) remPChannels
        writeTBQueue (sendChanges ctrl) $ unsubscribe1 remChans' `mappend` punsubscribe1 remPChans'
        pure (remChans', remPChans')
    waitUntilAbsent
      [ (cdChannelsPendingRemoval $ pscChannelData ctrl, remChans')
      , (cdChannelsPendingRemoval $ pscPChannelData ctrl, remPChans')
      ]

-- | Internal thread which listens for messages and executes callbacks.
-- This is the only thread which ever receives data from the underlying
-- connection.
listenThread :: PubSubController -> PP.Connection -> IO ()
listenThread :: PubSubController -> Connection -> IO ()
listenThread PubSubController
ctrl Connection
rawConn = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    msg <- Connection -> IO Reply
PP.recv Connection
rawConn
    case decodeMsg msg of
        Msg message :: Message
message@(Message ByteString
channel ByteString
_) -> do
          cm <- STM (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> IO
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. STM a -> IO a
atomically (STM (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
 -> IO
      (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> STM
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> IO
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. TVar a -> STM a
readTVar (TVar
   (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
 -> STM
      (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> TVar
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ ChannelData ByteString (ByteString -> IO ())
-> TVar
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall channel callback.
ChannelData channel callback
-> TVar (HashMap channel [(UnregisterHandle, callback)])
cdSubscribedChannels (ChannelData ByteString (ByteString -> IO ())
 -> TVar
      (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> ChannelData ByteString (ByteString -> IO ())
-> TVar
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString (ByteString -> IO ())
pscChannelData PubSubController
ctrl
          forM_ (HM.lookup channel cm) $ \[(UnregisterHandle, ByteString -> IO ())]
c -> do
            IO PubSub -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO PubSub -> IO ()) -> IO PubSub -> IO ()
forall a b. (a -> b) -> a -> b
$ Hooks -> CallbackHook
Core.callbackHook (Connection -> Hooks
PP.hooks Connection
rawConn) (\Message
m -> ((UnregisterHandle, ByteString -> IO ()) -> IO ())
-> [(UnregisterHandle, ByteString -> IO ())] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(UnregisterHandle
_,ByteString -> IO ()
x) -> ByteString -> IO ()
x (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ Message -> ByteString
msgMessage Message
m) [(UnregisterHandle, ByteString -> IO ())]
c IO () -> PubSub -> IO PubSub
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> PubSub
forall a. Monoid a => a
mempty) Message
message
        Msg message :: Message
message@(PMessage ByteString
pattern ByteString
_ ByteString
_) -> do
          pm <- STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. STM a -> IO a
atomically (STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
 -> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. TVar a -> STM a
readTVar (TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
 -> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ ChannelData ByteString PMessageCallback
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall channel callback.
ChannelData channel callback
-> TVar (HashMap channel [(UnregisterHandle, callback)])
cdSubscribedChannels (ChannelData ByteString PMessageCallback
 -> TVar
      (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> ChannelData ByteString PMessageCallback
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString PMessageCallback
pscPChannelData PubSubController
ctrl
          forM_ (HM.lookup pattern pm) $ \[(UnregisterHandle, PMessageCallback)]
c -> do
            IO PubSub -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO PubSub -> IO ()) -> IO PubSub -> IO ()
forall a b. (a -> b) -> a -> b
$ Hooks -> CallbackHook
Core.callbackHook (Connection -> Hooks
PP.hooks Connection
rawConn) (\Message
m -> ((UnregisterHandle, PMessageCallback) -> IO ())
-> [(UnregisterHandle, PMessageCallback)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(UnregisterHandle
_,PMessageCallback
x) -> PMessageCallback
x (Message -> ByteString
msgChannel Message
m) (Message -> ByteString
msgMessage Message
m)) [(UnregisterHandle, PMessageCallback)]
c IO () -> PubSub -> IO PubSub
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> PubSub
forall a. Monoid a => a
mempty) Message
message
        Subscribed ByteString
chan -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (HashSet ByteString)
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (ChannelData ByteString (ByteString -> IO ())
-> TVar (HashSet ByteString)
forall channel callback.
ChannelData channel callback -> TVar (HashSet channel)
cdChannelsPendingSubscription (ChannelData ByteString (ByteString -> IO ())
 -> TVar (HashSet ByteString))
-> ChannelData ByteString (ByteString -> IO ())
-> TVar (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString (ByteString -> IO ())
pscChannelData PubSubController
ctrl) ((HashSet ByteString -> HashSet ByteString) -> STM ())
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a b. (a -> b) -> a -> b
$ ByteString -> HashSet ByteString -> HashSet ByteString
forall a. Hashable a => a -> HashSet a -> HashSet a
HS.delete ByteString
chan
        PSubscribed ByteString
chan -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (HashSet ByteString)
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (ChannelData ByteString PMessageCallback
-> TVar (HashSet ByteString)
forall channel callback.
ChannelData channel callback -> TVar (HashSet channel)
cdChannelsPendingSubscription (ChannelData ByteString PMessageCallback
 -> TVar (HashSet ByteString))
-> ChannelData ByteString PMessageCallback
-> TVar (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString PMessageCallback
pscPChannelData PubSubController
ctrl) ((HashSet ByteString -> HashSet ByteString) -> STM ())
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a b. (a -> b) -> a -> b
$ ByteString -> HashSet ByteString -> HashSet ByteString
forall a. Hashable a => a -> HashSet a -> HashSet a
HS.delete ByteString
chan
        Unsubscribed ByteString
chan Int
_ -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (HashSet ByteString)
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (ChannelData ByteString (ByteString -> IO ())
-> TVar (HashSet ByteString)
forall channel callback.
ChannelData channel callback -> TVar (HashSet channel)
cdChannelsPendingRemoval (ChannelData ByteString (ByteString -> IO ())
 -> TVar (HashSet ByteString))
-> ChannelData ByteString (ByteString -> IO ())
-> TVar (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString (ByteString -> IO ())
pscChannelData PubSubController
ctrl) ((HashSet ByteString -> HashSet ByteString) -> STM ())
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a b. (a -> b) -> a -> b
$ ByteString -> HashSet ByteString -> HashSet ByteString
forall a. Hashable a => a -> HashSet a -> HashSet a
HS.delete ByteString
chan
        PUnsubscribed ByteString
chan Int
_ -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (HashSet ByteString)
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (ChannelData ByteString PMessageCallback
-> TVar (HashSet ByteString)
forall channel callback.
ChannelData channel callback -> TVar (HashSet channel)
cdChannelsPendingRemoval (ChannelData ByteString PMessageCallback
 -> TVar (HashSet ByteString))
-> ChannelData ByteString PMessageCallback
-> TVar (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString PMessageCallback
pscPChannelData PubSubController
ctrl) ((HashSet ByteString -> HashSet ByteString) -> STM ())
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a b. (a -> b) -> a -> b
$ ByteString -> HashSet ByteString -> HashSet ByteString
forall a. Hashable a => a -> HashSet a -> HashSet a
HS.delete ByteString
chan

-- | Internal thread which sends subscription change requests.
-- This is the only thread which ever sends data on the underlying
-- connection.
sendThread :: PubSubController -> PP.Connection -> IO ()
sendThread :: PubSubController -> Connection -> IO ()
sendThread PubSubController
ctrl Connection
rawConn = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    PubSub{..} <- STM PubSub -> IO PubSub
forall a. STM a -> IO a
atomically (STM PubSub -> IO PubSub) -> STM PubSub -> IO PubSub
forall a b. (a -> b) -> a -> b
$ TBQueue PubSub -> STM PubSub
forall a. TBQueue a -> STM a
readTBQueue (PubSubController -> TBQueue PubSub
sendChanges PubSubController
ctrl)
    rawSendCmd rawConn subs
    rawSendCmd rawConn unsubs
    rawSendCmd rawConn psubs
    rawSendCmd rawConn punsubs
    -- normally, the socket is flushed during 'recv', but
    -- 'recv' could currently be blocking on a message.
    PP.flush rawConn

-- | Open a connection to the Redis server, register to all channels in the 'PubSubController',
-- and process messages and subscription change requests forever.  The only way this will ever
-- exit is if there is an exception from the network code or an unhandled exception
-- in a 'MessageCallback' or 'PMessageCallback'. For example, if the network connection to Redis
-- dies, 'pubSubForever' will throw a 'ConnectionLost'.  When such an exception is
-- thrown, you can recall 'pubSubForever' with the same 'PubSubController' which will open a
-- new connection and resubscribe to all the channels which are tracked in the 'PubSubController'.
--
-- The general pattern is therefore during program startup create a 'PubSubController' and fork
-- a thread which calls 'pubSubForever' in a loop (using an exponential backoff algorithm
-- such as the <https://hackage.haskell.org/package/retry retry> package to not hammer the Redis
-- server if it does die).  For example,
--
-- @
-- myhandler :: ByteString -> IO ()
-- myhandler msg = putStrLn $ unpack $ decodeUtf8 msg
--
-- onInitialComplete :: IO ()
-- onInitialComplete = putStrLn "Redis acknowledged that mychannel is now subscribed"
--
-- main :: IO ()
-- main = do
--   conn <- connect defaultConnectInfo
--   pubSubCtrl <- newPubSubController [("mychannel", myhandler)] []
--   concurrently ( forever $
--       pubSubForever conn pubSubCtrl onInitialComplete
--         \`catch\` (\\(e :: SomeException) -> do
--           putStrLn $ "Got error: " ++ show e
--           threadDelay $ 50*1000) -- TODO: use exponential backoff
--        ) $ restOfYourProgram
--
--
--   {- elsewhere in your program, use pubSubCtrl to change subscriptions -}
-- @
--
-- At most one active 'pubSubForever' can be running against a single 'PubSubController' at any time.  If
-- two active calls to 'pubSubForever' share a single 'PubSubController' there will be deadlocks.  If
-- you do want to process messages using multiple connections to Redis, you can create more than one
-- 'PubSubController'.  For example, create one PubSubController for each 'Control.Concurrent.getNumCapabilities'
-- and then create a Haskell thread bound to each capability each calling 'pubSubForever' in a loop.
-- This will create one network connection per controller/capability and allow you to
-- register separate channels and callbacks for each controller, spreading the load across the capabilities.
pubSubForever :: Connection.Connection -- ^ The connection pool
              -> PubSubController -- ^ The controller which keeps track of all subscriptions and handlers
              -> IO () -- ^ This action is executed once Redis acknowledges that all the subscriptions in
                       -- the controller are now subscribed.  You can use this after an exception (such as
                       -- 'ConnectionLost') to signal that all subscriptions are now reactivated.
              -> IO ()
pubSubForever :: Connection -> PubSubController -> IO () -> IO ()
pubSubForever (Connection.NonClusteredConnection Pool Connection
pool) PubSubController
ctrl IO ()
onInitialLoad =
    Pool Connection -> (Connection -> IO ()) -> IO ()
forall a r. Pool a -> (a -> IO r) -> IO r
withResource Pool Connection
pool ((Connection -> IO ()) -> IO ()) -> (Connection -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
rawConn -> Connection -> PubSubController -> IO () -> IO ()
pubSubForeverOnConn Connection
rawConn PubSubController
ctrl IO ()
onInitialLoad
pubSubForever (Connection.ClusteredConnection MVar ShardMap
_ Pool Connection
pool) PubSubController
ctrl IO ()
onInitialLoad = Pool Connection -> (Connection -> IO ()) -> IO ()
forall a r. Pool a -> (a -> IO r) -> IO r
withResource Pool Connection
pool ((Connection -> IO ()) -> IO ()) -> (Connection -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
clusterConn -> do
    masterNodeConns <- Connection -> IO [NodeConnection]
Cluster.masterNodes Connection
clusterConn
    nodeConn <- case masterNodeConns of
      [] -> IOError -> IO NodeConnection
forall a. IOError -> IO a
ioError (IOError -> IO NodeConnection) -> IOError -> IO NodeConnection
forall a b. (a -> b) -> a -> b
$ String -> IOError
userError String
"Hedis: clustered pubSubForever requires at least one master node"
      NodeConnection
x:[NodeConnection]
_ -> NodeConnection -> IO NodeConnection
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure NodeConnection
x
    rawConn <- PP.fromCtxWithHooks (Cluster.nodeConnectionContext nodeConn) (Cluster.hooks clusterConn)
    PP.beginReceiving rawConn
    pubSubForeverOnConn rawConn ctrl onInitialLoad

pubSubForeverOnConn :: PP.Connection -> PubSubController -> IO () -> IO ()
pubSubForeverOnConn :: Connection -> PubSubController -> IO () -> IO ()
pubSubForeverOnConn Connection
rawConn PubSubController
ctrl IO ()
onInitialLoad = do
    -- get initial subscriptions and write them into the queue.
    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      let loop :: STM ()
loop = TBQueue PubSub -> STM (Maybe PubSub)
forall a. TBQueue a -> STM (Maybe a)
tryReadTBQueue (PubSubController -> TBQueue PubSub
sendChanges PubSubController
ctrl) STM (Maybe PubSub) -> (Maybe PubSub -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=
                   \Maybe PubSub
x -> if Maybe PubSub -> Bool
forall a. Maybe a -> Bool
isJust Maybe PubSub
x then STM ()
loop else () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
      STM ()
loop
      channels <- (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
 -> [ByteString])
-> STM
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM [ByteString]
forall a b. (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> [ByteString]
forall k v. HashMap k v -> [k]
HM.keys (STM (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
 -> STM [ByteString])
-> STM
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM [ByteString]
forall a b. (a -> b) -> a -> b
$ TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. TVar a -> STM a
readTVar (TVar
   (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
 -> STM
      (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> TVar
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ ChannelData ByteString (ByteString -> IO ())
-> TVar
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall channel callback.
ChannelData channel callback
-> TVar (HashMap channel [(UnregisterHandle, callback)])
cdSubscribedChannels (ChannelData ByteString (ByteString -> IO ())
 -> TVar
      (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> ChannelData ByteString (ByteString -> IO ())
-> TVar
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString (ByteString -> IO ())
pscChannelData PubSubController
ctrl
      patternChannels <- fmap HM.keys $ readTVar $ cdSubscribedChannels $ pscPChannelData ctrl
      let ps = [ByteString] -> PubSub
subscribe [ByteString]
channels PubSub -> PubSub -> PubSub
forall a. Monoid a => a -> a -> a
`mappend` [ByteString] -> PubSub
psubscribe [ByteString]
patternChannels
      writeTBQueue (sendChanges ctrl) ps
      writeTVar (cdChannelsPendingSubscription $ pscChannelData ctrl) $ HS.fromList channels
      writeTVar (cdChannelsPendingSubscription $ pscPChannelData ctrl) $ HS.fromList patternChannels

    IO () -> (Async () -> IO ()) -> IO ()
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync (PubSubController -> Connection -> IO ()
listenThread PubSubController
ctrl Connection
rawConn) ((Async () -> IO ()) -> IO ()) -> (Async () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async ()
listenT ->
      IO () -> (Async () -> IO ()) -> IO ()
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync (PubSubController -> Connection -> IO ()
sendThread PubSubController
ctrl Connection
rawConn) ((Async () -> IO ()) -> IO ()) -> (Async () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async ()
sendT -> do

        -- wait for initial subscription count to go to zero or for threads to fail
        mret <- STM
  (Either
     (Either (Either SomeException ()) (Either SomeException ())) ())
-> IO
     (Either
        (Either (Either SomeException ()) (Either SomeException ())) ())
forall a. STM a -> IO a
atomically (STM
   (Either
      (Either (Either SomeException ()) (Either SomeException ())) ())
 -> IO
      (Either
         (Either (Either SomeException ()) (Either SomeException ())) ()))
-> STM
     (Either
        (Either (Either SomeException ()) (Either SomeException ())) ())
-> IO
     (Either
        (Either (Either SomeException ()) (Either SomeException ())) ())
forall a b. (a -> b) -> a -> b
$
            (Either (Either SomeException ()) (Either SomeException ())
-> Either
     (Either (Either SomeException ()) (Either SomeException ())) ()
forall a b. a -> Either a b
Left (Either (Either SomeException ()) (Either SomeException ())
 -> Either
      (Either (Either SomeException ()) (Either SomeException ())) ())
-> STM (Either (Either SomeException ()) (Either SomeException ()))
-> STM
     (Either
        (Either (Either SomeException ()) (Either SomeException ())) ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Async ()
-> Async ()
-> STM (Either (Either SomeException ()) (Either SomeException ()))
forall a b.
Async a
-> Async b
-> STM (Either (Either SomeException a) (Either SomeException b))
waitEitherCatchSTM Async ()
listenT Async ()
sendT))
          STM
  (Either
     (Either (Either SomeException ()) (Either SomeException ())) ())
-> STM
     (Either
        (Either (Either SomeException ()) (Either SomeException ())) ())
-> STM
     (Either
        (Either (Either SomeException ()) (Either SomeException ())) ())
forall a. STM a -> STM a -> STM a
`orElse`
            (()
-> Either
     (Either (Either SomeException ()) (Either SomeException ())) ()
forall a b. b -> Either a b
Right (()
 -> Either
      (Either (Either SomeException ()) (Either SomeException ())) ())
-> STM ()
-> STM
     (Either
        (Either (Either SomeException ()) (Either SomeException ())) ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> do
              a <- TVar (HashSet ByteString) -> STM (HashSet ByteString)
forall a. TVar a -> STM a
readTVar (TVar (HashSet ByteString) -> STM (HashSet ByteString))
-> TVar (HashSet ByteString) -> STM (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ ChannelData ByteString (ByteString -> IO ())
-> TVar (HashSet ByteString)
forall channel callback.
ChannelData channel callback -> TVar (HashSet channel)
cdChannelsPendingSubscription (ChannelData ByteString (ByteString -> IO ())
 -> TVar (HashSet ByteString))
-> ChannelData ByteString (ByteString -> IO ())
-> TVar (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString (ByteString -> IO ())
pscChannelData PubSubController
ctrl
              unless (HS.null a) retry
              b <- readTVar $ cdChannelsPendingSubscription $ pscPChannelData ctrl
              unless (HS.null b) retry)
        case mret of
          Right () -> IO ()
onInitialLoad
          Either
  (Either (Either SomeException ()) (Either SomeException ())) ()
_ -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return () -- if there is an error, waitEitherCatch below will also see it

        -- wait for threads to end with error
        merr <- waitEitherCatch listenT sendT
        case merr of
          (Right (Left SomeException
err)) -> SomeException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO SomeException
err
          (Left (Left SomeException
err)) -> SomeException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO SomeException
err
          Either (Either SomeException ()) (Either SomeException ())
_ -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()  -- should never happen, since threads exit only with an error


------------------------------------------------------------------------------
-- Helpers
--
decodeMsg :: Reply -> PubSubReply
decodeMsg :: Reply -> PubSubReply
decodeMsg r :: Reply
r@(MultiBulk (Just (Reply
r0:Reply
r1:Reply
r2:[Reply]
rs))) = (Reply -> PubSubReply)
-> (PubSubReply -> PubSubReply)
-> Either Reply PubSubReply
-> PubSubReply
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (Reply -> Reply -> PubSubReply
forall a. Reply -> a
errMsg Reply
r) PubSubReply -> PubSubReply
forall a. a -> a
id (Either Reply PubSubReply -> PubSubReply)
-> Either Reply PubSubReply -> PubSubReply
forall a b. (a -> b) -> a -> b
$ do
    kind <- Reply -> Either Reply ByteString
forall a. RedisResult a => Reply -> Either Reply a
decode Reply
r0
    case kind :: ByteString of
        ByteString
"message"      -> Message -> PubSubReply
Msg (Message -> PubSubReply)
-> Either Reply Message -> Either Reply PubSubReply
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either Reply Message
decodeMessage
        ByteString
"pmessage"     -> Message -> PubSubReply
Msg (Message -> PubSubReply)
-> Either Reply Message -> Either Reply PubSubReply
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either Reply Message
decodePMessage
        ByteString
"subscribe"    -> ByteString -> PubSubReply
Subscribed (ByteString -> PubSubReply)
-> Either Reply ByteString -> Either Reply PubSubReply
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either Reply ByteString
decodeChan
        ByteString
"psubscribe"   -> ByteString -> PubSubReply
PSubscribed (ByteString -> PubSubReply)
-> Either Reply ByteString -> Either Reply PubSubReply
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either Reply ByteString
decodeChan
        ByteString
"unsubscribe"  -> ByteString -> Int -> PubSubReply
Unsubscribed (ByteString -> Int -> PubSubReply)
-> Either Reply ByteString -> Either Reply (Int -> PubSubReply)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either Reply ByteString
decodeChan Either Reply (Int -> PubSubReply)
-> Either Reply Int -> Either Reply PubSubReply
forall a b.
Either Reply (a -> b) -> Either Reply a -> Either Reply b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Either Reply Int
decodeCnt
        ByteString
"punsubscribe" -> ByteString -> Int -> PubSubReply
PUnsubscribed (ByteString -> Int -> PubSubReply)
-> Either Reply ByteString -> Either Reply (Int -> PubSubReply)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either Reply ByteString
decodeChan Either Reply (Int -> PubSubReply)
-> Either Reply Int -> Either Reply PubSubReply
forall a b.
Either Reply (a -> b) -> Either Reply a -> Either Reply b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Either Reply Int
decodeCnt
        ByteString
_              -> Reply -> Either Reply PubSubReply
forall a. Reply -> a
errMsg Reply
r
  where
    decodeMessage :: Either Reply Message
decodeMessage  = ByteString -> ByteString -> Message
Message  (ByteString -> ByteString -> Message)
-> Either Reply ByteString -> Either Reply (ByteString -> Message)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Reply -> Either Reply ByteString
forall a. RedisResult a => Reply -> Either Reply a
decode Reply
r1 Either Reply (ByteString -> Message)
-> Either Reply ByteString -> Either Reply Message
forall a b.
Either Reply (a -> b) -> Either Reply a -> Either Reply b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Reply -> Either Reply ByteString
forall a. RedisResult a => Reply -> Either Reply a
decode Reply
r2
    decodePMessage :: Either Reply Message
decodePMessage = ByteString -> ByteString -> ByteString -> Message
PMessage (ByteString -> ByteString -> ByteString -> Message)
-> Either Reply ByteString
-> Either Reply (ByteString -> ByteString -> Message)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Reply -> Either Reply ByteString
forall a. RedisResult a => Reply -> Either Reply a
decode Reply
r1 Either Reply (ByteString -> ByteString -> Message)
-> Either Reply ByteString -> Either Reply (ByteString -> Message)
forall a b.
Either Reply (a -> b) -> Either Reply a -> Either Reply b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Reply -> Either Reply ByteString
forall a. RedisResult a => Reply -> Either Reply a
decode Reply
r2 Either Reply (ByteString -> Message)
-> Either Reply ByteString -> Either Reply Message
forall a b.
Either Reply (a -> b) -> Either Reply a -> Either Reply b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Reply -> Either Reply ByteString
forall a. RedisResult a => Reply -> Either Reply a
decode ([Reply] -> Reply
forall a. HasCallStack => [a] -> a
head [Reply]
rs)
    decodeCnt :: Either Reply Int
decodeCnt      = Integer -> Int
forall a. Num a => Integer -> a
fromInteger (Integer -> Int) -> Either Reply Integer -> Either Reply Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Reply -> Either Reply Integer
forall a. RedisResult a => Reply -> Either Reply a
decode Reply
r2
    decodeChan :: Either Reply ByteString
decodeChan     = Reply -> Either Reply ByteString
forall a. RedisResult a => Reply -> Either Reply a
decode Reply
r1

decodeMsg Reply
r = Reply -> PubSubReply
forall a. Reply -> a
errMsg Reply
r

errMsg :: Reply -> a
errMsg :: forall a. Reply -> a
errMsg Reply
r = String -> a
forall a. HasCallStack => String -> a
error (String -> a) -> String -> a
forall a b. (a -> b) -> a -> b
$ String
"Hedis: expected pub/sub-message but got: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Reply -> String
forall a. Show a => a -> String
show Reply
r


-- $pubsubexpl
-- There are two Pub/Sub implementations.  First, there is a single-threaded implementation 'pubSub'
-- which is simpler to use but has the restriction that subscription changes can only be made in
-- response to a message.  Secondly, there is a more complicated Pub/Sub controller 'pubSubForever'
-- that uses concurrency to support changing subscriptions at any time but requires more setup.
-- You should only use one or the other.  In addition, no types or utility functions (that are part
-- of the public API) are shared, so functions or types in one of the following sections cannot
-- be used for the other.  In particular, be aware that they use different utility functions to subscribe
-- and unsubscribe to channels.


-- $shortlivedexpl
-- Another approach to Pub/Sub that allows creating a short-lived Pub/Sub connection is to use 'withPubSub', which takes a callback that receives messages and returns when the callback returns. This is simpler than 'pubSubForever' but does not support changing subscriptions while it is running, so it is only useful for short-lived Pub/Sub connections. For example, you could use 'withPubSub'
-- to subscribe to a channel, consume a stream of messages, and then return. This approach is worth using when you want a few short-lived
-- subscriptions. However, each call to 'withPubSub' consumes a connection from the pool, so if you have a lot of short-lived subscriptions, it is more

-- |
-- Creates a subscription and automatically unsubscribes when callback returns, this function keeps
-- flow control in the callback, so it is useful for short-lived subscriptions, when the callback knows
-- when to exit. The function is quite simple and does not make any attempts to handle connection loss.
--
-- Note that this function does not support changing subscriptions while it is running, so it is only useful for short-lived Pub/Sub connections.
--
-- An example of usage, that is hard to implement with 'pubSubForever' is to subscribe to a channel:
--
-- @
-- withPubSub conn [\"mychannel\"] [] $ \\waitMsg -> do
--    d <- registerDelay 1000000 -- 1 second (requires -threaded runtime)
--    atomically $ asum [ readTVar >>= guard >> return Nothing
--                      , Just <$> waitMsg
--                      ]
-- @
--
-- In case if connection is lost, user callback will receive 'BlockedIndefinitelyOnSTM' exception.
withPubSub :: Connection.Connection -> [ByteString] -> [ByteString] -> (STM Message -> IO r) -> IO r
withPubSub :: forall r.
Connection
-> [ByteString] -> [ByteString] -> (STM Message -> IO r) -> IO r
withPubSub (Connection.NonClusteredConnection Pool Connection
pool) [ByteString]
chans [ByteString]
pchans STM Message -> IO r
f = Pool Connection -> (Connection -> IO r) -> IO r
forall a r. Pool a -> (a -> IO r) -> IO r
withResource Pool Connection
pool ((Connection -> IO r) -> IO r) -> (Connection -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Connection
rawConn -> do
    IO (TChan Message)
forall a. IO (TChan a)
newTChanIO IO (TChan Message) -> (TChan Message -> IO r) -> IO r
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \TChan Message
messageChan -> TChan Message
-> [ByteString]
-> [ByteString]
-> Connection
-> (STM Message -> IO r)
-> IO r
forall r.
TChan Message
-> [ByteString]
-> [ByteString]
-> Connection
-> (STM Message -> IO r)
-> IO r
withPubSubOnConn TChan Message
messageChan [ByteString]
chans [ByteString]
pchans Connection
rawConn STM Message -> IO r
f
withPubSub (Connection.ClusteredConnection MVar ShardMap
_ Pool Connection
pool) [ByteString]
chans [ByteString]
pchans STM Message -> IO r
f = Pool Connection -> (Connection -> IO r) -> IO r
forall a r. Pool a -> (a -> IO r) -> IO r
withResource Pool Connection
pool ((Connection -> IO r) -> IO r) -> (Connection -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Connection
clusterConn -> do
    masterNodeConns <- Connection -> IO [NodeConnection]
Cluster.masterNodes Connection
clusterConn
    nodeConn <- case masterNodeConns of
      [] -> IOError -> IO NodeConnection
forall a. IOError -> IO a
ioError (IOError -> IO NodeConnection) -> IOError -> IO NodeConnection
forall a b. (a -> b) -> a -> b
$ String -> IOError
userError String
"Hedis: clustered withPubSub requires at least one master node"
      NodeConnection
x:[NodeConnection]
_ -> NodeConnection -> IO NodeConnection
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure NodeConnection
x
    rawConn <- PP.fromCtxWithHooks (Cluster.nodeConnectionContext nodeConn) (Cluster.hooks clusterConn)
    PP.beginReceiving rawConn
    newTChanIO >>= \TChan Message
messageChan -> TChan Message
-> [ByteString]
-> [ByteString]
-> Connection
-> (STM Message -> IO r)
-> IO r
forall r.
TChan Message
-> [ByteString]
-> [ByteString]
-> Connection
-> (STM Message -> IO r)
-> IO r
withPubSubOnConn TChan Message
messageChan [ByteString]
chans [ByteString]
pchans Connection
rawConn STM Message -> IO r
f

withPubSubOnConn :: TChan Message -> [ByteString] -> [ByteString] -> PP.Connection -> (STM Message -> IO r) -> IO r
withPubSubOnConn :: forall r.
TChan Message
-> [ByteString]
-> [ByteString]
-> Connection
-> (STM Message -> IO r)
-> IO r
withPubSubOnConn TChan Message
messageChan [ByteString]
chans [ByteString]
pchans Connection
rawConn STM Message -> IO r
f = do
    IO ()
subscribeAll
    (_, r) <- IO () -> IO r -> IO ((), r)
forall a b. IO a -> IO b -> IO (a, b)
concurrently IO ()
lThread (STM Message -> IO r
f (TChan Message -> STM Message
forall a. TChan a -> STM a
readTChan TChan Message
messageChan) IO r -> IO () -> IO r
forall a b. IO a -> IO b -> IO a
`finally` IO ()
unsubscribeAll)
    pure r
  where
    subscribeAll :: IO ()
subscribeAll = do
        Maybe (NonEmpty ByteString)
-> (NonEmpty ByteString -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([ByteString] -> Maybe (NonEmpty ByteString)
forall a. [a] -> Maybe (NonEmpty a)
NE.nonEmpty [ByteString]
chans) \NonEmpty ByteString
ne_chans ->
            Connection -> ByteString -> IO ()
PP.send Connection
rawConn (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
renderRequest (ByteString
"SUBSCRIBE" ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
: NonEmpty ByteString -> [ByteString]
forall a. NonEmpty a -> [a]
NE.toList NonEmpty ByteString
ne_chans)
        Maybe (NonEmpty ByteString)
-> (NonEmpty ByteString -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([ByteString] -> Maybe (NonEmpty ByteString)
forall a. [a] -> Maybe (NonEmpty a)
NE.nonEmpty [ByteString]
pchans) \NonEmpty ByteString
ne_pchans ->
            Connection -> ByteString -> IO ()
PP.send Connection
rawConn (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
renderRequest (ByteString
"PSUBSCRIBE" ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
: NonEmpty ByteString -> [ByteString]
forall a. NonEmpty a -> [a]
NE.toList NonEmpty ByteString
ne_pchans)
        Connection -> IO ()
PP.flush Connection
rawConn
    unsubscribeAll :: IO ()
unsubscribeAll = do
        Maybe (NonEmpty ByteString)
-> (NonEmpty ByteString -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([ByteString] -> Maybe (NonEmpty ByteString)
forall a. [a] -> Maybe (NonEmpty a)
NE.nonEmpty [ByteString]
chans) \NonEmpty ByteString
ne_chans ->
            Connection -> ByteString -> IO ()
PP.send Connection
rawConn (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
renderRequest (ByteString
"UNSUBSCRIBE" ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
: NonEmpty ByteString -> [ByteString]
forall a. NonEmpty a -> [a]
NE.toList NonEmpty ByteString
ne_chans)
        Maybe (NonEmpty ByteString)
-> (NonEmpty ByteString -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([ByteString] -> Maybe (NonEmpty ByteString)
forall a. [a] -> Maybe (NonEmpty a)
NE.nonEmpty [ByteString]
pchans) \NonEmpty ByteString
ne_pchans ->
            Connection -> ByteString -> IO ()
PP.send Connection
rawConn (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
renderRequest (ByteString
"PUNSUBSCRIBE" ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
: NonEmpty ByteString -> [ByteString]
forall a. NonEmpty a -> [a]
NE.toList NonEmpty ByteString
ne_pchans)
        Connection -> IO ()
PP.flush Connection
rawConn
    lThread :: IO ()
lThread = (IO () -> IO ()) -> IO ()
forall a. (a -> a) -> a
fix \IO ()
next -> do
        msg <- Connection -> IO Reply
PP.recv Connection
rawConn
        case decodeMsg msg of
            Msg Message
m -> do
                STM () -> IO ()
forall a. STM a -> IO a
atomically (TChan Message -> Message -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan Message
messageChan Message
m)
                IO ()
next
            Unsubscribed ByteString
_ Int
0 -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            PUnsubscribed ByteString
_ Int
0 -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            PubSubReply
_ -> IO ()
next