{-# LANGUAGE TupleSections #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE OverloadedStrings #-}
module Database.Redis.Connection where
import Control.Exception
import qualified Control.Monad.Catch as Catch
import Control.Monad.IO.Class(liftIO, MonadIO)
import Control.Monad(when)
import Control.Concurrent.MVar(MVar, newMVar)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as Char8
import Data.Functor(void)
import qualified Data.IntMap.Strict as IntMap
import Data.Pool(Pool, withResource, createPool, destroyAllResources)
import Data.Typeable
import qualified Data.Time as Time
import Network.TLS (ClientParams)
import qualified Network.Socket as NS
import qualified Data.HashMap.Strict as HM
import qualified Database.Redis.ProtocolPipelining as PP
import Database.Redis.Core(Redis, runRedisInternal, runRedisClusteredInternal)
import Database.Redis.Protocol(Reply(..))
import Database.Redis.Cluster(ShardMap(..), Node, Shard(..))
import qualified Database.Redis.Cluster as Cluster
import qualified Database.Redis.ConnectionContext as CC
import Database.Redis.Commands
( ping
, select
, auth
, clusterSlots
, command
, ClusterSlotsResponse(..)
, ClusterSlotsResponseEntry(..)
, ClusterSlotsNode(..))
-- | Handle through which Redis actions are run (see 'runRedis').
--
-- It is one of:
--
-- * 'NonClusteredConnection': a pool of pipelined connections, as built by
--   'connect'.
--
-- * 'ClusteredConnection': a shared 'ShardMap' together with a pool of
--   cluster connections, as built by 'connectCluster'.
data Connection
    = NonClusteredConnection (Pool PP.Connection)
    | ClusteredConnection (MVar ShardMap) (Pool Cluster.Connection)
-- | Settings describing how to reach a Redis server and how the connection
-- pool built on top of it is sized. Start from 'defaultConnectInfo' and
-- override individual fields with record-update syntax.
data ConnectInfo = ConnInfo
    { connectHost           :: NS.HostName
      -- ^ Host passed to @PP.connect@.
    , connectPort           :: CC.PortID
      -- ^ Port passed to @PP.connect@.
    , connectAuth           :: Maybe B.ByteString
      -- ^ When 'Just', 'createConnection' sends @AUTH@ with this value
      --   right after connecting; a failure reply raises 'ConnectAuthError'.
    , connectDatabase       :: Integer
      -- ^ When non-zero, 'createConnection' sends @SELECT@ with this index;
      --   a failure reply raises 'ConnectSelectError'.
    , connectMaxConnections :: Int
      -- ^ Handed to 'createPool' as its final (resource limit) argument.
    , connectMaxIdleTime    :: Time.NominalDiffTime
      -- ^ Handed to 'createPool' as the idle time of pooled connections.
    , connectTimeout        :: Maybe Time.NominalDiffTime
      -- ^ Optional connect timeout; 'createConnection' multiplies it by
      --   1000000 and rounds before passing it to @PP.connect@.
    , connectTLSParams      :: Maybe ClientParams
      -- ^ When 'Just', the fresh connection is upgraded with @PP.enableTLS@.
    } deriving Show
-- | Raised by 'createConnection' when the initial handshake with the server
-- is rejected. Each constructor carries the server's error 'Reply'.
data ConnectError
    = ConnectAuthError Reply
      -- ^ The @AUTH@ command returned a failure reply.
    | ConnectSelectError Reply
      -- ^ The @SELECT@ command returned a failure reply.
    deriving (Eq, Show, Typeable)

instance Exception ConnectError
-- | Baseline connection settings: @localhost@ on port 6379, no
-- authentication, database 0, at most 50 pooled connections, a maximum idle
-- time of 30, no connect timeout and no TLS.
defaultConnectInfo :: ConnectInfo
defaultConnectInfo = ConnInfo
    { connectHost           = "localhost"
    , connectPort           = CC.PortNumber 6379
    , connectAuth           = Nothing
    , connectDatabase       = 0
    , connectMaxConnections = 50
    , connectMaxIdleTime    = 30
    , connectTimeout        = Nothing
    , connectTLSParams      = Nothing
    }
-- | Open a single pipelined connection and perform the initial handshake.
--
-- Steps, in order:
--
-- 1. Connect to 'connectHost' / 'connectPort', passing 'connectTimeout'
--    scaled by 1000000 and rounded.
-- 2. Upgrade to TLS when 'connectTLSParams' is set.
-- 3. Start receiving replies, then send @AUTH@ (if 'connectAuth' is set)
--    and @SELECT@ (if 'connectDatabase' is non-zero).
--
-- Throws 'ConnectAuthError' or 'ConnectSelectError' when the server answers
-- the respective command with a failure reply. If any step after the socket
-- has been opened throws, the connection is disconnected before the
-- exception propagates, so a failed handshake does not leak the socket.
createConnection :: ConnectInfo -> IO PP.Connection
createConnection ConnInfo{..} = do
    let timeoutOptUs = round . (1000000 *) <$> connectTimeout
    conn <- PP.connect connectHost connectPort timeoutOptUs
    conn' <- case connectTLSParams of
        Nothing        -> return conn
        Just tlsParams ->
            PP.enableTLS tlsParams conn `onException` PP.disconnect conn
    handshake conn' `onException` PP.disconnect conn'
    return conn'
  where
    -- Start the reply reader, then authenticate and select the database.
    handshake c = do
        PP.beginReceiving c
        runRedisInternal c $ do
            -- AUTH
            case connectAuth of
                Nothing   -> return ()
                Just pass -> do
                    resp <- auth pass
                    case resp of
                        Left r -> liftIO $ throwIO $ ConnectAuthError r
                        _      -> return ()
            -- SELECT (database 0 is skipped, no command is sent for it)
            when (connectDatabase /= 0) $ do
                resp <- select connectDatabase
                case resp of
                    Left r -> liftIO $ throwIO $ ConnectSelectError r
                    _      -> return ()
-- | Build a non-clustered 'Connection': a pool whose resources are created
-- with 'createConnection' and destroyed with @PP.disconnect@.
--
-- The pool is configured with 1 as the stripe-count argument of
-- 'createPool', 'connectMaxIdleTime' as the idle time and
-- 'connectMaxConnections' as the resource limit. This function itself sends
-- no Redis command; use 'checkedConnect' to verify the server is reachable.
connect :: ConnectInfo -> IO Connection
connect cInfo@ConnInfo{..} =
    NonClusteredConnection <$>
        createPool
            (createConnection cInfo)
            PP.disconnect
            1
            connectMaxIdleTime
            connectMaxConnections
-- | Like 'connect', but additionally runs a @PING@ through the new
-- 'Connection' before returning it, so connection or handshake problems
-- surface here as exceptions rather than on first use.
--
-- If the ping throws, the freshly created pool is torn down with
-- 'disconnect' before the exception propagates: the caller never receives
-- the handle and therefore could not release it.
checkedConnect :: ConnectInfo -> IO Connection
checkedConnect connInfo = do
    conn <- connect connInfo
    runRedis conn (void ping) `onException` disconnect conn
    return conn
-- | Destroy every resource currently held in the connection's pool, for
-- both the non-clustered and the clustered variant.
disconnect :: Connection -> IO ()
disconnect (NonClusteredConnection pool) = destroyAllResources pool
disconnect (ClusteredConnection _ pool)  = destroyAllResources pool
-- | Run an action with a 'Connection' obtained from 'connect', calling
-- 'disconnect' on it afterwards. Acquisition and release are paired with
-- 'Catch.bracket', so the release also runs when the action throws.
withConnect :: (Catch.MonadMask m, MonadIO m) => ConnectInfo -> (Connection -> m c) -> m c
withConnect connInfo =
    Catch.bracket (liftIO $ connect connInfo) (liftIO . disconnect)
-- | Run an 'IO' action with a 'Connection' obtained from 'checkedConnect',
-- calling 'disconnect' on it afterwards via 'bracket'.
withCheckedConnect :: ConnectInfo -> (Connection -> IO c) -> IO c
withCheckedConnect connInfo =
    bracket (checkedConnect connInfo) disconnect
-- | Execute a 'Redis' action against the given 'Connection'.
--
-- A connection is borrowed from the pool with 'withResource' for the
-- duration of the action. For a clustered connection the action is run with
-- 'runRedisClusteredInternal', which is also given an 'IO' action
-- ('refreshShardMap' on the borrowed connection) for fetching a fresh
-- 'ShardMap'.
runRedis :: Connection -> Redis a -> IO a
runRedis (NonClusteredConnection pool) redis =
    withResource pool $ \conn ->
        runRedisInternal conn redis
runRedis (ClusteredConnection _ pool) redis =
    withResource pool $ \conn ->
        runRedisClusteredInternal conn (refreshShardMap conn) redis
-- | Raised by 'connectCluster' and 'refreshShardMap' when a cluster
-- bootstrap command is answered with a failure reply; wraps that 'Reply'.
newtype ClusterConnectError = ClusterConnectError Reply
    deriving (Eq, Show, Typeable)

instance Exception ClusterConnectError
-- | Build a clustered 'Connection' using the given node as bootstrap.
--
-- A temporary connection to the bootstrap node is used to issue
-- @CLUSTER SLOTS@ (to build the initial 'ShardMap') and @COMMAND@ (to obtain
-- the command metadata handed to @Cluster.connect@). The pool is then
-- created with @Cluster.connect@ / @Cluster.disconnect@, sized from
-- 'connectMaxIdleTime' and 'connectMaxConnections' of the bootstrap info.
--
-- Throws 'ClusterConnectError' if either bootstrap command returns a failure
-- reply. The temporary bootstrap connection is always disconnected before
-- this function returns or throws; the pool opens its own connections.
connectCluster :: ConnectInfo -> IO Connection
connectCluster bootstrapConnInfo =
    bracket (createConnection bootstrapConnInfo) PP.disconnect $ \conn -> do
        slotsResponse <- runRedisInternal conn clusterSlots
        shardMapVar <- case slotsResponse of
            Left e      -> throwIO $ ClusterConnectError e
            Right slots -> shardMapFromClusterSlotsResponse slots >>= newMVar
        commandInfos <- runRedisInternal conn command
        case commandInfos of
            Left e      -> throwIO $ ClusterConnectError e
            Right infos -> do
                pool <- createPool
                    (Cluster.connect infos shardMapVar Nothing)
                    Cluster.disconnect
                    1
                    (connectMaxIdleTime bootstrapConnInfo)
                    (connectMaxConnections bootstrapConnInfo)
                return $ ClusteredConnection shardMapVar pool
-- | Convert a @CLUSTER SLOTS@ response into a 'ShardMap'.
--
-- Every slot in an entry's inclusive @[start .. end]@ range is mapped to a
-- 'Shard' built from that entry's master and replicas. Entries are folded
-- from the right with a left-biased 'IntMap.union', so when two entries
-- cover the same slot the one appearing earlier in the response wins.
--
-- The result is returned in 'IO' to keep the existing signature; the
-- computation itself performs no effects.
shardMapFromClusterSlotsResponse :: ClusterSlotsResponse -> IO ShardMap
shardMapFromClusterSlotsResponse ClusterSlotsResponse{..} =
    pure . ShardMap $ foldr addEntry IntMap.empty clusterSlotsResponseEntries
  where
    -- Merge one response entry's slots into the accumulated slot map.
    addEntry :: ClusterSlotsResponseEntry -> IntMap.IntMap Shard -> IntMap.IntMap Shard
    addEntry ClusterSlotsResponseEntry{..} accumulated =
        let master   = nodeFromClusterSlotNode True clusterSlotsResponseEntryMaster
            replicas = map (nodeFromClusterSlotNode False) clusterSlotsResponseEntryReplicas
            shard    = Shard master replicas
            slotMap  = IntMap.fromList $
                map (, shard) [clusterSlotsResponseEntryStartSlot .. clusterSlotsResponseEntryEndSlot]
        in IntMap.union slotMap accumulated

    -- Build a cluster 'Node' from a response node; the flag selects the role.
    nodeFromClusterSlotNode :: Bool -> ClusterSlotsNode -> Node
    nodeFromClusterSlotNode isMaster ClusterSlotsNode{..} =
        let hostname = Char8.unpack clusterSlotsNodeIP
            role     = if isMaster then Cluster.Master else Cluster.Slave
        in Cluster.Node clusterSlotsNodeID role hostname (toEnum clusterSlotsNodePort)
-- | Fetch a fresh 'ShardMap' by issuing @CLUSTER SLOTS@ over one of the
-- node connections held by the given cluster connection.
--
-- The first node connection returned by 'HM.elems' is used: its context is
-- wrapped with @PP.fromCtx@, receiving is started, and the command is run.
--
-- Throws 'ClusterConnectError' when the server answers with a failure
-- reply, and an 'IOError' (via 'userError') when the cluster connection
-- holds no node connections at all (previously this case crashed inside
-- 'head').
refreshShardMap :: Cluster.Connection -> IO ShardMap
refreshShardMap (Cluster.Connection nodeConns _ _ _) =
    case HM.elems nodeConns of
        [] ->
            throwIO $ userError
                "refreshShardMap: cluster connection has no node connections"
        (Cluster.NodeConnection ctx _ _ : _) -> do
            pipelineConn <- PP.fromCtx ctx
            PP.beginReceiving pipelineConn
            slotsResponse <- runRedisInternal pipelineConn clusterSlots
            either
                (throwIO . ClusterConnectError)
                shardMapFromClusterSlotsResponse
                slotsResponse