{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE ViewPatterns #-}
module Database.Redis.Cluster
  ( Connection(..)
  , NodeRole(..)
  , NodeConnection(..)
  , Node(..)
  , ShardMap(..)
  , HashSlot
  , Shard(..)
  , connect
  , connectWith
  , disconnect
  , requestPipelined
  , nodes
  , hooks
  , requestMasterNodes
  , masterNodes
  , getRandomConnection
) where

import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as Char8
import qualified Data.IORef as IOR
import Data.List(nub, sortBy, find)
import Data.Maybe(mapMaybe, fromMaybe)
import Data.Map(fromListWith, assocs)
import Data.Function(on)
import Control.Exception(Exception, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), bracketOnError)
import Control.Concurrent.MVar(MVar, newMVar, readMVar, modifyMVar, modifyMVar_)
import Control.Monad(zipWithM, when, replicateM, forM_)
import Database.Redis.Cluster.HashSlot(HashSlot, keyToSlot)
import qualified Database.Redis.ConnectionContext as CC
import qualified Data.HashMap.Strict as HM
import qualified Data.IntMap.Strict as IntMap
import qualified Scanner
import System.IO.Unsafe(unsafeInterleaveIO)

import Database.Redis.Protocol(Reply(..), renderRequest, reply)
import qualified Database.Redis.Cluster.Command as CMD
import Database.Redis.Hooks (Hooks)
import Network.TLS (ClientParams (..))

-- This module implements a clustered connection whilst maintaining
-- compatibility with the original Hedis codebase. In particular it still
-- performs implicit pipelining using `unsafeInterleaveIO` as the single node
-- codebase does. To achieve this each connection carries around with it a
-- pipeline of commands. Every time `sendRequest` is called the command is
-- added to the pipeline and an IO action is returned which will, upon being
-- evaluated, execute the entire pipeline. If the pipeline is already executed
-- then it just looks up its response in the executed pipeline.

-- | A connection to a redis cluster. It is composed of a map from node IDs to
-- 'NodeConnection's, a 'Pipeline', and a 'ShardMap'.
data Connection = Connection
  { connectionNodes :: HM.HashMap NodeID NodeConnection
    -- ^ One open connection per cluster node, keyed by the node's ID.
  , connectionPipeline :: MVar Pipeline
    -- ^ The pipeline that new requests are appended to; 'requestPipelined'
    -- stores the (possibly new) pipeline back on every call.
  , connectionShardMap :: MVar ShardMap
    -- ^ The shard map variable handed to 'connectWith'. It is read to route
    -- requests and overwritten when a refresh is triggered.
  , connectionInfoMap :: CMD.InfoMap
    -- ^ Command metadata built with 'CMD.newInfoMap' from the command infos
    -- given at connection time.
  , connectionHooks :: Hooks
    -- ^ The hooks supplied at connection time.
  }

-- | A connection to a single node in the cluster, similar to
-- 'ProtocolPipelining.Connection'.
data NodeConnection = NodeConnection
  { nodeConnectionContext :: CC.ConnectionContext
    -- ^ The underlying connection context, as returned by 'CC.connect'.
  , nodeConnectionLastRecvRef :: IOR.IORef (Maybe B.ByteString)
    -- ^ Initialised to 'Nothing' when the node is connected.
    -- NOTE(review): presumably holds input left over from the previous
    -- receive; confirm against the receive code.
  , nodeConnectionNodeId :: NodeID
    -- ^ ID of the node this connection talks to. The 'Eq' and 'Ord'
    -- instances look at this field only.
  }

-- | Two node connections are equal when they refer to the same node ID; the
-- connection context and receive buffer are not compared.
instance Eq NodeConnection where
    (==) = (==) `on` nodeConnectionNodeId

-- | Node connections are ordered by node ID only, consistently with the 'Eq'
-- instance. This is what allows them to be used as 'Data.Map' keys when
-- requests are grouped by node.
instance Ord NodeConnection where
    compare = compare `on` nodeConnectionNodeId

-- The state of one pipeline. The request lists are kept newest-first:
-- `requestPipelined` conses each new request onto the front.
data PipelineState =
      -- Nothing in the pipeline has been evaluated yet, so nothing has been
      -- sent. Holds the queued requests.
      Pending [[B.ByteString]]
      -- This pipeline has been executed; the replies are contained within it
      -- and are looked up by position.
    | Executed [Reply]
      -- We're in a MULTI-EXEC transaction. All commands in the transaction
      -- should go to the same node, but we won't know what node that is until
      -- we see a command with a key. We're storing these transactions and will
      -- send them all together when we see an EXEC.
    | TransactionPending [[B.ByteString]]
