{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
module Database.Redis.ProtocolPipelining (
Connection,
connect, connectWithHooks, beginReceiving, disconnect, request, send, recv, flush, fromCtx, fromCtxWithHooks, hooks
) where
import Prelude
import Control.Monad
import qualified Scanner
import qualified Data.ByteString as S
import Data.IORef
import qualified Network.TLS as TLS
import System.IO.Unsafe
import Database.Redis.Protocol
import qualified Database.Redis.ConnectionContext as CC
import Database.Redis.Hooks
-- | State of one pipelined connection.
data Connection = Conn
  { connCtx        :: CC.ConnectionContext
    -- ^ Underlying connection context; every send, receive, flush and the
    --   final disconnect in this module goes through it.
  , connReplies    :: IORef [Reply]
    -- ^ Replies that have not been handed out yet; 'recv' takes the head.
  , connPending    :: IORef [Reply]
    -- ^ Replies that have not been parsed yet. 'beginReceiving' initialises
    --   it to the same list as 'connReplies'; its head is dropped each time
    --   a reply is parsed off the connection.
  , connPendingCnt :: IORef Int
    -- ^ Incremented by 'send' and decremented (never below zero) each time a
    --   reply is parsed.
  , hooks          :: Hooks
    -- ^ Hooks wrapped around the send and receive actions.
  }
-- | Wrap an already-established connection context in a 'Connection' that
-- uses 'defaultHooks'.
--
-- The reply and pending lists start out empty and the pending counter at
-- zero, exactly as in 'fromCtxWithHooks', to which this delegates.
fromCtx :: CC.ConnectionContext -> IO Connection
fromCtx ctx = fromCtxWithHooks ctx defaultHooks
-- | Wrap an already-established connection context in a 'Connection' that
-- uses the given hooks.
--
-- Allocates fresh references for the reply list (empty), the pending list
-- (empty) and the pending counter (zero).
fromCtxWithHooks :: CC.ConnectionContext -> Hooks -> IO Connection
fromCtxWithHooks ctx hooks =
  Conn ctx
    <$> newIORef []   -- connReplies
    <*> newIORef []   -- connPending
    <*> newIORef 0    -- connPendingCnt
    <*> pure hooks
-- | Open a new connection with 'defaultHooks'.
--
-- All three arguments are forwarded unchanged to 'connectWithHooks'.
connect :: CC.ConnectAddr -> Maybe Int -> Maybe TLS.ClientParams -> IO Connection
connect connectAddr timeoutOpt mTlsParams =
  connectWithHooks connectAddr timeoutOpt mTlsParams defaultHooks
-- | Open a new connection that uses the given hooks.
--
-- The address, optional timeout and optional TLS parameters are passed
-- straight to 'CC.connect'; the resulting context is then wrapped with
-- 'fromCtxWithHooks', which sets up the empty reply/pending lists and the
-- zeroed pending counter.
connectWithHooks
  :: CC.ConnectAddr
  -> Maybe Int
  -> Maybe TLS.ClientParams
  -> Hooks
  -> IO Connection
connectWithHooks connectAddr timeoutOpt mTlsParams hooks = do
  ctx <- CC.connect connectAddr timeoutOpt mTlsParams
  fromCtxWithHooks ctx hooks
-- | Install the lazy reply stream on the connection.
--
-- The list produced by 'connGetReplies' is stored in both 'connReplies' and
-- 'connPending', so the two references initially point at the same list.
beginReceiving :: Connection -> IO ()
beginReceiving conn = do
  rs <- connGetReplies conn
  writeIORef (connReplies conn) rs
  writeIORef (connPending conn) rs
-- | Close the connection by disconnecting its underlying context.
disconnect :: Connection -> IO ()
disconnect Conn{connCtx = ctx} = CC.disconnect ctx
-- | Send a request through the send hook and record that one more reply is
-- expected.
--
-- Once the number of pending replies reaches 'maxPending', the oldest
-- pending reply (the head of 'connPending') is forced to weak head normal
-- form, which bounds how many unevaluated replies can accumulate.
--
-- Throws an 'IOError' if the limit is reached while 'connPending' is empty,
-- which can only be the case when 'beginReceiving' has not installed the
-- reply stream.
send :: Connection -> S.ByteString -> IO ()
send Conn{..} s = do
  sendHook hooks (CC.send connCtx) s
  -- Bump the counter and get the new value back in one atomic step.
  pending <- atomicModifyIORef' connPendingCnt $ \n ->
    let n' = n + 1 in (n', n')
  when (pending >= maxPending) $
    readIORef connPending >>= \case
      r : _ -> r `seq` return ()
      []    -> ioError $ userError
        "Hedis: send: no pending replies; was beginReceiving called?"
  where
    -- Maximum number of outstanding replies before the oldest one is forced.
    maxPending :: Int
    maxPending = 1000
-- | Take the next reply from the list of future replies, running the action
-- through the receive hook.
--
-- The head of 'connReplies' is returned and the reference is advanced to the
-- tail. The read and the write are two separate steps.
--
-- Throws an 'IOError' when 'connReplies' is empty, which can only be the
-- case when 'beginReceiving' has not installed the reply stream.
recv :: Connection -> IO Reply
recv Conn{..} =
  receiveHook hooks $
    readIORef connReplies >>= \case
      r : rs -> do
        writeIORef connReplies rs
        return r
      [] -> ioError $ userError
        "Hedis: recv: reply list is empty; was beginReceiving called?"
-- | Flush the underlying connection context.
flush :: Connection -> IO ()
flush Conn{connCtx = ctx} = CC.flush ctx
-- | Send a request and then take the corresponding reply: 'send' followed by
-- 'recv' on the same connection.
request :: Connection -> S.ByteString -> IO Reply
request conn req = send conn req >> recv conn
-- | Build the lazy list of all replies that will arrive on the connection.
--
-- Nothing is read when this function returns: each element, and each further
-- cell of the list, is produced through 'unsafeInterleaveIO' and only does
-- I/O when it is demanded. Demanding a reply first forces the reply before
-- it, so replies are always parsed in order.
connGetReplies :: Connection -> IO [Reply]
connGetReplies conn@Conn{..} = go S.empty (SingleLine "previous of first")
  where
    -- 'rest' is the unconsumed input left over from the previous parse;
    -- 'previous' is the reply that precedes the one produced here (a dummy
    -- value for the very first reply).
    go :: S.ByteString -> Reply -> IO [Reply]
    go rest previous = do
      -- The lazy pattern is essential: a strict match would force the
      -- interleaved action immediately and defeat the delayed receive.
      ~(r, rest') <- unsafeInterleaveIO $ do
        -- Force the previous reply first so parsing happens in order.
        previous `seq` return ()
        scanResult <- Scanner.scanWith readMore reply rest
        case scanResult of
          -- NOTE(review): presumably raises a "connection closed" error;
          -- confirm against Database.Redis.ConnectionContext.
          Scanner.Fail{} -> CC.errConnClosed
          Scanner.More{} -> error "Hedis: parseWith returned Partial"
          Scanner.Done rest' r -> do
            -- One reply has been parsed: drop the head of the pending list ...
            atomicModifyIORef' connPending $ \case
              (_:rs) -> (rs, ())
              []     -> error "Hedis: impossible happened parseWith missing value that it just received"
            -- ... and decrement the pending counter, clamped at zero.
            atomicModifyIORef' connPendingCnt $ \n -> (max 0 (n - 1), ())
            return (r, rest')
      -- The tail is deferred as well, otherwise this recursion would not
      -- terminate.
      rs <- unsafeInterleaveIO (go rest' r)
      return (r:rs)

    -- Supplies more input to the scanner: flush what has been written so
    -- far, then receive from the connection context, with the whole action
    -- wrapped in 'CC.ioErrorToConnLost'.
    readMore :: IO S.ByteString
    readMore = CC.ioErrorToConnLost $ do
      flush conn
      CC.recv connCtx