{-# LANGUAGE TupleSections #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE OverloadedStrings #-}
module Database.Redis.Connection where

import Control.Exception
import qualified Control.Monad.Catch as Catch
import Control.Monad.IO.Class(liftIO, MonadIO)
import Control.Monad(when)
import Control.Concurrent.MVar(MVar, newMVar)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as Char8
import Data.Functor(void)
import qualified Data.IntMap.Strict as IntMap
import Data.Pool(Pool, withResource, createPool, destroyAllResources)
import Data.Typeable
import qualified Data.Time as Time
import Network.TLS (ClientParams)
import qualified Network.Socket as NS
import qualified Data.HashMap.Strict as HM

import qualified Database.Redis.ProtocolPipelining as PP
import Database.Redis.Core(Redis, runRedisInternal, runRedisClusteredInternal)
import Database.Redis.Protocol(Reply(..))
import Database.Redis.Cluster(ShardMap(..), Node, Shard(..))
import qualified Database.Redis.Cluster as Cluster
import qualified Database.Redis.ConnectionContext as CC
--import qualified Database.Redis.Cluster.Pipeline as ClusterPipeline
import Database.Redis.Commands
    ( ping
    , select
    , auth
    , clusterSlots
    , command
    , ClusterSlotsResponse(..)
    , ClusterSlotsResponseEntry(..)
    , ClusterSlotsNode(..))

--------------------------------------------------------------------------------
-- Connection
--

-- |A threadsafe pool of network connections to a Redis server (or to a Redis
--  cluster). Use the 'connect' or 'connectCluster' function to create one.
data Connection
    -- | Pool of pipelined connections to a single (non-clustered) server.
    = NonClusteredConnection
        (Pool PP.Connection)
    -- | Shared, mutable 'ShardMap' together with a pool of cluster
    --   connections.
    | ClusteredConnection
        (MVar ShardMap)
        (Pool Cluster.Connection)

-- |Information for connecting to a Redis server.
--
-- It is recommended to not use the 'ConnInfo' data constructor directly.
-- Instead use 'defaultConnectInfo' and update it with record syntax. For
-- example to connect to a password protected Redis server running on localhost
-- and listening to the default port:
--
-- @
-- myConnectInfo :: ConnectInfo
-- myConnectInfo = defaultConnectInfo {connectAuth = Just \"secret\"}
-- @
--
data ConnectInfo = ConnInfo
    { connectHost           :: NS.HostName
    -- ^ Ignored when 'connectPort' is a 'UnixSocket'
    , connectPort           :: CC.PortID
    , connectAuth           :: Maybe B.ByteString
    -- ^ When the server is protected by a password, set 'connectAuth' to 'Just'
    --   the password. Each connection will then authenticate by the 'auth'
    --   command.
    , connectDatabase       :: Integer
    -- ^ Each connection will 'select' the database with the given index.
    , connectMaxConnections :: Int
    -- ^ Maximum number of connections to keep open. The smallest acceptable
    --   value is 1.
    , connectMaxIdleTime    :: Time.NominalDiffTime
    -- ^ Amount of time for which an unused connection is kept open. The
    --   smallest acceptable value is 0.5 seconds. If the @timeout@ value in
    --   your redis.conf file is non-zero, it should be larger than
    --   'connectMaxIdleTime'.
    , connectTimeout        :: Maybe Time.NominalDiffTime
    -- ^ Optional timeout until connection to Redis gets
    --   established. 'ConnectTimeoutException' gets thrown if no socket
    --   get connected in this interval of time.
    , connectTLSParams      :: Maybe ClientParams
    -- ^ Optional TLS parameters. TLS will be enabled if this is provided.
    } deriving Show

-- |Errors raised while initialising a freshly opened connection (see
--  'createConnection'). Each constructor carries the server's reply.
data ConnectError
    = ConnectAuthError Reply
    -- ^ The server answered the @AUTH@ command with an error reply.
    | ConnectSelectError Reply
    -- ^ The server answered the @SELECT@ command with an error reply.
    deriving (Eq, Show, Typeable)

instance Exception ConnectError

-- |Default information for connecting:
--
-- @
--  connectHost           = \"localhost\"
--  connectPort           = PortNumber 6379 -- Redis default port
--  connectAuth           = Nothing         -- No password
--  connectDatabase       = 0               -- SELECT database 0
--  connectMaxConnections = 50              -- Up to 50 connections
--  connectMaxIdleTime    = 30              -- Keep open for 30 seconds
--  connectTimeout        = Nothing         -- Don't add timeout logic
--  connectTLSParams      = Nothing         -- Do not use TLS
-- @
--
defaultConnectInfo :: ConnectInfo
defaultConnectInfo = ConnInfo
    { connectHost           = "localhost"
    , connectPort           = CC.PortNumber 6379
    , connectAuth           = Nothing
    , connectDatabase       = 0
    , connectMaxConnections = 50
    , connectMaxIdleTime    = 30
    , connectTimeout        = Nothing
    , connectTLSParams      = Nothing
    }

-- |Opens one pipelined connection as described by the given 'ConnectInfo'
--  and prepares it for use:
--
--  1. connects to 'connectHost' / 'connectPort', passing 'connectTimeout'
--     converted to microseconds;
--  2. enables TLS when 'connectTLSParams' is provided;
--  3. starts receiving replies;
--  4. sends @AUTH@ when 'connectAuth' is set and @SELECT@ when
--     'connectDatabase' is non-zero.
--
--  Throws 'ConnectAuthError' or 'ConnectSelectError' when the server rejects
--  the respective command. If any step after the initial connect fails, the
--  connection is disconnected before the exception is re-thrown, so that a
--  failed handshake does not leave an open socket behind.
createConnection :: ConnectInfo -> IO PP.Connection
createConnection ConnInfo{..} = do
    -- 'connectTimeout' is given in seconds; 'PP.connect' receives the value
    -- scaled by 10^6.
    let timeoutOptUs =
          round . (1000000 *) <$> connectTimeout
    conn <- PP.connect connectHost connectPort timeoutOptUs
    conn' <- case connectTLSParams of
               Nothing -> return conn
               Just tlsParams ->
                 PP.enableTLS tlsParams conn `onException` PP.disconnect conn
    handshake conn' `onException` PP.disconnect conn'
    return conn'
  where
    -- Start the receive side, then authenticate and select the database.
    handshake :: PP.Connection -> IO ()
    handshake c = do
        PP.beginReceiving c
        runRedisInternal c $ do
            -- AUTH
            case connectAuth of
                Nothing   -> return ()
                Just pass -> do
                    resp <- auth pass
                    case resp of
                        Left r -> liftIO $ throwIO $ ConnectAuthError r
                        _      -> return ()
            -- SELECT
            when (connectDatabase /= 0) $ do
                resp <- select connectDatabase
                case resp of
                    Left r -> liftIO $ throwIO $ ConnectSelectError r
                    _      -> return ()

-- |Constructs a 'Connection' pool to a Redis server designated by the
--  given 'ConnectInfo'. The first connection is not actually established
--  until the first call to the server.
connect :: ConnectInfo -> IO Connection
connect cInfo@ConnInfo{..} = NonClusteredConnection <$>
    -- One stripe; idle time and connection cap come from the 'ConnectInfo'.
    createPool (createConnection cInfo) PP.disconnect 1 connectMaxIdleTime connectMaxConnections

-- |Constructs a 'Connection' pool to a Redis server designated by the
--  given 'ConnectInfo', then tests if the server is actually there.
--  Throws an exception if the connection to the Redis server can't be
--  established.
checkedConnect :: ConnectInfo -> IO Connection
checkedConnect connInfo = do
    conn <- connect connInfo
    -- Issue a PING so that a connection is actually taken from the pool; the
    -- reply itself is discarded.
    runRedis conn $ void ping
    return conn

-- |Destroy all idle resources in the pool.
disconnect :: Connection -> IO ()
disconnect (NonClusteredConnection pool) = destroyAllResources pool
disconnect (ClusteredConnection _ pool) = destroyAllResources pool

-- | Bracket around 'connect' and 'disconnect': the pool is created before the
--   given action runs and 'disconnect' is called on it afterwards.
withConnect :: (Catch.MonadMask m, MonadIO m) => ConnectInfo -> (Connection -> m c) -> m c
withConnect connInfo = Catch.bracket (liftIO $ connect connInfo) (liftIO . disconnect)

-- | Bracket around 'checkedConnect' and 'disconnect': the pool is created and
--   checked before the given action runs and 'disconnect' is called on it
--   afterwards.
withCheckedConnect :: ConnectInfo -> (Connection -> IO c) -> IO c
withCheckedConnect connInfo = bracket (checkedConnect connInfo) disconnect

-- |Interact with a Redis datastore specified by the given 'Connection'.
--
--  Each call of 'runRedis' takes a network connection from the 'Connection'
--  pool and runs the given 'Redis' action. Calls to 'runRedis' may thus block
--  while all connections from the pool are in use.
runRedis :: Connection -> Redis a -> IO a
runRedis (NonClusteredConnection pool) redis =
    withResource pool $ \conn -> runRedisInternal conn redis
runRedis (ClusteredConnection _ pool) redis =
    -- The clustered runner is also handed an action that re-fetches the shard
    -- map through the same cluster connection.
    withResource pool $ \conn -> runRedisClusteredInternal conn (refreshShardMap conn) redis

-- |Thrown when a command issued while setting up or refreshing a cluster
--  connection (@CLUSTER SLOTS@ or @COMMAND@) is answered with an error reply.
--  Carries that reply.
newtype ClusterConnectError = ClusterConnectError Reply
    deriving (Eq, Show, Typeable)

instance Exception ClusterConnectError

-- |Constructs a 'ShardMap' of connections to clustered nodes. The argument is
-- a 'ConnectInfo' for any node in the cluster
--
-- Some Redis commands are currently not supported in cluster mode
-- - CONFIG, AUTH
-- - SCAN
-- - MOVE, SELECT
-- - PUBLISH, SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, RESET
connectCluster :: ConnectInfo -> IO Connection
connectCluster bootstrapConnInfo = do
    -- The bootstrap connection is only needed to query the cluster layout and
    -- the command table; 'bracket' makes sure it is closed afterwards, also
    -- when one of the queries fails.
    (shardMapVar, infos) <-
        bracket (createConnection bootstrapConnInfo) PP.disconnect $ \conn -> do
            slotsResponse <- runRedisInternal conn clusterSlots
            shardMap <- case slotsResponse of
                Left e -> throwIO $ ClusterConnectError e
                Right slots -> shardMapFromClusterSlotsResponse slots
            commandInfos <- runRedisInternal conn command
            case commandInfos of
                Left e -> throwIO $ ClusterConnectError e
                Right cmdInfos -> do
                    var <- newMVar shardMap
                    return (var, cmdInfos)
    -- Pool settings are taken from the bootstrap 'ConnectInfo'.
    -- NOTE(review): 'connectTimeout' is not forwarded to 'Cluster.connect'
    -- ('Nothing' is passed), matching the previous behaviour.
    pool <- createPool
        (Cluster.connect infos shardMapVar Nothing)
        Cluster.disconnect
        1
        (connectMaxIdleTime bootstrapConnInfo)
        (connectMaxConnections bootstrapConnInfo)
    return $ ClusteredConnection shardMapVar pool

-- |Builds a 'ShardMap' from a @CLUSTER SLOTS@ response: every slot in an
--  entry's start..end range is mapped to the 'Shard' made of that entry's
--  master and replicas. When several entries cover the same slot, the entry
--  that comes first in the response wins.
shardMapFromClusterSlotsResponse :: ClusterSlotsResponse -> IO ShardMap
shardMapFromClusterSlotsResponse ClusterSlotsResponse{..} =
    return $ ShardMap $ foldr addEntry IntMap.empty clusterSlotsResponseEntries
  where
    -- Merge one response entry into the accumulated slot map. 'IntMap.union'
    -- is left-biased, so this entry's slots take precedence over the ones
    -- accumulated from later entries.
    addEntry :: ClusterSlotsResponseEntry -> IntMap.IntMap Shard -> IntMap.IntMap Shard
    addEntry ClusterSlotsResponseEntry{..} accumulated =
        let master   = nodeFromClusterSlotNode True clusterSlotsResponseEntryMaster
            replicas = map (nodeFromClusterSlotNode False) clusterSlotsResponseEntryReplicas
            shard    = Shard master replicas
            slotMap  = IntMap.fromList $
                map (, shard) [clusterSlotsResponseEntryStartSlot..clusterSlotsResponseEntryEndSlot]
        in IntMap.union slotMap accumulated

    -- Convert a node of the response into a cluster 'Node'; the flag says
    -- whether the node is the master of its shard.
    nodeFromClusterSlotNode :: Bool -> ClusterSlotsNode -> Node
    nodeFromClusterSlotNode isMaster ClusterSlotsNode{..} =
        let hostname = Char8.unpack clusterSlotsNodeIP
            role = if isMaster then Cluster.Master else Cluster.Slave
        in
            Cluster.Node clusterSlotsNodeID role hostname (toEnum clusterSlotsNodePort)

-- |Re-fetches the cluster layout with @CLUSTER SLOTS@, sent through the
--  connection context of one of the node connections held by the given
--  cluster connection.
--
--  Throws 'ClusterConnectError' when the server answers with an error reply,
--  and an 'IOError' when the cluster connection holds no node connections.
refreshShardMap :: Cluster.Connection -> IO ShardMap
refreshShardMap (Cluster.Connection nodeConns _ _ _) =
    case HM.elems nodeConns of
        -- Previously a partial 'head'; fail with a descriptive error instead.
        [] -> throwIO $ userError
            "refreshShardMap: cluster connection has no node connections"
        (Cluster.NodeConnection ctx _ _ : _) -> do
            pipelineConn <- PP.fromCtx ctx
            _ <- PP.beginReceiving pipelineConn
            slotsResponse <- runRedisInternal pipelineConn clusterSlots
            case slotsResponse of
                Left e -> throwIO $ ClusterConnectError e
                Right slots -> shardMapFromClusterSlotsResponse slots