{-# LANGUAGE CPP, OverloadedStrings, RecordWildCards, EmptyDataDecls,
FlexibleInstances, FlexibleContexts, GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ScopedTypeVariables, TupleSections, ConstraintKinds #-}
{-# LANGUAGE BlockArguments #-}
module Database.Redis.PubSub (
publish,
pubSub,
Message(..),
PubSub(),
subscribe, unsubscribe, psubscribe, punsubscribe,
pubSubForever,
RedisChannel, RedisPChannel, MessageCallback, PMessageCallback,
PubSubController, newPubSubController, currentChannels, currentPChannels,
addChannels, addChannelsAndWait, removeChannels, removeChannelsAndWait,
UnregisterCallbacksAction,
pendingChannels, pendingPatternChannels,
withPubSub
) where
#if __GLASGOW_HASKELL__ < 710
import Control.Applicative
import Data.Monoid hiding (<>)
#endif
import Control.Arrow (second)
import Control.Concurrent.Async (withAsync, waitEitherCatch, waitEitherCatchSTM, concurrently)
import Control.Concurrent.STM
import Control.Exception (throwIO, finally)
import qualified Database.Redis.ProtocolPipelining as PP
import Control.Monad
import Control.Monad.Reader (asks)
import Control.Monad.State
import Data.ByteString.Char8 (ByteString)
import Data.Function (fix)
import qualified Data.List as L
import qualified Data.List.NonEmpty as NE
import Data.Maybe (isJust)
import Data.Pool
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Data.Hashable (Hashable)
import qualified Data.HashMap.Strict as HM
import qualified Data.HashSet as HS
import qualified Database.Redis.Cluster as Cluster
import qualified Database.Redis.Core as Core
import qualified Database.Redis.Connection as Connection
import Database.Redis.Protocol (Reply(..), renderRequest)
import Database.Redis.Types
import Control.Monad.IO.Unlift (MonadUnliftIO(withRunInIO))
import Data.Functor (($>))
-- | Bookkeeping threaded through a 'pubSub' session.
data PubSubState = PubSubState
    { subCnt  :: Int
      -- ^ Subscription count most recently stored with 'putSubCnt'.
    , pending :: Int
      -- ^ Counter adjusted with 'modifyPending': raised when subscribe
      --   commands are sent, lowered when their confirmations arrive.
    }
-- | Apply a function to the 'pending' counter of the current 'PubSubState',
--   leaving the other fields untouched.
modifyPending :: (MonadState PubSubState m) => (Int -> Int) -> m ()
modifyPending f = modify $ \s -> s{ pending = f (pending s) }
-- | Overwrite the 'subCnt' field of the current 'PubSubState' with @n@.
putSubCnt :: (MonadState PubSubState m) => Int -> m ()
putSubCnt n = modify $ \s -> s{ subCnt = n }
-- Uninhabited tag types. They only appear as the phantom parameters of
-- 'Cmd' (e.g. @Cmd Subscribe Channel@) and select the matching
-- 'Semigroup' and 'Command' instances.

-- | Tag: the command adds subscriptions.
data Subscribe
-- | Tag: the command removes subscriptions.
data Unsubscribe
-- | Tag: the command targets plain channels.
data Channel
-- | Tag: the command targets patterns.
data Pattern
-- | A set of subscription changes, one 'Cmd' per kind of command.
--
--   Values are combined field by field through the 'Semigroup' / 'Monoid'
--   instances; 'mempty' holds 'DoNothing' in every field.
data PubSub = PubSub
    { subs    :: Cmd Subscribe Channel
      -- ^ Channels to subscribe to.
    , unsubs  :: Cmd Unsubscribe Channel
      -- ^ Channels to unsubscribe from.
    , psubs   :: Cmd Subscribe Pattern
      -- ^ Patterns to subscribe to.
    , punsubs :: Cmd Unsubscribe Pattern
      -- ^ Patterns to unsubscribe from.
    } deriving (Eq)
-- | Combine two change sets by merging each of the four command fields
--   with that field's own 'Monoid' instance.
instance Semigroup PubSub where
    (<>) p1 p2 = PubSub
        { subs    = subs p1    `mappend` subs p2
        , unsubs  = unsubs p1  `mappend` unsubs p2
        , psubs   = psubs p1   `mappend` psubs p2
        , punsubs = punsubs p1 `mappend` punsubs p2
        }
-- | The empty change set has the empty command in every field;
--   'mappend' is the 'Semigroup' operation.
instance Monoid PubSub where
    mempty  = PubSub mempty mempty mempty mempty
    mappend = (<>)
-- | A single pub/sub command. Both type parameters are phantom: @a@ is
--   instantiated with 'Subscribe' / 'Unsubscribe' and @b@ with
--   'Channel' / 'Pattern'.
--
--   'DoNothing' means no command is sent at all, whereas @Cmd []@ is a
--   command with no arguments.
--
--   Note: 'changes' is only defined for the 'Cmd' constructor; callers in
--   this module match 'DoNothing' before using it.
data Cmd a b
    = DoNothing
    | Cmd { changes :: [ByteString] }
    deriving (Eq)
-- | Subscribe commands combine by concatenating their argument lists;
--   'DoNothing' is the identity on either side.
instance Semigroup (Cmd Subscribe a) where
    (<>) DoNothing x = x
    (<>) x DoNothing = x
    (<>) (Cmd xs) (Cmd ys) = Cmd (xs ++ ys)
-- | 'DoNothing' is the empty subscribe command.
instance Monoid (Cmd Subscribe a) where
    mempty  = DoNothing
    mappend = (<>)
-- | Unsubscribe commands combine by concatenating their argument lists,
--   with 'DoNothing' as the identity, except that an argument-less
--   command (@Cmd []@) absorbs whatever it is combined with.
instance Semigroup (Cmd Unsubscribe a) where
    (<>) DoNothing x = x
    (<>) x DoNothing = x
    -- An empty argument list wins over any explicit list. Presumably this
    -- is because a bare (P)UNSUBSCRIBE unsubscribes from everything, which
    -- already covers the other operand -- confirm against the Redis docs.
    (<>) (Cmd []) _ = Cmd []
    (<>) _ (Cmd []) = Cmd []
    (<>) (Cmd xs) (Cmd ys) = Cmd (xs ++ ys)
-- | 'DoNothing' is the empty unsubscribe command.
instance Monoid (Cmd Unsubscribe a) where
    mempty  = DoNothing
    mappend = (<>)
-- | Commands that can be written to the connection by 'sendCmd' and
--   'rawSendCmd'.
class Command a where
    -- | The command name; it is sent as the first element of the request,
    --   followed by the command's arguments.
    redisCmd      :: a -> ByteString
    -- | How sending this command changes the 'pending' counter; 'sendCmd'
    --   applies the result via 'modifyPending'.
    updatePending :: a -> Int -> Int
-- | Send a command from inside a 'pubSub' session and update the
--   'pending' counter accordingly. 'DoNothing' sends nothing and leaves
--   the state unchanged.
sendCmd :: (Command (Cmd a b)) => Cmd a b -> StateT PubSubState Core.Redis ()
sendCmd DoNothing = return ()
sendCmd cmd = do
    -- Fetch the connection so its send hook can wrap the actual write.
    conn <- lift $ Core.reRedis $ asks Core.envConn
    let hook = Core.sendPubSubHook $ PP.hooks conn
    -- The hook expects an IO-level sender, so unlift 'Core.send' into IO.
    lift $ withRunInIO $ \runInIO ->
        hook (runInIO . Core.send) (redisCmd cmd : changes cmd)
    modifyPending (updatePending cmd)
-- | Render a command and write it directly to the given connection,
--   going through the connection's pub/sub send hook. No 'PubSubState'
--   is involved. 'DoNothing' sends nothing.
rawSendCmd :: (Command (Cmd a b)) => PP.Connection -> Cmd a b -> IO ()
rawSendCmd _ DoNothing = return ()
rawSendCmd conn cmd =
    let hook = Core.sendPubSubHook $ PP.hooks conn
        -- Command name first, then its arguments.
        msg  = redisCmd cmd : changes cmd
    in hook (PP.send conn . renderRequest) msg
-- | Build a function that adds the number of arguments of a command to
--   a counter. 'DoNothing' yields 'id'.
plusChangeCnt :: Cmd a b -> Int -> Int
plusChangeCnt DoNothing = id
plusChangeCnt (Cmd cs)  = (+ length cs)
-- | Channel subscriptions are sent as @SUBSCRIBE@; every argument adds
--   one to the pending counter.
instance Command (Cmd Subscribe Channel) where
    redisCmd      = const "SUBSCRIBE"
    updatePending = plusChangeCnt
-- | Pattern subscriptions are sent as @PSUBSCRIBE@; every argument adds
--   one to the pending counter.
instance Command (Cmd Subscribe Pattern) where
    redisCmd      = const "PSUBSCRIBE"
    updatePending = plusChangeCnt
-- | Channel unsubscriptions are sent as @UNSUBSCRIBE@ and leave the
--   pending counter unchanged.
instance Command (Cmd Unsubscribe Channel) where
    redisCmd      = const "UNSUBSCRIBE"
    updatePending = const id
-- | Pattern unsubscriptions are sent as @PUNSUBSCRIBE@ and leave the
--   pending counter unchanged.
instance Command (Cmd Unsubscribe Pattern) where
    redisCmd      = const "PUNSUBSCRIBE"
    updatePending = const id
-- | A message delivered to the 'pubSub' callback.
--
--   'Message' carries a channel and a payload; 'PMessage' additionally
--   carries a pattern. Note that 'msgPattern' is therefore only defined
--   for the 'PMessage' constructor.
data Message
    = Message  { msgChannel, msgMessage :: ByteString }
    | PMessage { msgPattern, msgChannel, msgMessage :: ByteString }
    deriving (Show)
-- | The kinds of reply handled by the 'pubSub' receive loop.
data PubSubReply
    = Subscribed RedisChannel
      -- ^ Subscribe confirmation for a channel.
    | PSubscribed RedisPChannel
      -- ^ Subscribe confirmation for a pattern.
    | Unsubscribed RedisChannel Int
      -- ^ Unsubscribe confirmation for a channel, with a count that
      --   'pubSub' stores as the current subscription count.
    | PUnsubscribed RedisPChannel Int
      -- ^ Unsubscribe confirmation for a pattern, with the same kind of
      --   count as 'Unsubscribed'.
    | Msg Message
      -- ^ A published message.
-- | Issue a @PUBLISH@ request for the given channel and message.
--
--   The 'Integer' result is whatever the server returns for @PUBLISH@
--   (per the Redis documentation, the number of clients that received
--   the message).
publish
    :: (Core.RedisCtx m f)
    => ByteString -- ^ channel
    -> ByteString -- ^ message
    -> m (f Integer)
publish channel message =
    Core.sendRequest ["PUBLISH", channel, message]
-- | Change set that subscribes to the given channels. An empty list
--   yields 'mempty', i.e. no command at all.
subscribe
    :: [ByteString] -- ^ channels
    -> PubSub
subscribe [] = mempty
subscribe cs = mempty{ subs = Cmd cs }
-- | Change set that unsubscribes from the given channels.
--
--   Unlike 'subscribe', an empty list is NOT turned into 'mempty': it
--   produces @Cmd []@, so a bare @UNSUBSCRIBE@ with no arguments is sent.
unsubscribe
    :: [ByteString] -- ^ channels
    -> PubSub
unsubscribe cs = mempty{ unsubs = Cmd cs }
-- | Like 'unsubscribe', except that an empty list yields 'mempty', so
--   no command is sent in that case.
unsubscribe1
    :: [ByteString] -- ^ channels
    -> PubSub
unsubscribe1 [] = mempty
unsubscribe1 cs = mempty{ unsubs = Cmd cs }
-- | Change set that subscribes to the given patterns. An empty list
--   yields 'mempty', i.e. no command at all.
psubscribe
    :: [ByteString] -- ^ patterns
    -> PubSub
psubscribe [] = mempty
psubscribe ps = mempty{ psubs = Cmd ps }
-- | Change set that unsubscribes from the given patterns.
--
--   Unlike 'psubscribe', an empty list is NOT turned into 'mempty': it
--   produces @Cmd []@, so a bare @PUNSUBSCRIBE@ with no arguments is sent.
punsubscribe
    :: [ByteString] -- ^ patterns
    -> PubSub
punsubscribe ps = mempty{ punsubs = Cmd ps }
-- | Like 'punsubscribe', except that an empty list yields 'mempty', so
--   no command is sent in that case.
punsubscribe1
    :: [ByteString] -- ^ patterns
    -> PubSub
punsubscribe1 [] = mempty
punsubscribe1 ps = mempty{ punsubs = Cmd ps }
-- | Run a pub/sub session on the current connection.
--
--   If the initial change set equals 'mempty' the action returns at
--   once. Otherwise the changes are sent and replies are processed in a
--   loop:
--
--   * for a message, the callback is run (through the connection's
--     callback hook) and the change set it returns is sent;
--
--   * a subscribe confirmation lowers the pending counter by one;
--
--   * an unsubscribe confirmation stores the count it carries, and the
--     loop ends once that count and the pending counter are both zero.
pubSub
    :: PubSub                 -- ^ Initial subscriptions.
    -> (Message -> IO PubSub) -- ^ Callback; its result is sent as further changes.
    -> Core.Redis ()
pubSub initial callback
    | initial == mempty = return ()
    | otherwise         = evalStateT (send initial) (PubSubState 0 0)
  where
    -- Send every non-empty command of a change set, then go back to
    -- waiting for replies.
    send :: PubSub -> StateT PubSubState Core.Redis ()
    send PubSub{..} = do
        sendCmd subs
        sendCmd unsubs
        sendCmd psubs
        sendCmd punsubs
        recv

    -- Receive one reply and dispatch on its kind.
    recv :: StateT PubSubState Core.Redis ()
    recv = do
        hook <- lift $ Core.reRedis $ asks $
                    Core.callbackHook . PP.hooks . Core.envConn
        reply <- lift Core.recv
        case decodeMsg reply of
            Msg msg           -> liftIO (hook callback msg) >>= send
            Subscribed _      -> modifyPending (subtract 1) >> recv
            PSubscribed _     -> modifyPending (subtract 1) >> recv
            PUnsubscribed _ n -> onUnsubscribe n
            Unsubscribed _ n  -> onUnsubscribe n

    -- Record the reported subscription count; keep receiving unless
    -- nothing is subscribed and nothing is pending.
    onUnsubscribe :: Int -> StateT PubSubState Core.Redis ()
    onUnsubscribe n = do
        putSubCnt n
        PubSubState{..} <- get
        unless (subCnt == 0 && pending == 0) recv
-- | Key type for plain-channel subscriptions (see 'PubSubController').
type RedisChannel = ByteString
-- | Key type for pattern subscriptions (see 'PubSubController').
type RedisPChannel = ByteString
-- | Callback registered for a 'RedisChannel'.
--   Presumably the argument is the message payload -- confirm at the call site.
type MessageCallback = ByteString -> IO ()
-- | Callback registered for a 'RedisPChannel'.
--   Presumably receives the concrete channel and then the payload -- confirm at the call site.
type PMessageCallback = RedisChannel -> ByteString -> IO ()
-- | An 'IO' action; judging by the name it unregisters previously added
--   callbacks -- confirm where it is produced.
type UnregisterCallbacksAction = IO ()
-- | Numeric identifier paired with each registered callback in
--   'ChannelData'. The 'Num' instance is derived from 'Integer', so
--   numeric literals and arithmetic work directly on handles.
newtype UnregisterHandle = UnregisterHandle Integer
    deriving (Eq, Show, Num)
-- | Mutable subscription state for one kind of channel (plain channels
--   or patterns), parameterised over the key and callback types.
data ChannelData channel callback
    = ChannelData
    { cdSubscribedChannels :: !(TVar (HM.HashMap channel [(UnregisterHandle, callback)]))
      -- ^ Callbacks per channel, each tagged with its 'UnregisterHandle'.
    , cdChannelsPendingSubscription :: !(TVar (HS.HashSet channel))
      -- ^ Set of channels; judging by the name, those whose subscription
      --   has not been confirmed yet.
    , cdChannelsPendingRemoval :: !(TVar (HS.HashSet channel))
      -- ^ Set of channels; judging by the name, those whose removal has
      --   not been confirmed yet.
    }
-- | Shared state of a pub/sub controller; create one with
--   'newPubSubController'.
data PubSubController = PubSubController
    { sendChanges :: TBQueue PubSub
      -- ^ Bounded queue of subscription change sets.
    , pscChannelData :: ChannelData RedisChannel MessageCallback
      -- ^ State for plain-channel subscriptions.
    , pscPChannelData :: ChannelData RedisPChannel PMessageCallback
      -- ^ State for pattern subscriptions.
    , lastUsedCallbackId :: TVar UnregisterHandle
      -- ^ Handle counter; starts at 0 in 'newPubSubController'.
    }
-- | Allocate a 'ChannelData' from an initial list of subscriptions.
--
--   Every initial callback is stored under handle 0. Callbacks given for
--   the same channel are collected into one list. Both pending sets
--   start out empty.
newChannelData :: Hashable channel => [(channel, callback)] -> STM (ChannelData channel callback)
newChannelData initialSubs
    = ChannelData
    -- Wrap each callback as a singleton [(0, callback)] and merge
    -- duplicate channels by list concatenation.
    <$> newTVar (HM.fromListWith (++) $ map (second $ pure . (0,)) initialSubs)
    <*> newTVar mempty
    <*> newTVar mempty
-- | Create a 'PubSubController' in a single STM transaction.
--
--   The change queue is bounded at 10 entries, the callback-id counter
--   starts at 0, and the channel / pattern state is initialised from the
--   given subscriptions via 'newChannelData'.
newPubSubController :: MonadIO m => [(RedisChannel, MessageCallback)]   -- ^ initial channel subscriptions
                                 -> [(RedisPChannel, PMessageCallback)] -- ^ initial pattern subscriptions
                                 -> m PubSubController
newPubSubController initialSubs initialPSubs = liftIO $ atomically $ do
    c <- newTBQueue 10
    lastId <- newTVar 0
    channelData' <- newChannelData initialSubs
    pchannelData' <- newChannelData initialPSubs
    return $ PubSubController c channelData' pchannelData' lastId
-- Constraint alias used in signatures that apply '<$>' to a 'MonadIO'
-- action. On GHC older than 7.10 the alias also demands 'Functor'
-- explicitly; on newer compilers 'MonadIO' alone is enough.
#if __GLASGOW_HASKELL__ < 710
type FunctorMonadIO m = (MonadIO m, Functor m)
#else
type FunctorMonadIO m = MonadIO m
#endif
-- | List the channels that are keys of the controller's subscribed-channel
-- map at the moment of the call.
--
-- The map is read with a single 'readTVarIO', so the result is a snapshot; it
-- may be stale by the time the caller inspects it.
currentChannels :: FunctorMonadIO m => PubSubController -> m [RedisChannel]
currentChannels ctrl =
    liftIO $ HM.keys <$> readTVarIO (cdSubscribedChannels (pscChannelData ctrl))
-- | List the pattern channels that are keys of the controller's subscribed
-- pattern-channel map at the moment of the call.
--
-- The map is read with a single 'readTVarIO', so the result is a snapshot; it
-- may be stale by the time the caller inspects it.
currentPChannels :: FunctorMonadIO m => PubSubController -> m [RedisPChannel]
currentPChannels ctrl =
    liftIO $ HM.keys <$> readTVarIO (cdSubscribedChannels (pscPChannelData ctrl))
-- | Snapshot of the controller's pending-subscription set for plain channels
-- (the contents of 'cdChannelsPendingSubscription' at the moment of the call).
pendingChannels :: MonadIO m => PubSubController -> m (HS.HashSet RedisChannel)
pendingChannels ctrl =
    liftIO (readTVarIO (cdChannelsPendingSubscription (pscChannelData ctrl)))
-- | Snapshot of the controller's pending-subscription set for pattern
-- channels (the contents of 'cdChannelsPendingSubscription' at the moment of
-- the call).
pendingPatternChannels :: MonadIO m => PubSubController -> m (HS.HashSet RedisPChannel)
pendingPatternChannels ctrl =
    liftIO (readTVarIO (cdChannelsPendingSubscription (pscPChannelData ctrl)))
-- | Register callbacks for one kind of channel (plain or pattern) under a
-- single unregister handle.
--
-- Every @(channel, callback)@ pair is appended to the subscribed-channel map,
-- after any callbacks already stored for that channel. When the same channel
-- occurs more than once in the input, all of its callbacks are kept, in input
-- order.
--
-- Returns the input channels that were neither in the subscribed map nor in
-- the pending-subscription set before the call; those channels are also added
-- to the pending-subscription set.
addChannelsOfType
  :: Hashable channel
  => UnregisterHandle            -- ^ handle stored alongside every new callback
  -> [(channel, callback)]       -- ^ channels and the callbacks to register
  -> ChannelData channel callback
  -> STM [channel]
addChannelsOfType ident newChans channelData = do
    callbacks <- readTVar subscribedVar
    pendingCallbacks <- readTVar pendingVar
    let alreadyKnown = memberMapOrSet callbacks pendingCallbacks
        newChans' = filter (not . alreadyKnown) (map fst newChans)
        -- 'HM.fromListWith' passes the newer value first, hence the 'flip'
        -- to keep callbacks of a repeated channel in input order.
        additions =
          HM.fromListWith (flip (++)) [ (chan, [(ident, cb)]) | (chan, cb) <- newChans ]
    writeTVar subscribedVar (HM.unionWith (++) callbacks additions)
    writeTVar pendingVar (HS.union pendingCallbacks (HS.fromList newChans'))
    pure newChans'
  where
    subscribedVar = cdSubscribedChannels channelData
    pendingVar = cdChannelsPendingSubscription channelData
-- | Register callbacks for channels and pattern channels and queue the
-- matching subscribe request on the controller's change queue.
--
-- With two empty lists nothing happens and the returned action is a no-op.
-- Otherwise, in one STM transaction: a fresh unregister handle is allocated,
-- the callbacks are stored via 'addChannelsOfType', and a @subscribe@ \/
-- @psubscribe@ change covering only the newly seen channels is written to the
-- queue. Since the queue is a 'TBQueue', the transaction blocks while the
-- queue is full.
--
-- The returned action unregisters exactly the callbacks added by this call.
addChannels :: MonadIO m => PubSubController
            -> [(RedisChannel, MessageCallback)]
            -> [(RedisPChannel, PMessageCallback)]
            -> m UnregisterCallbacksAction
addChannels _ [] [] = return (return ())
addChannels ctrl newChans newPChans = liftIO $ do
    ident <- atomically $ do
      prev <- readTVar (lastUsedCallbackId ctrl)
      let next = prev + 1
      -- Store the counter evaluated, so repeated registrations do not pile
      -- up a chain of unevaluated (+1) thunks inside the TVar.
      writeTVar (lastUsedCallbackId ctrl) $! next
      newChannels <- addChannelsOfType next newChans (pscChannelData ctrl)
      newPChannels <- addChannelsOfType next newPChans (pscPChannelData ctrl)
      writeTBQueue (sendChanges ctrl) $
        subscribe newChannels `mappend` psubscribe newPChannels
      return next
    return $ unsubChannels ctrl (map fst newChans) (map fst newPChans) ident
-- | Like 'addChannels', but additionally blocks until none of the requested
-- channels or pattern channels remains in the corresponding
-- pending-subscription set.
--
-- With two empty lists nothing happens and the returned action is a no-op.
-- The returned action is the unregister action produced by 'addChannels'.
addChannelsAndWait :: MonadIO m => PubSubController
                   -> [(RedisChannel, MessageCallback)]
                   -> [(RedisPChannel, PMessageCallback)]
                   -> m UnregisterCallbacksAction
addChannelsAndWait _ [] [] = return (return ())
addChannelsAndWait ctrl newChans newPChans = do
    unreg <- addChannels ctrl newChans newPChans
    liftIO $
      waitUntilAbsent
        [ (cdChannelsPendingSubscription (pscChannelData ctrl), map fst newChans)
        , (cdChannelsPendingSubscription (pscPChannelData ctrl), map fst newPChans)
        ]
    return unreg
-- | Block until, for every @(set, channels)@ pair, none of the listed
-- channels is a member of the set.
--
-- All pairs are checked inside one STM transaction, which calls 'retry' as
-- soon as any listed channel is still present. Pairs with an empty channel
-- list are skipped without reading their 'TVar'.
waitUntilAbsent :: Hashable channel => [(TVar (HS.HashSet channel), [channel])] -> IO ()
waitUntilAbsent pending = atomically $
    forM_ pending $ \(tPendingChannels, channels) ->
      unless (null channels) $ do
        stillPending <- readTVar tPendingChannels
        when (any (`HS.member` stillPending) channels) retry
-- | Drop the given channels and pattern channels from the controller and
-- queue the matching unsubscribe request.
--
-- With two empty lists this is a no-op. Otherwise, in one STM transaction,
-- 'removeChannels'' is applied to both channel tables and an unsubscribe
-- change for the channels it actually removed is written to the controller's
-- change queue. The call does not wait for that change to be processed.
removeChannels :: MonadIO m => PubSubController
               -> [RedisChannel]
               -> [RedisPChannel]
               -> m ()
removeChannels _ [] [] = return ()
removeChannels ctrl remChans remPChans = liftIO . atomically $ do
    droppedChans <- removeChannels' (pscChannelData ctrl) remChans
    droppedPChans <- removeChannels' (pscPChannelData ctrl) remPChans
    writeTBQueue (sendChanges ctrl) $
      unsubscribe1 droppedChans `mappend` punsubscribe1 droppedPChans
#if !(MIN_VERSION_stm(2,3,0))
-- Compatibility shim, compiled only when the stm package is older than 2.3.0.
-- Strict variant of modifyTVar: the new value is forced (via $!) before it is
-- written back, so no unevaluated thunk is stored in the TVar.
modifyTVar' :: TVar a -> (a -> a) -> STM ()
modifyTVar' var f = do
    x <- readTVar var
    writeTVar var $! f x
{-# INLINE modifyTVar' #-}
#endif
-- | Remove channels from one channel table.
--
-- Only channels that are currently in the subscribed map or in the
-- pending-subscription set are affected. Those channels are deleted from
-- both, inserted into the pending-removal set, and returned to the caller.
removeChannels' :: (Hashable channel) => ChannelData channel callback -> [channel] -> STM [channel]
removeChannels' channelData remChannels = do
    subbedChannels <- readTVar subscribedVar
    pendingChannelSubs <- readTVar pendingSubVar
    let remChannels' = filter (memberMapOrSet subbedChannels pendingChannelSubs) remChannels
    writeTVar subscribedVar (L.foldl' (flip HM.delete) subbedChannels remChannels')
    writeTVar pendingSubVar (L.foldl' (flip HS.delete) pendingChannelSubs remChannels')
    modifyTVar' (cdChannelsPendingRemoval channelData) $ \pendingRemoval ->
      L.foldl' (flip HS.insert) pendingRemoval remChannels'
    pure remChannels'
  where
    subscribedVar = cdSubscribedChannels channelData
    pendingSubVar = cdChannelsPendingSubscription channelData
-- | 'True' when the key is present in the map or in the set (or both).
memberMapOrSet :: Hashable k => HM.HashMap k v1 -> HS.HashSet k -> k -> Bool
memberMapOrSet m s k = HM.member k m || HS.member k s
-- | Remove every callback registered under handle @h@ from the given
-- channels of one channel table.
--
-- Channels that are not in the subscribed map are ignored. A channel whose
-- callback list becomes empty is deleted from the map, removed from the
-- pending-subscription set, and included in the returned list. Channels that
-- still have other callbacks stay in the map and are not returned.
unregisterHandles
  :: forall channel callback. Hashable channel => ChannelData channel callback
  -> [channel]
  -> UnregisterHandle
  -> STM [channel]
unregisterHandles channelData remChansParam h = do
    callbacks <- readTVar (cdSubscribedChannels channelData)
    let remChans = filter (`HM.member` callbacks) remChansParam
        callbacks' = L.foldl' removeHandles callbacks remChans
        -- Every element of remChans is known to be in 'callbacks', so a
        -- channel was dropped exactly when it is missing afterwards.
        remChans' = filter (not . (`HM.member` callbacks')) remChans
    writeTVar (cdSubscribedChannels channelData) callbacks'
    unless (null remChans') $
      modifyTVar' (cdChannelsPendingSubscription channelData)
                  (`HS.difference` HS.fromList remChans')
    pure remChans'
  where
    -- Drop the callbacks owned by @h@ for key @k@; delete the key when no
    -- callback is left (or when the key was absent to begin with).
    removeHandles :: HM.HashMap channel [(UnregisterHandle,a)]
                  -> channel
                  -> HM.HashMap channel [(UnregisterHandle,a)]
    removeHandles m k =
      case filter ((/= h) . fst) <$> HM.lookup k m of
        Just remaining@(_:_) -> HM.insert k remaining m
        _ -> HM.delete k m
-- | Unregister the callbacks stored under handle @h@ for the given channels
-- and pattern channels.
--
-- In one STM transaction, 'unregisterHandles' is run on both channel tables
-- and an unsubscribe change for the channels that ended up without any
-- callback is written to the controller's change queue.
unsubChannels :: PubSubController -> [RedisChannel] -> [RedisPChannel] -> UnregisterHandle -> IO ()
unsubChannels ctrl chans pchans h = atomically $ do
    channelsToDrop <- unregisterHandles (pscChannelData ctrl) chans h
    pChannelsToDrop <- unregisterHandles (pscPChannelData ctrl) pchans h
    writeTBQueue (sendChanges ctrl) $
      unsubscribe1 channelsToDrop `mappend` punsubscribe1 pChannelsToDrop
-- | Drop the given channels and pattern channels, queue the matching
-- unsubscribe request, and block until none of the dropped channels remains
-- in the corresponding pending-removal set.
--
-- The removal and the queue write happen in one STM transaction; the wait is
-- a separate transaction performed by 'waitUntilAbsent'.
removeChannelsAndWait :: MonadIO m => PubSubController
                      -> [RedisChannel]
                      -> [RedisPChannel]
                      -> m ()
removeChannelsAndWait ctrl remChannels remPChannels = liftIO $ do
    (droppedChans, droppedPChans) <- atomically $ do
      chans <- removeChannels' (pscChannelData ctrl) remChannels
      pchans <- removeChannels' (pscPChannelData ctrl) remPChannels
      writeTBQueue (sendChanges ctrl) $
        unsubscribe1 chans `mappend` punsubscribe1 pchans
      pure (chans, pchans)
    waitUntilAbsent
      [ (cdChannelsPendingRemoval (pscChannelData ctrl), droppedChans)
      , (cdChannelsPendingRemoval (pscPChannelData ctrl), droppedPChans)
      ]
listenThread :: PubSubController -> PP.Connection -> IO ()
listenThread :: PubSubController -> Connection -> IO ()
listenThread PubSubController
ctrl Connection
rawConn = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
msg <- Connection -> IO Reply
PP.recv Connection
rawConn
case decodeMsg msg of
Msg message :: Message
message@(Message ByteString
channel ByteString
_) -> do
cm <- STM (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> IO
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. STM a -> IO a
atomically (STM (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> IO
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> IO
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. TVar a -> STM a
readTVar (TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ ChannelData ByteString (ByteString -> IO ())
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall channel callback.
ChannelData channel callback
-> TVar (HashMap channel [(UnregisterHandle, callback)])
cdSubscribedChannels (ChannelData ByteString (ByteString -> IO ())
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> ChannelData ByteString (ByteString -> IO ())
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString (ByteString -> IO ())
pscChannelData PubSubController
ctrl
forM_ (HM.lookup channel cm) $ \[(UnregisterHandle, ByteString -> IO ())]
c -> do
IO PubSub -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO PubSub -> IO ()) -> IO PubSub -> IO ()
forall a b. (a -> b) -> a -> b
$ Hooks -> CallbackHook
Core.callbackHook (Connection -> Hooks
PP.hooks Connection
rawConn) (\Message
m -> ((UnregisterHandle, ByteString -> IO ()) -> IO ())
-> [(UnregisterHandle, ByteString -> IO ())] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(UnregisterHandle
_,ByteString -> IO ()
x) -> ByteString -> IO ()
x (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ Message -> ByteString
msgMessage Message
m) [(UnregisterHandle, ByteString -> IO ())]
c IO () -> PubSub -> IO PubSub
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> PubSub
forall a. Monoid a => a
mempty) Message
message
Msg message :: Message
message@(PMessage ByteString
pattern ByteString
_ ByteString
_) -> do
pm <- STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. STM a -> IO a
atomically (STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. TVar a -> STM a
readTVar (TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ ChannelData ByteString PMessageCallback
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall channel callback.
ChannelData channel callback
-> TVar (HashMap channel [(UnregisterHandle, callback)])
cdSubscribedChannels (ChannelData ByteString PMessageCallback
-> TVar
(HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> ChannelData ByteString PMessageCallback
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString PMessageCallback
pscPChannelData PubSubController
ctrl
forM_ (HM.lookup pattern pm) $ \[(UnregisterHandle, PMessageCallback)]
c -> do
IO PubSub -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO PubSub -> IO ()) -> IO PubSub -> IO ()
forall a b. (a -> b) -> a -> b
$ Hooks -> CallbackHook
Core.callbackHook (Connection -> Hooks
PP.hooks Connection
rawConn) (\Message
m -> ((UnregisterHandle, PMessageCallback) -> IO ())
-> [(UnregisterHandle, PMessageCallback)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(UnregisterHandle
_,PMessageCallback
x) -> PMessageCallback
x (Message -> ByteString
msgChannel Message
m) (Message -> ByteString
msgMessage Message
m)) [(UnregisterHandle, PMessageCallback)]
c IO () -> PubSub -> IO PubSub
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> PubSub
forall a. Monoid a => a
mempty) Message
message
Subscribed ByteString
chan -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (HashSet ByteString)
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (ChannelData ByteString (ByteString -> IO ())
-> TVar (HashSet ByteString)
forall channel callback.
ChannelData channel callback -> TVar (HashSet channel)
cdChannelsPendingSubscription (ChannelData ByteString (ByteString -> IO ())
-> TVar (HashSet ByteString))
-> ChannelData ByteString (ByteString -> IO ())
-> TVar (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString (ByteString -> IO ())
pscChannelData PubSubController
ctrl) ((HashSet ByteString -> HashSet ByteString) -> STM ())
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a b. (a -> b) -> a -> b
$ ByteString -> HashSet ByteString -> HashSet ByteString
forall a. Hashable a => a -> HashSet a -> HashSet a
HS.delete ByteString
chan
PSubscribed ByteString
chan -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (HashSet ByteString)
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (ChannelData ByteString PMessageCallback
-> TVar (HashSet ByteString)
forall channel callback.
ChannelData channel callback -> TVar (HashSet channel)
cdChannelsPendingSubscription (ChannelData ByteString PMessageCallback
-> TVar (HashSet ByteString))
-> ChannelData ByteString PMessageCallback
-> TVar (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString PMessageCallback
pscPChannelData PubSubController
ctrl) ((HashSet ByteString -> HashSet ByteString) -> STM ())
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a b. (a -> b) -> a -> b
$ ByteString -> HashSet ByteString -> HashSet ByteString
forall a. Hashable a => a -> HashSet a -> HashSet a
HS.delete ByteString
chan
Unsubscribed ByteString
chan Int
_ -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (HashSet ByteString)
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (ChannelData ByteString (ByteString -> IO ())
-> TVar (HashSet ByteString)
forall channel callback.
ChannelData channel callback -> TVar (HashSet channel)
cdChannelsPendingRemoval (ChannelData ByteString (ByteString -> IO ())
-> TVar (HashSet ByteString))
-> ChannelData ByteString (ByteString -> IO ())
-> TVar (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString (ByteString -> IO ())
pscChannelData PubSubController
ctrl) ((HashSet ByteString -> HashSet ByteString) -> STM ())
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a b. (a -> b) -> a -> b
$ ByteString -> HashSet ByteString -> HashSet ByteString
forall a. Hashable a => a -> HashSet a -> HashSet a
HS.delete ByteString
chan
PUnsubscribed ByteString
chan Int
_ -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (HashSet ByteString)
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (ChannelData ByteString PMessageCallback
-> TVar (HashSet ByteString)
forall channel callback.
ChannelData channel callback -> TVar (HashSet channel)
cdChannelsPendingRemoval (ChannelData ByteString PMessageCallback
-> TVar (HashSet ByteString))
-> ChannelData ByteString PMessageCallback
-> TVar (HashSet ByteString)
forall a b. (a -> b) -> a -> b
$ PubSubController -> ChannelData ByteString PMessageCallback
pscPChannelData PubSubController
ctrl) ((HashSet ByteString -> HashSet ByteString) -> STM ())
-> (HashSet ByteString -> HashSet ByteString) -> STM ()
forall a b. (a -> b) -> a -> b
$ ByteString -> HashSet ByteString -> HashSet ByteString
forall a. Hashable a => a -> HashSet a -> HashSet a
HS.delete ByteString
chan
-- | Worker loop for the sending side of a pub/sub connection.
--
-- Repeatedly blocks until a batch of subscription changes is available on the
-- controller's 'sendChanges' queue, writes the four command groups of that
-- batch (subscribe, unsubscribe, psubscribe, punsubscribe) to the connection
-- in that order, and flushes the connection after every batch.
--
-- The loop has no exit condition of its own ('forever'), so it ends only when
-- one of its actions throws or the thread is cancelled.
sendThread :: PubSubController -> PP.Connection -> IO ()
sendThread ctrl rawConn = forever sendNextBatch
  where
    sendNextBatch = do
        -- Blocks (STM retry) while the queue is empty.
        change <- atomically (readTBQueue (sendChanges ctrl))
        rawSendCmd rawConn (subs change)
        rawSendCmd rawConn (unsubs change)
        rawSendCmd rawConn (psubs change)
        rawSendCmd rawConn (punsubs change)
        -- Push the batch out explicitly rather than relying on a later read.
        PP.flush rawConn
-- | Run the pub/sub loop for a 'PubSubController' on a connection taken from
-- the given connection's pool.
--
-- * For a non-clustered connection, a pooled connection is used directly.
-- * For a clustered connection, the first master node reported by
--   'Cluster.masterNodes' is used; an 'IOError' is raised when there is none.
--   A pipelining connection is built from that node's context (with the
--   cluster connection's hooks) and switched to receiving mode before the
--   loop starts.
--
-- The third argument is the action handed to 'pubSubForeverOnConn', which runs
-- it once all initial subscriptions have been acknowledged.
pubSubForever :: Connection.Connection
              -> PubSubController
              -> IO ()
              -> IO ()
pubSubForever conn ctrl onInitialLoad = case conn of
    Connection.NonClusteredConnection pool ->
        withResource pool run
    Connection.ClusteredConnection _ pool ->
        withResource pool $ \clusterConn -> do
            masters <- Cluster.masterNodes clusterConn
            nodeConn <- case masters of
                firstMaster : _ -> pure firstMaster
                [] -> ioError (userError "Hedis: clustered pubSubForever requires at least one master node")
            rawConn <- PP.fromCtxWithHooks
                           (Cluster.nodeConnectionContext nodeConn)
                           (Cluster.hooks clusterConn)
            PP.beginReceiving rawConn
            run rawConn
  where
    -- Shared tail of both cases: hand the raw connection to the actual loop.
    run rawConn = pubSubForeverOnConn rawConn ctrl onInitialLoad
-- | Drive a 'PubSubController' on one raw connection.
--
-- Steps:
--
-- 1. In a single STM transaction: discard everything currently queued in
--    'sendChanges', enqueue one change that (re)subscribes to every channel
--    and pattern currently registered in the controller, and mark all of them
--    as pending subscription.
-- 2. Start 'listenThread' and 'sendThread' on the connection.
-- 3. Wait until either one of the threads has finished, or both
--    pending-subscription sets are empty. Only in the latter case is
--    @onInitialLoad@ executed.
-- 4. Wait for either thread to finish and rethrow its exception, if any.
pubSubForeverOnConn :: PP.Connection -> PubSubController -> IO () -> IO ()
pubSubForeverOnConn rawConn ctrl onInitialLoad = do
    atomically $ do
        drainQueue
        chans  <- HM.keys <$> readTVar (cdSubscribedChannels chanData)
        pchans <- HM.keys <$> readTVar (cdSubscribedChannels pchanData)
        writeTBQueue queue (subscribe chans `mappend` psubscribe pchans)
        writeTVar (cdChannelsPendingSubscription chanData) (HS.fromList chans)
        writeTVar (cdChannelsPendingSubscription pchanData) (HS.fromList pchans)

    withAsync (listenThread ctrl rawConn) $ \listenT ->
        withAsync (sendThread ctrl rawConn) $ \sendT -> do
            -- Left: a worker finished first; Right: initial subscriptions done.
            mret <- atomically $
                (Left <$> waitEitherCatchSTM listenT sendT)
                    `orElse` (Right <$> awaitInitialSubscriptions)
            case mret of
                Right () -> onInitialLoad
                -- A finished worker is picked up again by waitEitherCatch below.
                Left _   -> return ()

            merr <- waitEitherCatch listenT sendT
            either rethrow rethrow merr
  where
    queue     = sendChanges ctrl
    chanData  = pscChannelData ctrl
    pchanData = pscPChannelData ctrl

    -- Pop queue entries until the queue reports empty.
    drainQueue = tryReadTBQueue queue >>= maybe (return ()) (const drainQueue)

    -- Retries until neither the channel nor the pattern set has pending
    -- subscriptions left.
    awaitInitialSubscriptions = do
        pendingChans <- readTVar (cdChannelsPendingSubscription chanData)
        unless (HS.null pendingChans) retry
        pendingPChans <- readTVar (cdChannelsPendingSubscription pchanData)
        unless (HS.null pendingPChans) retry

    -- Propagate a worker's exception; a worker that ended without one is
    -- ignored (sendThread loops forever, so that is not expected for it).
    rethrow outcome = either throwIO return outcome
-- | Decode a raw server reply received on a subscribed connection.
--
-- The reply must be a multi-bulk with at least three elements whose first
-- element names the kind of push message:
--
-- * @message@      -> 'Msg' with a 'Message' (channel, payload)
-- * @pmessage@     -> 'Msg' with a 'PMessage' (pattern, channel, payload);
--                     this kind needs a fourth element
-- * @subscribe@    -> 'Subscribed' channel
-- * @psubscribe@   -> 'PSubscribed' pattern
-- * @unsubscribe@  -> 'Unsubscribed' channel count
-- * @punsubscribe@ -> 'PUnsubscribed' pattern count
--
-- Anything else -- an unknown kind, an element that fails to decode, too few
-- elements, or a reply that is not a multi-bulk -- is reported through
-- 'errMsg', i.e. by calling 'error' with the offending reply.
decodeMsg :: Reply -> PubSubReply
decodeMsg r@(MultiBulk (Just (r0:r1:r2:rs))) = either (const (errMsg r)) id $ do
    kind <- decode r0
    case kind :: ByteString of
        "message"      -> Msg <$> decodeMessage
        "pmessage"     -> Msg <$> decodePMessage
        "subscribe"    -> Subscribed <$> decodeChan
        "psubscribe"   -> PSubscribed <$> decodeChan
        "unsubscribe"  -> Unsubscribed <$> decodeChan <*> decodeCnt
        "punsubscribe" -> PUnsubscribed <$> decodeChan <*> decodeCnt
        _              -> errMsg r
  where
    decodeMessage :: Either Reply Message
    decodeMessage = Message <$> decode r1 <*> decode r2

    -- A pmessage carries four elements. Match on the tail instead of using
    -- the partial 'head', so a truncated reply fails with the descriptive
    -- 'errMsg' error rather than "Prelude.head: empty list".
    decodePMessage :: Either Reply Message
    decodePMessage = case rs of
        r3 : _ -> PMessage <$> decode r1 <*> decode r2 <*> decode r3
        []     -> Left r

    decodeCnt :: Either Reply Int
    decodeCnt = fromInteger <$> decode r2

    decodeChan :: Either Reply ByteString
    decodeChan = decode r1
decodeMsg r = errMsg r
-- | Abort with 'error', reporting a reply that is not a valid pub/sub message.
-- The shown form of the reply is appended to a fixed prefix.
errMsg :: Reply -> a
errMsg reply =
    error (showString "Hedis: expected pub/sub-message but got: " (show reply))
-- | Subscribe to the given channels and patterns on a pooled connection and
-- run an action that can read incoming messages through an 'STM' action.
--
-- * For a non-clustered connection, a pooled connection is used directly.
-- * For a clustered connection, the first master node reported by
--   'Cluster.masterNodes' is used; an 'IOError' is raised when there is none.
--   A pipelining connection is built from that node's context (with the
--   cluster connection's hooks) and switched to receiving mode first.
--
-- In both cases a fresh 'TChan' is created for the messages and the work is
-- delegated to 'withPubSubOnConn'.
withPubSub :: Connection.Connection -> [ByteString] -> [ByteString] -> (STM Message -> IO r) -> IO r
withPubSub conn chans pchans f = case conn of
    Connection.NonClusteredConnection pool ->
        withResource pool run
    Connection.ClusteredConnection _ pool ->
        withResource pool $ \clusterConn -> do
            masters <- Cluster.masterNodes clusterConn
            nodeConn <- case masters of
                firstMaster : _ -> pure firstMaster
                [] -> ioError (userError "Hedis: clustered withPubSub requires at least one master node")
            rawConn <- PP.fromCtxWithHooks
                           (Cluster.nodeConnectionContext nodeConn)
                           (Cluster.hooks clusterConn)
            PP.beginReceiving rawConn
            run rawConn
  where
    -- Shared tail of both cases: allocate the message channel, then delegate.
    run rawConn = do
        messageChan <- newTChanIO
        withPubSubOnConn messageChan chans pchans rawConn f
-- | Subscribe to @chans@ and @pchans@ on @rawConn@, run @f@ with an 'STM'
-- action that yields the next received message, and unsubscribe again when
-- @f@ finishes (normally or with an exception).
--
-- A listener runs concurrently with @f@: it forwards every 'Msg' reply to
-- @messageChan@ and stops once an unsubscribe or punsubscribe reply carries
-- the count 0 (per the Redis protocol, the number of subscriptions still
-- active on the connection). The call returns the result of @f@ after the
-- listener has stopped.
--
-- When both lists are empty nothing is sent, so no reply would ever arrive
-- to stop the listener; in that case @f@ is run on its own instead of
-- waiting forever for the listener after @f@ returns.
withPubSubOnConn :: TChan Message -> [ByteString] -> [ByteString] -> PP.Connection -> (STM Message -> IO r) -> IO r
withPubSubOnConn messageChan chans pchans rawConn f
    | null chans && null pchans = f nextMessage
    | otherwise = do
        subscribeAll
        (_, r) <- concurrently lThread (f nextMessage `finally` unsubscribeAll)
        pure r
  where
    nextMessage :: STM Message
    nextMessage = readTChan messageChan

    -- Send one command naming all targets; an empty target list sends nothing.
    sendCmd :: ByteString -> [ByteString] -> IO ()
    sendCmd cmd targets =
        forM_ (NE.nonEmpty targets) $ \ne ->
            PP.send rawConn (renderRequest (cmd : NE.toList ne))

    subscribeAll :: IO ()
    subscribeAll = do
        sendCmd "SUBSCRIBE" chans
        sendCmd "PSUBSCRIBE" pchans
        PP.flush rawConn

    unsubscribeAll :: IO ()
    unsubscribeAll = do
        sendCmd "UNSUBSCRIBE" chans
        sendCmd "PUNSUBSCRIBE" pchans
        PP.flush rawConn

    -- Receive replies until the server reports a count of 0.
    lThread :: IO ()
    lThread = do
        reply <- PP.recv rawConn
        case decodeMsg reply of
            Msg m -> atomically (writeTChan messageChan m) >> lThread
            Unsubscribed _ 0 -> pure ()
            PUnsubscribed _ 0 -> pure ()
            _ -> lThread