-- A pipeline is an MVar holding its current state. The pipeline stored in a
-- connection is normally `Pending` or `TransactionPending`: once a state has
-- become `Executed`, the next call to `requestPipelined` allocates a fresh
-- MVar with a new pending state and stores that in the connection instead.
-- The executed state is then only held on to by the lazily evaluated replies
-- that were handed out for it.

newtype Pipeline = Pipeline (MVar PipelineState)

-- | The role a node plays within its shard. The derived ordering places
-- 'Master' before 'Slave'.
data NodeRole = Master | Slave deriving (Show, Eq, Ord)

-- | Host of a cluster node, as handed to 'CC.ConnectAddrHostPort'.
type Host = String
-- | Port number of a cluster node; converted with 'toEnum' when connecting.
type Port = Int
-- | Identifier of a cluster node; the key of the per-node connection map.
type NodeID = B.ByteString

-- | Represents a single node. Note that this type does not include the
-- connection to the node, because the shard map can be shared amongst
-- multiple connections.
data Node = Node
  { nodeId :: NodeID
    -- ^ The node's ID; used as the key of the connection map.
  , nodeRole :: NodeRole
    -- ^ Whether the node is a master or a slave.
  , nodeHost :: Host
    -- ^ Host used when connecting to the node.
  , nodePort :: Port
    -- ^ Port used when connecting to the node.
  } deriving (Show, Eq, Ord)

-- | A 'Node' used in the master position of a 'Shard'. This is only an alias:
-- the type system does not check the node's role.
type MasterNode = Node
-- | A 'Node' used in the slave list of a 'Shard'. Also only an alias.
type SlaveNode = Node

-- | A 'shard' is a master node and 0 or more slaves (the 'master', 'slave'
-- terminology is unfortunate but I felt it better to follow the documentation
-- until it changes).
data Shard = Shard
  { shardMaster :: MasterNode
    -- ^ The master node of the shard.
  , shardSlaves :: [SlaveNode]
    -- ^ The slaves of the shard; may be empty.
  } deriving (Show, Eq, Ord)

-- | A map from hash slot to the shard serving that slot. Slots are stored as
-- plain 'Int' keys of an 'IntMap.IntMap'.
newtype ShardMap = ShardMap (IntMap.IntMap Shard) deriving (Show)

-- | Raised when no node (or no connection to it) can be found for a request.
-- Carries the request that could not be routed.
newtype MissingNodeException = MissingNodeException [B.ByteString] deriving (Show)
instance Exception MissingNodeException

-- | Exception carrying a request that cannot be used in cluster mode.
-- NOTE(review): the throw site is not in the visible part of this module;
-- confirm the exact conditions there.
newtype UnsupportedClusterCommandException = UnsupportedClusterCommandException [B.ByteString] deriving (Show)
instance Exception UnsupportedClusterCommandException

-- | Raised when the keys of the carried requests do not resolve to a single
-- hash slot. Carries the offending requests.
newtype CrossSlotException = CrossSlotException [[B.ByteString]] deriving (Show)
instance Exception CrossSlotException

-- | Raised by 'connectWith' when a node answers the @AUTH@ command with
-- anything other than @OK@. Carries the node's host and port and the reply
-- that was received.
data ClusterAuthError = ClusterAuthError Host Port Reply deriving (Show)
instance Exception ClusterAuthError

