{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}

-- |A module for automatic, optimal protocol pipelining.
--
--  Protocol pipelining is a technique in which multiple requests are written
--  out to a single socket without waiting for the corresponding responses.
--  The pipelining of requests results in a dramatic improvement in protocol
--  performance.
--
--  [Optimal Pipelining] uses the least number of network packets possible
--
--  [Automatic Pipelining] means that requests are implicitly pipelined as much
--      as possible, i.e. as long as a request's response is not used before any
--      subsequent requests.
--
module Database.Redis.ProtocolPipelining (
  Connection,
  connect, connectWithHooks, beginReceiving, disconnect, request, send, recv, flush, fromCtx, fromCtxWithHooks, hooks
) where

import           Prelude
import           Control.Monad
import qualified Scanner
import qualified Data.ByteString as S
import           Data.IORef
import qualified Network.TLS as TLS
import           System.IO.Unsafe

import           Database.Redis.Protocol
import qualified Database.Redis.ConnectionContext as CC
import           Database.Redis.Hooks

-- |A pipelined connection.
--
--  All replies of a connection form one lazy list (see 'connGetReplies').
--  'beginReceiving' stores that list in both 'connReplies' and 'connPending';
--  afterwards the two references advance independently.
data Connection = Conn
  { connCtx        :: CC.ConnectionContext
    -- ^ Connection socket-handle.
  , connReplies    :: IORef [Reply]
    -- ^ Reply thunks for unsent requests. 'recv' takes the head of this list.
  , connPending    :: IORef [Reply]
    -- ^ Reply thunks for requests "in the pipeline". Refers to the same list as
    --   'connReplies', but can have an offset. Its head is dropped each time a
    --   reply has been parsed in 'connGetReplies'.
  , connPendingCnt :: IORef Int
    -- ^ Number of pending replies and thus the length difference between
    --   'connReplies' and 'connPending':
    --
    --   @length connPending - connPendingCnt = length connReplies@
    --
    --   Incremented by 'send', decremented (never below zero) whenever a reply
    --   has been parsed.
  , hooks          :: Hooks
    -- ^ User-supplied hooks: 'sendHook' wraps the low-level send in 'send',
    --   'receiveHook' wraps taking a reply in 'recv'.
  }


-- |Wrap an existing 'CC.ConnectionContext' in a 'Connection' that uses
--  'defaultHooks'.
--
--  Both reply lists start out empty and the pending counter at zero, so
--  'beginReceiving' has to be called before replies can be taken with 'recv'.
fromCtx :: CC.ConnectionContext -> IO Connection
fromCtx ctx =
  Conn ctx
    <$> newIORef []          -- connReplies
    <*> newIORef []          -- connPending
    <*> newIORef 0           -- connPendingCnt
    <*> pure defaultHooks

-- |Like 'fromCtx', but installs the given 'Hooks' instead of 'defaultHooks'.
--
--  Both reply lists start out empty and the pending counter at zero, so
--  'beginReceiving' has to be called before replies can be taken with 'recv'.
fromCtxWithHooks :: CC.ConnectionContext -> Hooks -> IO Connection
fromCtxWithHooks ctx connHooks =
  -- The parameter is deliberately not called 'hooks', to avoid shadowing the
  -- record selector of the same name.
  Conn ctx
    <$> newIORef []          -- connReplies
    <*> newIORef []          -- connPending
    <*> newIORef 0           -- connPendingCnt
    <*> pure connHooks

-- |Open a new connection that uses 'defaultHooks'.
--
--  All arguments are passed on unchanged to 'connectWithHooks'.
connect :: CC.ConnectAddr -> Maybe Int -> Maybe TLS.ClientParams -> IO Connection
connect connectAddr timeoutOpt mTlsParams =
  connectWithHooks connectAddr timeoutOpt mTlsParams defaultHooks

-- |Open a new connection with the given 'Hooks'.
--
--  The address, optional timeout and optional TLS parameters are handed to
--  'CC.connect' as they are. The resulting 'Connection' starts with empty
--  reply lists and a pending counter of zero; call 'beginReceiving' before
--  taking replies with 'recv'.
connectWithHooks :: CC.ConnectAddr -> Maybe Int -> Maybe TLS.ClientParams -> Hooks -> IO Connection
connectWithHooks connectAddr timeoutOpt mTlsParams connHooks = do
  ctx        <- CC.connect connectAddr timeoutOpt mTlsParams
  replies    <- newIORef []
  pending    <- newIORef []
  pendingCnt <- newIORef 0
  return Conn
    { connCtx        = ctx
    , connReplies    = replies
    , connPending    = pending
    , connPendingCnt = pendingCnt
    , hooks          = connHooks
    }

-- |Install the lazy list of future replies on the connection.
--
--  The list produced by 'connGetReplies' is stored in both 'connReplies' and
--  'connPending', so that both references initially point at the same
--  position of the same list.
beginReceiving :: Connection -> IO ()
beginReceiving conn = do
  rs <- connGetReplies conn
  writeIORef (connReplies conn) rs
  writeIORef (connPending conn) rs

-- |Close the connection by handing its 'connCtx' to 'CC.disconnect'.
disconnect :: Connection -> IO ()
disconnect conn = CC.disconnect (connCtx conn)

-- |Write the request to the socket output buffer, without actually sending.
--  The 'Handle' is 'hFlush'ed when reading replies from the 'connCtx'.
--
--  The write itself goes through the connection's 'sendHook'. Afterwards the
--  pending-reply counter is incremented; once it reaches 1000 the oldest
--  pending reply is forced to bound the pipeline length.
send :: Connection -> S.ByteString -> IO ()
send Conn{..} s = do
  sendHook hooks (CC.send connCtx) s

  -- Signal that we expect one more reply from Redis.
  n <- atomicModifyIORef' connPendingCnt $ \cnt ->
    let cnt' = cnt + 1 in (cnt', cnt')
  -- Limit the "pipeline length". This is necessary in long pipelines, to avoid
  -- thunk build-up, and thus space-leaks.
  -- TODO find smallest max pending with good-enough performance.
  when (n >= 1000) $
    readIORef connPending >>= \case
      -- Force oldest pending reply.
      r:_ -> r `seq` return ()
      -- No reply list installed (the list is only empty before
      -- 'beginReceiving' has run): there is nothing to force, and the request
      -- has already been written, so do not fail here.
      []  -> return ()

-- |Take a reply-thunk from the list of future replies.
--
--  The head of 'connReplies' is removed and returned; the whole action runs
--  inside the connection's 'receiveHook'. The reply is not forced here.
--
--  Fails with an 'IOError' (via 'fail') if the reply list is empty, which is
--  only the case when 'beginReceiving' has not been called on the connection.
recv :: Connection -> IO Reply
recv Conn{..} =
  receiveHook hooks $
    readIORef connReplies >>= \case
      r:rs -> do
        writeIORef connReplies rs
        return r
      [] ->
        fail "Hedis: recv on a connection without a reply list (beginReceiving not called?)"

-- | Flush the socket.  Normally, the socket is flushed in 'recv' (actually 'connGetReplies'), but
-- for the multithreaded pub/sub code, the sending thread needs to explicitly flush the subscription
-- change requests.
flush :: Connection -> IO ()
flush conn = CC.flush (connCtx conn)

-- |Send a request and receive the corresponding reply.
--
--  This is 'send' followed by 'recv'; the returned 'Reply' is the thunk taken
--  by 'recv' and is not forced here.
request :: Connection -> S.ByteString -> IO Reply
request conn req = do
  send conn req
  recv conn

-- |A list of all future 'Reply's of the 'Connection'.
--
--  The spine of the list can be evaluated without forcing the replies.
--
--  Evaluating/forcing a 'Reply' from the list will 'unsafeInterleaveIO' the
--  reading and parsing from the 'connCtx'. To ensure correct ordering, each
--  Reply first evaluates (and thus reads from the network) the previous one.
--
--  'unsafeInterleaveIO' only evaluates its result once, making this function
--  thread-safe. 'Handle' as implemented by GHC is also threadsafe, it is safe
--  to call 'hFlush' here. The list constructor '(:)' must be called from
--  /within/ unsafeInterleaveIO, to keep the replies in correct order.
connGetReplies :: Connection -> IO [Reply]
connGetReplies conn@Conn{..} = go S.empty (SingleLine "previous of first")
  where
    -- Produce the replies that follow @previous@. @rest@ is the input that was
    -- left over after parsing @previous@. The first call gets a dummy
    -- @previous@ that is already in evaluated form.
    go :: S.ByteString -> Reply -> IO [Reply]
    go rest previous = do
      -- lazy pattern match to actually delay the receiving
      ~(r, rest') <- unsafeInterleaveIO $ do
        -- Force previous reply for correct order.
        previous `seq` return ()
        scanResult <- Scanner.scanWith readMore reply rest
        case scanResult of
          Scanner.Fail{} -> CC.errConnClosed
          Scanner.More{} -> error "Hedis: parseWith returned Partial"
          Scanner.Done leftover parsed -> do
            -- parsed is the same as 'head' of 'connPending'. Since we just
            -- received parsed, we remove it from the pending list.
            atomicModifyIORef' connPending $ \case
              (_:rs) -> (rs, ())
              []     -> error "Hedis: impossible happened parseWith missing value that it just received"
            -- We now expect one less reply from Redis. We don't count to
            -- negative, which would otherwise occur during pubsub.
            atomicModifyIORef' connPendingCnt $ \n -> (max 0 (n - 1), ())
            return (parsed, leftover)
      -- The tail is deferred as well, so building the spine performs no I/O.
      rs <- unsafeInterleaveIO (go rest' r)
      return (r:rs)

    -- Fetch more input for the scanner: flush our own outstanding requests
    -- first, then read from the connection. Runs inside
    -- 'CC.ioErrorToConnLost'.
    readMore :: IO S.ByteString
    readMore = CC.ioErrorToConnLost $ do
      flush conn
      CC.recv connCtx