-- | Backwards compatible version of 'connectWith' that cannot provide
-- authentication or TLS parameters: it calls 'connectWith' with 'Nothing' for
-- the username, the password and the TLS parameters.
{-# DEPRECATED connect "Use connectWith instead, passing Nothing for the parameters you don't need." #-}
connect :: [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Hooks -> IO Connection
connect = connectWith Nothing Nothing Nothing

-- | Connects to the cluster: opens one connection for every distinct node in
-- the current shard map and, if a password is given, authenticates each of
-- them.
--
-- Throws 'ClusterAuthError' if a node does not answer @AUTH@ with @OK@. If
-- connecting to or authenticating against any node fails, the connections
-- opened so far are closed again before the exception propagates.
connectWith
  :: Maybe B.ByteString  -- ^ Username; only used when a password is given.
  -> Maybe B.ByteString  -- ^ Password; 'Nothing' means no @AUTH@ is sent.
  -> Maybe ClientParams  -- ^ TLS parameters, passed on to 'CC.connect'.
  -> [CMD.CommandInfo]   -- ^ Command infos used to build the 'CMD.InfoMap'.
  -> MVar ShardMap       -- ^ Shard map; read once here and kept in the connection.
  -> Maybe Int           -- ^ Timeout option, passed on to 'CC.connect'.
  -> Hooks               -- ^ Hooks stored in the connection.
  -> IO Connection
connectWith mUsername mPassword mTlsParams commandInfos shardMapVar timeoutOpt hooks' = do
    shardMap <- readMVar shardMapVar
    stateVar <- newMVar $ Pending []
    pipelineVar <- newMVar $ Pipeline stateVar
    nodeConns <- nodeConnections shardMap
    return $ Connection nodeConns pipelineVar shardMapVar (CMD.newInfoMap commandInfos) hooks'
  where
    -- A node can appear under many hash slots, hence the 'nub'.
    nodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection)
    nodeConnections shardMap = HM.fromList <$> connectNodes (nub $ nodes shardMap)

    -- Connects the nodes one after the other. The recursion happens inside
    -- the 'bracketOnError' continuation on purpose: an exception raised while
    -- handling any later node unwinds through every enclosing bracket and so
    -- disconnects every context opened so far.
    connectNodes :: [Node] -> IO [(NodeID, NodeConnection)]
    connectNodes [] = return []
    connectNodes (z@Node{nodeHost = host, nodePort = port} : ns) =
        bracketOnError
          (CC.connect (CC.ConnectAddrHostPort host $ toEnum port) timeoutOpt mTlsParams)
          CC.disconnect $ \ctx0 -> do
            nodeConn <- connectNode z ctx0
            rest <- connectNodes ns
            return $ nodeConn : rest

    -- Wraps an open context in a 'NodeConnection' and authenticates it when a
    -- password was supplied.
    connectNode :: Node -> CC.ConnectionContext -> IO (NodeID, NodeConnection)
    connectNode Node{nodeId = n, nodeHost = host, nodePort = port} ctx0 = do
        ref <- IOR.newIORef Nothing
        let nodeConn = NodeConnection ctx0 ref n
        forM_ mPassword $ \password -> do
            -- "AUTH password" or, with a username, "AUTH username password".
            let reqOpts = maybe [password] (: [password]) mUsername
            authReply <- requestNode1 nodeConn ("AUTH" : reqOpts)
            case authReply of
              SingleLine "OK" -> pure ()
              _ -> throwIO $ ClusterAuthError host port authReply
        return (n, nodeConn)

-- | Closes the connection to every node held by the given cluster
-- connection, by calling 'CC.disconnect' on each node's context in turn.
disconnect :: Connection -> IO ()
disconnect Connection{connectionNodes=nodeConnMap} =
    mapM_ (CC.disconnect . nodeConnectionContext) (HM.elems nodeConnMap)

-- | Add a request to the current pipeline for this connection. The pipeline
-- will be executed implicitly as soon as any result returned from this
-- function is evaluated.
--
-- The first argument is the action used to fetch a fresh shard map; it is
-- passed through to the pipeline evaluators. The returned action is created
-- with 'unsafeInterleaveIO', so no network traffic happens until it is
-- demanded, except in the cases below where the pipeline is flushed eagerly.
requestPipelined :: IO ShardMap -> Connection -> [B.ByteString] -> IO Reply
requestPipelined refreshAction conn@Connection{connectionPipeline=pipelineVar, connectionShardMap=shardMapVar} nextRequest =
  modifyMVar pipelineVar $ \(Pipeline stateVar) -> do
    -- Queue the request. 'repliesIndex' is the position of its reply in the
    -- reply list of whichever pipeline state ends up owning it: the number of
    -- requests queued before it, since requests are stored newest-first.
    (newStateVar, repliesIndex) <- hasLocked $ modifyMVar stateVar $ \case
        -- A MULTI starts a transaction: flush what is queued so far and
        -- start a fresh transaction pipeline containing just the MULTI.
        Pending requests | isMulti nextRequest -> do
            replies <- evaluatePipeline shardMapVar refreshAction conn requests
            s' <- newMVar $ TransactionPending [nextRequest]
            return (Executed replies, (s', 0))
        -- Too many queued requests: execute eagerly, including this one.
        Pending requests | length requests > maxPendingRequests -> do
            replies <- evaluatePipeline shardMapVar refreshAction conn (nextRequest:requests)
            return (Executed replies, (stateVar, length requests))
        Pending requests ->
            return (Pending (nextRequest:requests), (stateVar, length requests))
        -- Inside a transaction everything is held back until the EXEC.
        TransactionPending requests ->
            if isExec nextRequest then do
              replies <- evaluateTransactionPipeline shardMapVar refreshAction conn (nextRequest:requests)
              return (Executed replies, (stateVar, length requests))
            else
              return (TransactionPending (nextRequest:requests), (stateVar, length requests))
        -- The previous pipeline is finished; leave it untouched for the
        -- replies that still refer to it and start a new one.
        e@(Executed _) -> do
            s' <- newMVar $
                    if isMulti nextRequest then
                        TransactionPending [nextRequest]
                    else
                        Pending [nextRequest]
            return (e, (s', 0))
    -- Deferred: runs only when the reply is demanded. Whoever demands a reply
    -- first executes the whole pipeline; later demands just index into the
    -- stored replies.
    evaluateAction <- unsafeInterleaveIO $ do
        replies <- hasLocked $ modifyMVar newStateVar $ \case
            Executed replies ->
                return (Executed replies, replies)
            Pending requests -> do
                replies <- evaluatePipeline shardMapVar refreshAction conn requests
                return (Executed replies, replies)
            TransactionPending requests -> do
                replies <- evaluateTransactionPipeline shardMapVar refreshAction conn requests
                return (Executed replies, replies)
        return $ replies !! repliesIndex
    return (Pipeline newStateVar, evaluateAction)
  where
    -- Number of queued requests above which the pipeline is executed
    -- eagerly rather than growing further.
    maxPendingRequests :: Int
    maxPendingRequests = 1000

-- | Whether a request is a @MULTI@ command, i.e. its first element is exactly
-- the upper-case byte string @MULTI@. The comparison is case-sensitive.
isMulti :: [B.ByteString] -> Bool
isMulti ("MULTI" : _) = True
isMulti _ = False

-- | Whether a request is an @EXEC@ command, i.e. its first element is exactly
-- the upper-case byte string @EXEC@. The comparison is case-sensitive.
isExec :: [B.ByteString] -> Bool
isExec ("EXEC" : _) = True
isExec _ = False

-- | A request that still has to be sent, tagged with the position its reply
-- must take in the final reply list.
data PendingRequest = PendingRequest Int [B.ByteString]
-- | A request that has been sent: its reply position, the request itself and
-- the reply received for it.
data CompletedRequest = CompletedRequest Int [B.ByteString] Reply

-- | The request carried by a 'PendingRequest', without its index.
rawRequest :: PendingRequest -> [B.ByteString]
rawRequest (PendingRequest _ r) = r

-- | The position of a completed request's reply in the final reply list.
responseIndex :: CompletedRequest -> Int
responseIndex (CompletedRequest i _ _) = i

-- | The reply carried by a 'CompletedRequest'.
rawResponse :: CompletedRequest -> Reply
rawResponse (CompletedRequest _ _ r) = r

-- The approach we take here is similar to that taken by the redis-py-cluster
-- library, which is described at https://redis-py-cluster.readthedocs.io/en/master/pipelines.html
--
-- Essentially we group all the commands by node (based on the current shardmap)
-- and then execute a pipeline for each node (maintaining the order of commands
-- on a per node basis but not between nodes). Once we've done this, if any of
-- the commands have resulted in a MOVED error we refresh the shard map, then
-- we run through all the responses and retry any MOVED or ASK errors. This retry
-- step is not pipelined, there is a request per error. This is probably
-- acceptable in most cases as these errors should only occur in the case of
-- cluster reconfiguration events, which should be rare.
--
-- The requests are expected newest-first (as 'requestPipelined' stores them).
-- The result holds exactly one reply per request, oldest request first, so
-- that a reply can be found by position.
evaluatePipeline :: MVar ShardMap -> IO ShardMap -> Connection -> [[B.ByteString]] -> IO [Reply]
evaluatePipeline shardMapVar refreshShardmapAction conn requests = do
        shardMap <- hasLocked $ readMVar shardMapVar
        requestsByNode <- getRequestsByNode shardMap
        resps <- concat <$> mapM (uncurry executeRequests) requestsByNode
        when (any (moved . rawResponse) resps) refreshShardMapVar
        retriedResps <- mapM (retry 0) resps
        return $ map rawResponse $ firstPerIndex $ sortBy (on compare responseIndex) retriedResps
  where
    -- Tag every request with its reply position and group the requests by
    -- the node they have to be sent to. The list is newest-first, so the
    -- indices count down to 0; 'fromListWith (++)' prepends later entries,
    -- which leaves each node's requests oldest-first.
    getRequestsByNode :: ShardMap -> IO [(NodeConnection, [PendingRequest])]
    getRequestsByNode shardMap = do
        commandsWithNodes <- zipWithM (requestWithNodes shardMap) (reverse [0 .. length requests - 1]) requests
        return $ assocs $ fromListWith (++) (mconcat commandsWithNodes)

    -- Pair one request with every node connection it must be sent to.
    requestWithNodes :: ShardMap -> Int -> [B.ByteString] -> IO [(NodeConnection, [PendingRequest])]
    requestWithNodes shardMap index request = do
        nodeConns <- nodeConnectionForCommand conn shardMap request
        return $ (, [PendingRequest index request]) <$> nodeConns

    -- Send one node's requests as a single batch and pair up the replies.
    executeRequests :: NodeConnection -> [PendingRequest] -> IO [CompletedRequest]
    executeRequests nodeConn nodeRequests = do
        replies <- requestNode nodeConn $ map rawRequest nodeRequests
        return $ zipWith (\(PendingRequest i r) rep -> CompletedRequest i r rep) nodeRequests replies

    -- Give a single request the chance to be retried if its reply is a
    -- redirection. 'retryBatch' is called with a one-element batch, so
    -- 'head' relies on it returning a reply for that element.
    retry :: Int -> CompletedRequest -> IO CompletedRequest
    retry retryCount (CompletedRequest index request thisReply) = do
        retryReply <- head <$> retryBatch shardMapVar refreshShardmapAction conn retryCount [request] [thisReply]
        return (CompletedRequest index request retryReply)

    refreshShardMapVar :: IO ()
    refreshShardMapVar = hasLocked $ modifyMVar_ shardMapVar (const refreshShardmapAction)

    -- A request that was sent to several nodes produces several replies
    -- carrying the same index. Callers locate replies by position, so only
    -- the first reply per index is kept; otherwise every reply after the
    -- duplicated one would be shifted. Expects its input sorted by index.
    firstPerIndex :: [CompletedRequest] -> [CompletedRequest]
    firstPerIndex (a : rest@(b : _))
        | responseIndex a == responseIndex b = firstPerIndex (a : drop 1 rest)
    firstPerIndex (a : rest) = a : firstPerIndex rest
    firstPerIndex [] = []

-- Retry a batch of requests if the last response is a redirect instruction.
-- If multiple requests are passed in they're assumed to be a MULTI..EXEC
-- transaction and will all be retried.
--
-- Behaviour by kind of final reply:
--
--   * @MOVED@: the key slot is recomputed and the whole batch is re-sent to
--     the node the current shard map assigns to that slot. This branch does
--     not refresh the shard map itself.
--   * @ASK@: the batch is re-sent to the indicated node, prefixed with an
--     @ASKING@ command whose reply is dropped. If that node is unknown the
--     shard map is refreshed once (only when @retryCount@ is 0) and the retry
--     is attempted again; otherwise a 'MissingNodeException' is thrown.
--   * anything else: the replies are returned unchanged.
--
-- An empty reply list is returned as-is instead of crashing on 'last'.
retryBatch :: MVar ShardMap -> IO ShardMap -> Connection -> Int -> [[B.ByteString]] -> [Reply] -> IO [Reply]
retryBatch _ _ _ _ _ [] = return []
retryBatch shardMapVar refreshShardmapAction conn retryCount requests replies =
    -- The last reply will be the `EXEC` reply containing the redirection, if
    -- there is one.
    case last replies of
        (Error errString) | B.isPrefixOf "MOVED" errString -> do
            let infoMap = connectionInfoMap conn
            keys <- mconcat <$> mapM (requestKeys infoMap) requests
            hashSlot <- hashSlotForKeys (CrossSlotException requests) keys
            nodeConn <- nodeConnForHashSlot shardMapVar conn (MissingNodeException firstRequest) hashSlot
            requestNode nodeConn requests
        (askingRedirection -> Just (host, port)) -> do
            shardMap <- hasLocked $ readMVar shardMapVar
            case nodeConnWithHostAndPort shardMap conn host port of
                -- Drop the reply to the injected ASKING command so the result
                -- lines up with `requests` again.
                Just askNode -> drop 1 <$> requestNode askNode (["ASKING"] : requests)
                Nothing -> case retryCount of
                    0 -> do
                        -- The target node is not known yet: refresh the shard
                        -- map once and try again.
                        hasLocked $ modifyMVar_ shardMapVar (const refreshShardmapAction)
                        retryBatch shardMapVar refreshShardmapAction conn (retryCount + 1) requests replies
                    _ -> throwIO $ MissingNodeException firstRequest
        _ -> return replies
  where
    -- The request reported in a 'MissingNodeException'; total even when
    -- `requests` is empty.
    firstRequest :: [B.ByteString]
    firstRequest = concat (take 1 requests)

-- Like `evaluateOnPipeline`, except we expect to be able to run all commands
-- on a single shard. Failing to meet this expectation is an error.
--
-- The incoming request list is reversed before it is sent (presumably the
-- pipeline accumulates requests newest-first - confirm against the caller).
evaluateTransactionPipeline :: MVar ShardMap -> IO ShardMap -> Connection -> [[B.ByteString]] -> IO [Reply]
evaluateTransactionPipeline shardMapVar refreshShardmapAction conn requests' = do
    let requests = reverse requests'
        infoMap = connectionInfoMap conn
        -- The request reported in a 'MissingNodeException'; total even when
        -- the transaction is empty.
        firstRequest = concat (take 1 requests)
    keys <- mconcat <$> mapM (requestKeys infoMap) requests
    -- In cluster mode Redis expects commands in transactions to all work on the
    -- same hashslot. We find that hashslot here.
    -- We could be more permissive and allow transactions that touch multiple
    -- hashslots, as long as those hashslots are on the same node. This allows
    -- a new failure case though: if some of the transaction's hashslots are
    -- moved to a different node we could end up in a situation where some of
    -- the commands in a transaction are applied and some are not. Better to
    -- fail early.
    hashSlot <- hashSlotForKeys (CrossSlotException requests) keys
    nodeConn <- nodeConnForHashSlot shardMapVar conn (MissingNodeException firstRequest) hashSlot
    resps <- requestNode nodeConn requests
    -- The Redis documentation has the following to say on the effect of
    -- resharding on multi-key operations:
    --
    --     Multi-key operations may become unavailable when a resharding of the
    --     hash slot the keys belong to is in progress.
    --
    --     More specifically, even during a resharding the multi-key operations
    --     targeting keys that all exist and all still hash to the same slot
    --     (either the source or destination node) are still available.
    --
    --     Operations on keys that don't exist or are - during the resharding -
    --     split between the source and destination nodes, will generate a
    --     -TRYAGAIN error. The client can try the operation after some time,
    --     or report back the error.
    --
    --     https://redis.io/topics/cluster-spec#multiple-keys-operations
    --
    -- An important take-away here is that MULTI..EXEC transactions can fail
    -- with a redirect, in which case we need to repeat the full transaction on
    -- the node we're redirected to.
    --
    -- A second important take-away is that MULTI..EXEC transactions might
    -- temporarily fail during resharding with a -TRYAGAIN error. We can only
    -- make arbitrary decisions about how long to pause before the retry and
    -- how often to retry, so instead we'll propagate the error to the library
    -- user and let them decide how they would like to handle the error.
    when (any moved resps)
      (hasLocked $ modifyMVar_ shardMapVar (const refreshShardmapAction))
    retryBatch shardMapVar refreshShardmapAction conn 0 requests resps

-- Find the connection to the master node that currently owns a hash slot.
--
-- The shard map is read from the 'MVar' at call time. The supplied exception
-- is thrown when either the slot has no shard in the map, or the shard's
-- master has no entry in the connection's node table.
nodeConnForHashSlot :: Exception e => MVar ShardMap -> Connection -> e -> HashSlot -> IO NodeConnection
nodeConnForHashSlot shardMapVar conn exception hashSlot = do
    ShardMap shardMap <- hasLocked $ readMVar shardMapVar
    master <-
        maybe (throwIO exception) (return . shardMaster) $
            IntMap.lookup (fromEnum hashSlot) shardMap
    maybe (throwIO exception) return $
        HM.lookup (nodeId master) (connectionNodes conn)

-- Determine the single hash slot shared by all of the given keys.
--
-- Returns slot 0 when there are no keys, the common slot when every key maps
-- to the same one, and throws the supplied exception when the keys span more
-- than one slot.
hashSlotForKeys :: Exception e => e -> [B.ByteString] -> IO HashSlot
hashSlotForKeys exception keys =
    case nub (map keyToSlot keys) of
        -- If none of the commands contain a key we can send them to any
        -- node. Let's pick the first one.
        [] -> return 0
        [hashSlot] -> return hashSlot
        _ -> throwIO exception

-- Extract the keys a request operates on, using the command info map.
-- Throws 'UnsupportedClusterCommandException' (carrying the request) when
-- 'CMD.keysForRequest' cannot determine the keys.
requestKeys :: CMD.InfoMap -> [B.ByteString] -> IO [B.ByteString]
requestKeys infoMap request =
    maybe (throwIO $ UnsupportedClusterCommandException request) return $
        CMD.keysForRequest infoMap request

-- Parse an @ASK <slot> <host>:<port>@ error reply into its target address.
--
-- Returns 'Nothing' for any reply that is not an 'Error', does not consist of
-- exactly three words starting with @ASK@, does not have exactly one @:@ in
-- the address, or whose port is not a complete decimal integer.
askingRedirection :: Reply -> Maybe (Host, Port)
askingRedirection (Error errString) =
    case Char8.words errString of
        ["ASK", _, hostport] ->
            case Char8.split ':' hostport of
                [host, portString] ->
                    case Char8.readInt portString of
                        -- Require the whole port string to be consumed.
                        Just (port, "") -> Just (Char8.unpack host, port)
                        _ -> Nothing
                _ -> Nothing
        _ -> Nothing
askingRedirection _ = Nothing

-- True when the reply is an 'Error' whose first whitespace-separated word is
-- @MOVED@; False for every other reply.
moved :: Reply -> Bool
moved (Error errString) =
    case Char8.words errString of
        "MOVED" : _ -> True
        _ -> False
moved _ = False


-- Look up the connection for the node with the given host and port.
-- 'Nothing' when no node in the shard map has that address, or when the node
-- has no entry in the connection's node table.
nodeConnWithHostAndPort :: ShardMap -> Connection -> Host -> Port -> Maybe NodeConnection
nodeConnWithHostAndPort shardMap Connection{connectionNodes = nodeConns} host port = do
    node <- nodeWithHostAndPort shardMap host port
    HM.lookup (nodeId node) nodeConns

-- Decide which node connection(s) a single request must be sent to.
--
-- @FLUSHALL@, @FLUSHDB@, @QUIT@ and @UNWATCH@ (matched case-sensitively on
-- the first word) go to every master node. Any other request goes to the one
-- master that owns the hash slot of the request's keys.
--
-- Throws 'MissingNodeException' when a required node or its connection cannot
-- be found; 'requestKeys' and 'hashSlotForKeys' may throw their own
-- exceptions for unsupported commands or keys spanning several slots.
nodeConnectionForCommand :: Connection -> ShardMap -> [B.ByteString] -> IO [NodeConnection]
nodeConnectionForCommand conn@Connection{connectionNodes = nodeConns, connectionInfoMap = infoMap} shards@(ShardMap shardMap) request =
    case request of
        (command : _) | command `elem` broadcastCommands -> allNodes
        _ -> do
            keys <- requestKeys infoMap request
            hashSlot <- hashSlotForKeys (CrossSlotException [request]) keys
            master <-
                maybe missingNode (return . shardMaster) $
                    IntMap.lookup (fromEnum hashSlot) shardMap
            maybe missingNode (\nodeConn -> return [nodeConn]) $
                HM.lookup (nodeId master) nodeConns
  where
    -- Commands that are sent to all master nodes rather than routed by key.
    broadcastCommands :: [B.ByteString]
    broadcastCommands = ["FLUSHALL", "FLUSHDB", "QUIT", "UNWATCH"]

    missingNode :: IO a
    missingNode = throwIO $ MissingNodeException request

    allNodes :: IO [NodeConnection]
    allNodes = maybe missingNode return (allMasterNodes conn shards)

-- Connections to the master of every distinct shard in the shard map.
-- The result is 'Nothing' if any master has no entry in the connection's
-- node table.
allMasterNodes :: Connection -> ShardMap -> Maybe [NodeConnection]
allMasterNodes Connection{connectionNodes = nodeConns} (ShardMap shardMap) =
    mapM (\master -> HM.lookup (nodeId master) nodeConns) masters
  where
    -- A shard appears in the map more than once, so deduplicate before
    -- taking the masters.
    masters :: [Node]
    masters = map shardMaster (nub (IntMap.elems shardMap))

-- Send a batch of requests to one node and collect one reply per request.
-- All requests are written first, the connection is flushed once, and then
-- exactly @length requests@ replies are read in order.
requestNode :: NodeConnection -> [[B.ByteString]] -> IO [Reply]
requestNode nodeConn@(NodeConnection ctx _ _) requests = do
    mapM_ (sendNode . renderRequest) requests
    _ <- CC.flush ctx
    replicateM (length requests) (recvNode nodeConn)
  where
    sendNode :: B.ByteString -> IO ()
    sendNode = CC.send ctx

-- Send a single request to one node, flush, and read back its single reply.
requestNode1 :: NodeConnection -> [B.ByteString] -> IO Reply
requestNode1 nodeConn@NodeConnection{nodeConnectionContext = ctx} request = do
    CC.send ctx (renderRequest request)
    _ <- CC.flush ctx
    recvNode nodeConn

-- Read and parse one reply from a node connection.
--
-- Input left over from the previous parse is kept in the connection's
-- last-received 'IORef' and is fed to the scanner first; after a successful
-- parse the new leftover is written back. A scanner failure is reported via
-- 'CC.errConnClosed'.
recvNode :: NodeConnection -> IO Reply
recvNode NodeConnection{nodeConnectionContext = ctx, nodeConnectionLastRecvRef = lastRecvRef} = do
    leftover <- fromMaybe B.empty <$> IOR.readIORef lastRecvRef
    scanResult <- Scanner.scanWith (CC.recv ctx) reply leftover
    case scanResult of
        Scanner.Fail{} -> CC.errConnClosed
        Scanner.More{} -> error "Hedis: parseWith returned Partial"
        Scanner.Done rest' r -> do
            IOR.writeIORef lastRecvRef (Just rest')
            return r

-- | All nodes referenced by the shard map, in ascending slot order: for each
-- slot entry the shard's master followed by its slaves. Nodes are not
-- deduplicated, so a node appears once per slot entry of its shard.
nodes :: ShardMap -> [Node]
nodes (ShardMap shardMap) = concatMap shardNodes (IntMap.elems shardMap)
  where
    shardNodes :: Shard -> [Node]
    shardNodes shard = shardMaster shard : shardSlaves shard


-- The first node in the shard map whose host and port both equal the given
-- ones, or 'Nothing' when there is none.
nodeWithHostAndPort :: ShardMap -> Host -> Port -> Maybe Node
nodeWithHostAndPort shardMap host port = find matches (nodes shardMap)
  where
    matches Node{nodeHost = h, nodePort = p} = port == p && host == h

-- Run an action, intercepting 'BlockedIndefinitelyOnMVar' only to rethrow it
-- unchanged. The result and every other exception pass through untouched.
hasLocked :: IO a -> IO a
hasLocked action =
    action `catches`
        [ Handler $ \exc@BlockedIndefinitelyOnMVar -> throwIO exc
        ]

-- | The 'Hooks' value carried by a cluster 'Connection'.
hooks :: Connection -> Hooks
hooks = connectionHooks

-- | Send a request to all master nodes in the cluster. This is useful for
-- commands that need to be sent to all master nodes, such as `FLUSHALL` or
-- `CONFIG SET`.
--
-- The nodes are contacted one after another and their replies are
-- concatenated in that order, one reply per master node.
requestMasterNodes :: Connection -> [B.ByteString] -> IO [Reply]
requestMasterNodes conn req = do
    masterNodeConns <- masterNodes conn
    concat <$> mapM (\nodeConn -> requestNode nodeConn [req]) masterNodeConns

-- | Get the connections to the master nodes in the cluster.
-- This is useful for commands that need to be sent to all master nodes, such
-- as `FLUSHALL` or `CONFIG SET`.
--
-- The shard map is read at call time. Masters without an entry in the
-- connection's node table are silently left out of the result.
masterNodes :: Connection -> IO [NodeConnection]
masterNodes (Connection nodeConns _ shardMapVar _ _) = do
    ShardMap shardMap <- readMVar shardMapVar
    -- A shard appears in the map more than once, so deduplicate first.
    let masterNodeIds = map (nodeId . shardMaster) (nub (IntMap.elems shardMap))
    return $ mapMaybe (`HM.lookup` nodeConns) masterNodeIds

-- | Get a connection to a node in the cluster other than the provided one.
--
-- Despite the name the choice is deterministic: the first connection in the
-- node table that differs from the given one is returned. If every
-- connection equals the given one, the first connection in the table is
-- returned; if the table is empty, the given connection itself is returned
-- (previously this case crashed on 'head').
getRandomConnection :: NodeConnection -> Connection -> NodeConnection
getRandomConnection nc Connection{connectionNodes = hmn} =
    fromMaybe fallback (find (nc /=) conns)
  where
    conns :: [NodeConnection]
    conns = HM.elems hmn

    fallback :: NodeConnection
    fallback = case conns of
        (firstConn : _) -> firstConn
        [] -> nc