-- | Channel
--
-- You should not have to import this module directly; instead import
-- "Network.GRPC.Util.Session".
module Network.GRPC.Util.Session.Channel (
    -- * Main definition
    Channel(..)
  , initChannel
    -- ** Flow state
  , FlowState(..)
  , RegularFlowState(..)
  , initFlowStateRegular
    -- * Working with an open channel
  , getInboundHeaders
  , send
  , recvBoth
  , recvEither
  , RecvFinal(..)
  , RecvAfterFinal(..)
  , SendAfterFinal(..)
    -- * Closing
  , waitForOutbound
  , close
  , ChannelDiscarded(..)
  , ChannelAborted(..)
    -- * Support for half-closing
  , InboundResult
  , AllowHalfClosed(..)
  , linkOutboundToInbound
    -- * Constructing channels
  , sendMessageLoop
  , recvMessageLoop
  , outboundTrailersMaker
  ) where

import Control.Concurrent.STM
import Control.DeepSeq (NFData, force)
import Control.Exception
import Control.Monad
import Control.Monad.Catch (ExitCase(..))
import Data.Bifunctor
import Data.ByteString.Builder (Builder)
import Data.ByteString.Lazy qualified as BS.Lazy
import GHC.Stack

-- Doesn't really matter if we import from .Client or .Server
import Network.HTTP2.Client qualified as HTTP2 (
    TrailersMaker
  , NextTrailersMaker(..)
  )

import Network.GRPC.Common.StreamElem (StreamElem(..))
import Network.GRPC.Common.StreamElem qualified as StreamElem
import Network.GRPC.Spec.Util.Parser (Parser)
import Network.GRPC.Spec.Util.Parser qualified as Parser
import Network.GRPC.Util.HTTP2.Stream
import Network.GRPC.Util.RedundantConstraint
import Network.GRPC.Util.Session.API
import Network.GRPC.Util.Thread

{-------------------------------------------------------------------------------
  Definitions

  The fields of 'Channel' are its /implementation/, not its interface. It is
  kept opaque in the top-level @.Peer@ module.

  Implementation note: it is tempting to try and define 'Channel' purely in
  terms of bytestrings, and deal with serialization and deserialization to and
  from messages in a higher layer. However, this does not work:

  - For deserialization, if we make chunks of messages available in the 'TMVar',
    then if multiple threads are reading from that one 'TMVar', one thread might
    get the first chunk of a message and another thread the second.
  - Similarly, for serialization, if multiple threads are trying to write to the
    'TMVar', we might get interleaving of fragments of messages.

  Thus, to ensure thread safety, we work at the level of messages, not bytes.
-------------------------------------------------------------------------------}

-- | Bidirectional open channel on a node to a peer node
--
-- The node might be a client (and its peer a server), or the node might be
-- a server (and its peer a client); the main purpose of this abstraction
-- is precisely to abstract over that difference.
--
-- Each channel is constructed for a /single/ session (request/response).
data Channel sess = Channel {
      -- | Thread state of the thread receiving messages from the peer
      forall sess.
Channel sess -> TVar (ThreadState (FlowState (Inbound sess)))
channelInbound :: TVar (ThreadState (FlowState (Inbound sess)))

      -- | Thread state of the thread sending messages to the peer
    , forall sess.
Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: TVar (ThreadState (FlowState (Outbound sess)))

      -- | Have we sent the final message?
      --
      -- The sole purpose of this 'TVar' is catching user mistakes: if there is
      -- another 'send' after the final message, we can throw an exception,
      -- rather than the message simply being lost or blockng indefinitely.
    , forall sess. Channel sess -> TVar (Maybe CallStack)
channelSentFinal :: TVar (Maybe CallStack)

      -- | Have we received the final message?
      --
      -- This is used to improve the user experience; see 'channelSentFinal'.
      -- It is also used when checking if a call should be considered
      -- \"cancelled\"; see 'withRPC'.
    , forall sess. Channel sess -> TVar (RecvFinal (Inbound sess))
channelRecvFinal :: TVar (RecvFinal (Inbound sess))
    }

data RecvFinal flow =
    -- | We have not yet delivered the final message to the client
    RecvNotFinal

    -- | We delivered the final message, but not yet the trailers
  | RecvWithoutTrailers (Trailers flow)

    -- | We delivered the final message and the trailers
  | RecvFinal CallStack

-- | Data flow state
data FlowState flow =
    FlowStateRegular (RegularFlowState flow)
  | FlowStateNoMessages (NoMessages flow)

-- | Regular (streaming) flow state
data RegularFlowState flow = RegularFlowState {
      -- | Headers
      --
      -- On the client side, the outbound headers are specified when the request
      -- is made ('callRequestMetadata'), and the inbound headers are recorded
      -- once the responds starts to come in; clients can block-and-wait for
      -- these headers ('getInboundHeaders').
      --
      -- On the server side, the inbound headers are recorded when the request
      -- comes in, and the outbound headers are specified
      -- ('setResponseMetadata') before the response is initiated
      -- ('initiateResponse'/'sendTrailersOnly').
      forall {k} (flow :: k). RegularFlowState flow -> Headers flow
flowHeaders :: Headers flow

      -- | Messages
      --
      -- This TMVar is written to for incoming messages ('recvMessageLoop') and
      -- read from for outgoing messages ('sendMessageLoop'). It acts as a
      -- one-place buffer, providing backpressure in both directions.
    , forall {k} (flow :: k).
RegularFlowState flow
-> TMVar (StreamElem (Trailers flow) (Message flow))
flowMsg :: TMVar (StreamElem (Trailers flow) (Message flow))

      -- | Trailers
      --
      -- Unlike 'flowMsg', which is /written/ to in 'recvMessageLoop' and /read/
      -- from in 'sendMessageLoop', both loops /set/ 'flowTerminated', once,
      -- just before they terminate.
      --
      -- * For 'sendMessageLoop', this means that the last message has been
      --   written (that is, the last call to 'writeChunk' has happened).
      --   This has two consequences:
      --
      --   1. @http2@ can now construct the trailers ('outboundTrailersMaker')
      --   2. Higher layers can wait on 'flowTerminated' to be /sure/ that the
      --      last message has been written.
      --
      -- * For 'recvMessageLoop', this means that the trailers have been
      --   received from the peer. Higher layers can use this to check for, or
      --   block-and-wait, to receive those trailers.
      --
      -- == Relation to 'channelSentFinal'/'channelRecvFinal'
      --
      -- 'flowTerminated' is set at different times than 'channelSentFinal' and
      -- 'channelRecvFinal' are:
      --
      -- * 'channelSentFinal' is set on the last call to 'send', but /before/
      --   the message is processed by 'sendMessageLoop'.
      -- * 'channelRecvFinal', dually, is set on the last call to 'recv, which
      --   must (necessarily) happen /before/ that message is actually made
      --   available by 'recvMessageLoop'.
      --
      -- /Their/ sole purpose is to catch user errors, not capture data flow.
    , forall {k} (flow :: k).
RegularFlowState flow -> TMVar (Trailers flow)
flowTerminated :: TMVar (Trailers flow)
    }

-- | 'Show' instance is useful in combination with @stm-debug@ only
deriving instance (
    Show (Headers flow)
  , Show (TMVar (StreamElem (Trailers flow) (Message flow)))
  , Show (TMVar (Trailers flow))
  ) => Show (RegularFlowState flow)

{-------------------------------------------------------------------------------
  Initialization
-------------------------------------------------------------------------------}

initChannel :: HasCallStack => IO (Channel sess)
initChannel :: forall sess. HasCallStack => IO (Channel sess)
initChannel = do
    channelInbound   <- IO (TVar (ThreadState (FlowState (Inbound sess))))
forall a. HasCallStack => IO (TVar (ThreadState a))
newThreadState
    channelOutbound  <- newThreadState
    channelSentFinal <- newTVarIO Nothing
    channelRecvFinal <- newTVarIO RecvNotFinal
    return Channel{
        channelInbound
      , channelOutbound
      , channelSentFinal
      , channelRecvFinal
      }

initFlowStateRegular :: Headers flow -> IO (RegularFlowState flow)
initFlowStateRegular :: forall {k} (flow :: k). Headers flow -> IO (RegularFlowState flow)
initFlowStateRegular Headers flow
flowHeaders = do
   flowMsg        <- IO (TMVar (StreamElem (Trailers flow) (Message flow)))
forall a. IO (TMVar a)
newEmptyTMVarIO
   flowTerminated <- newEmptyTMVarIO
   return RegularFlowState {
       flowHeaders
     , flowMsg
     , flowTerminated
     }

{-------------------------------------------------------------------------------
  Working with an open channel
-------------------------------------------------------------------------------}

-- | The inbound headers
--
-- Will block if the inbound headers have not yet been received.
getInboundHeaders ::
     Channel sess
  -> IO (Either (NoMessages (Inbound sess)) (Headers (Inbound sess)))
getInboundHeaders :: forall sess.
Channel sess
-> IO (Either (NoMessages (Inbound sess)) (Headers (Inbound sess)))
getInboundHeaders Channel{TVar (ThreadState (FlowState (Inbound sess)))
channelInbound :: forall sess.
Channel sess -> TVar (ThreadState (FlowState (Inbound sess)))
channelInbound :: TVar (ThreadState (FlowState (Inbound sess)))
channelInbound} =
    TVar (ThreadState (FlowState (Inbound sess)))
-> (FlowState (Inbound sess)
    -> STM
         (Either (NoMessages (Inbound sess)) (Headers (Inbound sess))))
-> IO (Either (NoMessages (Inbound sess)) (Headers (Inbound sess)))
forall a b. TVar (ThreadState a) -> (a -> STM b) -> IO b
withThreadInterface TVar (ThreadState (FlowState (Inbound sess)))
channelInbound (Either (NoMessages (Inbound sess)) (Headers (Inbound sess))
-> STM
     (Either (NoMessages (Inbound sess)) (Headers (Inbound sess)))
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either (NoMessages (Inbound sess)) (Headers (Inbound sess))
 -> STM
      (Either (NoMessages (Inbound sess)) (Headers (Inbound sess))))
-> (FlowState (Inbound sess)
    -> Either (NoMessages (Inbound sess)) (Headers (Inbound sess)))
-> FlowState (Inbound sess)
-> STM
     (Either (NoMessages (Inbound sess)) (Headers (Inbound sess)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FlowState (Inbound sess)
-> Either (NoMessages (Inbound sess)) (Headers (Inbound sess))
forall {k} (flow :: k).
FlowState flow -> Either (NoMessages flow) (Headers flow)
aux)
  where
    aux :: forall flow.
         FlowState flow
      -> Either (NoMessages flow) (Headers flow)
    aux :: forall {k} (flow :: k).
FlowState flow -> Either (NoMessages flow) (Headers flow)
aux (FlowStateRegular    RegularFlowState flow
regular)  = Headers flow -> Either (NoMessages flow) (Headers flow)
forall a b. b -> Either a b
Right (Headers flow -> Either (NoMessages flow) (Headers flow))
-> Headers flow -> Either (NoMessages flow) (Headers flow)
forall a b. (a -> b) -> a -> b
$ RegularFlowState flow -> Headers flow
forall {k} (flow :: k). RegularFlowState flow -> Headers flow
flowHeaders RegularFlowState flow
regular
    aux (FlowStateNoMessages NoMessages flow
trailers) = NoMessages flow -> Either (NoMessages flow) (Headers flow)
forall a b. a -> Either a b
Left NoMessages flow
trailers

-- | Send a message to the node's peer
--
-- It is a bug to call 'send' again after the final message (that is, a message
-- which 'StreamElem.whenDefinitelyFinal' considers to be final). Doing so will
-- result in a 'SendAfterFinal' exception.
send :: forall sess.
     (HasCallStack, NFData (Message (Outbound sess)))
  => Channel sess
  -> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
  -> IO ()
send :: forall sess.
(HasCallStack, NFData (Message (Outbound sess))) =>
Channel sess
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> IO ()
send Channel{TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: forall sess.
Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound, TVar (Maybe CallStack)
channelSentFinal :: forall sess. Channel sess -> TVar (Maybe CallStack)
channelSentFinal :: TVar (Maybe CallStack)
channelSentFinal} = \StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
msg -> do
    msg' <- StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> IO
     (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
forall a. a -> IO a
evaluate (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
 -> IO
      (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))))
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> IO
     (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
forall a b. (a -> b) -> a -> b
$ Message (Outbound sess) -> Message (Outbound sess)
forall a. NFData a => a -> a
force (Message (Outbound sess) -> Message (Outbound sess))
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
msg
    withThreadInterface channelOutbound $ aux msg'
  where
    aux ::
         StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
      -> FlowState (Outbound sess)
      -> STM ()
    aux :: StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> FlowState (Outbound sess) -> STM ()
aux StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
msg FlowState (Outbound sess)
st = do
        -- By checking that we haven't sent the final message yet, we know that
        -- this call to 'putMVar' will not block indefinitely: the thread that
        -- sends messages to the peer will get to it eventually (unless it dies,
        -- in which case the thread status will change and the call to
        -- 'getThreadInterface' will be retried).
        sentFinal <- TVar (Maybe CallStack) -> STM (Maybe CallStack)
forall a. TVar a -> STM a
readTVar TVar (Maybe CallStack)
channelSentFinal
        case sentFinal of
          Just CallStack
cs -> SendAfterFinal -> STM ()
forall e a. Exception e => e -> STM a
throwSTM (SendAfterFinal -> STM ()) -> SendAfterFinal -> STM ()
forall a b. (a -> b) -> a -> b
$ CallStack -> SendAfterFinal
SendAfterFinal CallStack
cs
          Maybe CallStack
Nothing -> () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        case st of
          FlowStateRegular RegularFlowState (Outbound sess)
regular -> do
            StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> (Trailers (Outbound sess) -> STM ()) -> STM ()
forall (m :: * -> *) b a.
Applicative m =>
StreamElem b a -> (b -> m ()) -> m ()
StreamElem.whenDefinitelyFinal StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
msg ((Trailers (Outbound sess) -> STM ()) -> STM ())
-> (Trailers (Outbound sess) -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \Trailers (Outbound sess)
_trailers ->
              TVar (Maybe CallStack) -> Maybe CallStack -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe CallStack)
channelSentFinal (Maybe CallStack -> STM ()) -> Maybe CallStack -> STM ()
forall a b. (a -> b) -> a -> b
$ CallStack -> Maybe CallStack
forall a. a -> Maybe a
Just CallStack
HasCallStack => CallStack
callStack

            TMVar
  (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar (RegularFlowState (Outbound sess)
-> TMVar
     (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
forall {k} (flow :: k).
RegularFlowState flow
-> TMVar (StreamElem (Trailers flow) (Message flow))
flowMsg RegularFlowState (Outbound sess)
regular) StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
msg
          FlowStateNoMessages NoMessages (Outbound sess)
_ ->
            -- For outgoing messages, the caller decides to use Trailers-Only,
            -- so if they then subsequently call 'send', we throw an exception.
            -- This is different for /inbound/ messages; see 'recv', below.
            SendAfterFinal -> STM ()
forall e a. Exception e => e -> STM a
throwSTM (SendAfterFinal -> STM ()) -> SendAfterFinal -> STM ()
forall a b. (a -> b) -> a -> b
$ SendAfterFinal
SendButTrailersOnly

-- | Receive a message from the node's peer
--
-- If the sender indicates that the message is final /when/ they send it, by
-- sending the HTTP trailers in the same frame, then we will return the message
-- and the trailers together. It is a bug to call 'recvBoth' again after this;
-- doing so will result in a 'RecvAfterFinal' exception.
recvBoth :: forall sess.
     HasCallStack
  => Channel sess
  -> IO ( Either
            (NoMessages (Inbound sess))
            (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
        )
recvBoth :: forall sess.
HasCallStack =>
Channel sess
-> IO
     (Either
        (NoMessages (Inbound sess))
        (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))))
recvBoth =
    (Message (Inbound sess)
 -> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
-> (Trailers (Inbound sess)
    -> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
-> ((Message (Inbound sess), Trailers (Inbound sess))
    -> (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)),
        Maybe (Trailers (Inbound sess))))
-> Channel sess
-> IO
     (Either
        (NoMessages (Inbound sess))
        (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))))
forall sess b.
HasCallStack =>
(Message (Inbound sess) -> b)
-> (Trailers (Inbound sess) -> b)
-> ((Message (Inbound sess), Trailers (Inbound sess))
    -> (b, Maybe (Trailers (Inbound sess))))
-> Channel sess
-> IO (Either (NoMessages (Inbound sess)) b)
recv'
      Message (Inbound sess)
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
forall b a. a -> StreamElem b a
StreamElem
      Trailers (Inbound sess)
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
forall b a. b -> StreamElem b a
NoMoreElems
      ((,Maybe (Trailers (Inbound sess))
forall a. Maybe a
Nothing) (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
 -> (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)),
     Maybe (Trailers (Inbound sess))))
-> ((Message (Inbound sess), Trailers (Inbound sess))
    -> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
-> (Message (Inbound sess), Trailers (Inbound sess))
-> (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)),
    Maybe (Trailers (Inbound sess)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Message (Inbound sess)
 -> Trailers (Inbound sess)
 -> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
-> (Message (Inbound sess), Trailers (Inbound sess))
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry Message (Inbound sess)
-> Trailers (Inbound sess)
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
forall b a. a -> b -> StreamElem b a
FinalElem)

-- | Variant on 'recvBoth' where trailers are always returned separately
--
-- Unlike in 'recvBoth', even if the sender indicates that the final message is
-- final when they send it, we will store these trailers internally and return
-- only that final message. The trailers are then returned on the /next/ call to
-- 'recvEither'. Call 'recvEither' again /after/ receiving the trailers is a
-- bug; doing so will result in a 'RecvAfterFinal' exception.
recvEither ::
     HasCallStack
  => Channel sess
  -> IO ( Either
            (NoMessages (Inbound sess))
            (Either (Trailers (Inbound sess)) (Message (Inbound sess)))
        )
recvEither :: forall sess.
HasCallStack =>
Channel sess
-> IO
     (Either
        (NoMessages (Inbound sess))
        (Either (Trailers (Inbound sess)) (Message (Inbound sess))))
recvEither =
    (Message (Inbound sess)
 -> Either (Trailers (Inbound sess)) (Message (Inbound sess)))
-> (Trailers (Inbound sess)
    -> Either (Trailers (Inbound sess)) (Message (Inbound sess)))
-> ((Message (Inbound sess), Trailers (Inbound sess))
    -> (Either (Trailers (Inbound sess)) (Message (Inbound sess)),
        Maybe (Trailers (Inbound sess))))
-> Channel sess
-> IO
     (Either
        (NoMessages (Inbound sess))
        (Either (Trailers (Inbound sess)) (Message (Inbound sess))))
forall sess b.
HasCallStack =>
(Message (Inbound sess) -> b)
-> (Trailers (Inbound sess) -> b)
-> ((Message (Inbound sess), Trailers (Inbound sess))
    -> (b, Maybe (Trailers (Inbound sess))))
-> Channel sess
-> IO (Either (NoMessages (Inbound sess)) b)
recv'
      Message (Inbound sess)
-> Either (Trailers (Inbound sess)) (Message (Inbound sess))
forall a b. b -> Either a b
Right
      Trailers (Inbound sess)
-> Either (Trailers (Inbound sess)) (Message (Inbound sess))
forall a b. a -> Either a b
Left
      ((Message (Inbound sess)
 -> Either (Trailers (Inbound sess)) (Message (Inbound sess)))
-> (Trailers (Inbound sess) -> Maybe (Trailers (Inbound sess)))
-> (Message (Inbound sess), Trailers (Inbound sess))
-> (Either (Trailers (Inbound sess)) (Message (Inbound sess)),
    Maybe (Trailers (Inbound sess)))
forall a b c d. (a -> b) -> (c -> d) -> (a, c) -> (b, d)
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap Message (Inbound sess)
-> Either (Trailers (Inbound sess)) (Message (Inbound sess))
forall a b. b -> Either a b
Right Trailers (Inbound sess) -> Maybe (Trailers (Inbound sess))
forall a. a -> Maybe a
Just)

-- | Internal generalization of 'recvBoth' and 'recvEither'
recv' :: forall sess b.
     HasCallStack
  => (Message    (Inbound sess) -> b)  -- ^ Message without trailers
  -> (Trailers   (Inbound sess) -> b)  -- ^ Trailers without (final) message
  -> (    (Message (Inbound sess), Trailers (Inbound sess))
       -> (b, Maybe (Trailers (Inbound sess)))
     )
     -- ^ Message with trailers
     --
     -- In addition to the result, should also return the trailers to keep for
     -- the next call to 'recv'' (if any).
  -> Channel sess
  -> IO (Either (NoMessages (Inbound sess)) b)
recv' :: forall sess b.
HasCallStack =>
(Message (Inbound sess) -> b)
-> (Trailers (Inbound sess) -> b)
-> ((Message (Inbound sess), Trailers (Inbound sess))
    -> (b, Maybe (Trailers (Inbound sess))))
-> Channel sess
-> IO (Either (NoMessages (Inbound sess)) b)
recv' Message (Inbound sess) -> b
messageWithoutTrailers
      Trailers (Inbound sess) -> b
trailersWithoutMessage
      (Message (Inbound sess), Trailers (Inbound sess))
-> (b, Maybe (Trailers (Inbound sess)))
messageWithTrailers
      Channel{TVar (ThreadState (FlowState (Inbound sess)))
channelInbound :: forall sess.
Channel sess -> TVar (ThreadState (FlowState (Inbound sess)))
channelInbound :: TVar (ThreadState (FlowState (Inbound sess)))
channelInbound, TVar (RecvFinal (Inbound sess))
channelRecvFinal :: forall sess. Channel sess -> TVar (RecvFinal (Inbound sess))
channelRecvFinal :: TVar (RecvFinal (Inbound sess))
channelRecvFinal} =
    TVar (ThreadState (FlowState (Inbound sess)))
-> (FlowState (Inbound sess)
    -> STM (Either (NoMessages (Inbound sess)) b))
-> IO (Either (NoMessages (Inbound sess)) b)
forall a b. TVar (ThreadState a) -> (a -> STM b) -> IO b
withThreadInterface TVar (ThreadState (FlowState (Inbound sess)))
channelInbound FlowState (Inbound sess)
-> STM (Either (NoMessages (Inbound sess)) b)
aux
  where
    aux ::
         FlowState (Inbound sess)
      -> STM (Either (NoMessages (Inbound sess)) b)
    aux :: FlowState (Inbound sess)
-> STM (Either (NoMessages (Inbound sess)) b)
aux FlowState (Inbound sess)
st = do
        -- By checking that we haven't received the final message yet, we know
        -- that this call to 'takeTMVar' will not block indefinitely: the thread
        -- that receives messages from the peer will get to it eventually
        -- (unless it dies, in which case the thread status will change and the
        -- call to 'getThreadInterface' will be retried).
        readFinal <- TVar (RecvFinal (Inbound sess)) -> STM (RecvFinal (Inbound sess))
forall a. TVar a -> STM a
readTVar TVar (RecvFinal (Inbound sess))
channelRecvFinal
        case readFinal of
          RecvFinal (Inbound sess)
RecvNotFinal ->
            case FlowState (Inbound sess)
st of
              FlowStateRegular RegularFlowState (Inbound sess)
regular -> b -> Either (NoMessages (Inbound sess)) b
forall a b. b -> Either a b
Right (b -> Either (NoMessages (Inbound sess)) b)
-> STM b -> STM (Either (NoMessages (Inbound sess)) b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> do
                streamElem <- TMVar
  (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
-> STM
     (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
forall a. TMVar a -> STM a
takeTMVar (RegularFlowState (Inbound sess)
-> TMVar
     (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
forall {k} (flow :: k).
RegularFlowState flow
-> TMVar (StreamElem (Trailers flow) (Message flow))
flowMsg RegularFlowState (Inbound sess)
regular)
                -- We update 'channelRecvFinal' in the same tx as the read, to
                -- atomically change "there is a value" to "all values read".
                case streamElem of
                  StreamElem Message (Inbound sess)
msg ->
                    b -> STM b
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> STM b) -> b -> STM b
forall a b. (a -> b) -> a -> b
$ Message (Inbound sess) -> b
messageWithoutTrailers Message (Inbound sess)
msg
                  FinalElem Message (Inbound sess)
msg Trailers (Inbound sess)
trailers -> do
                    let (b
b, Maybe (Trailers (Inbound sess))
mTrailers) = (Message (Inbound sess), Trailers (Inbound sess))
-> (b, Maybe (Trailers (Inbound sess)))
messageWithTrailers (Message (Inbound sess)
msg, Trailers (Inbound sess)
trailers)
                    TVar (RecvFinal (Inbound sess))
-> RecvFinal (Inbound sess) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (RecvFinal (Inbound sess))
channelRecvFinal (RecvFinal (Inbound sess) -> STM ())
-> RecvFinal (Inbound sess) -> STM ()
forall a b. (a -> b) -> a -> b
$
                      RecvFinal (Inbound sess)
-> (Trailers (Inbound sess) -> RecvFinal (Inbound sess))
-> Maybe (Trailers (Inbound sess))
-> RecvFinal (Inbound sess)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (CallStack -> RecvFinal (Inbound sess)
forall {k} (flow :: k). CallStack -> RecvFinal flow
RecvFinal CallStack
HasCallStack => CallStack
callStack) Trailers (Inbound sess) -> RecvFinal (Inbound sess)
forall {k} (flow :: k). Trailers flow -> RecvFinal flow
RecvWithoutTrailers Maybe (Trailers (Inbound sess))
mTrailers
                    b -> STM b
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> STM b) -> b -> STM b
forall a b. (a -> b) -> a -> b
$ b
b
                  NoMoreElems Trailers (Inbound sess)
trailers -> do
                    TVar (RecvFinal (Inbound sess))
-> RecvFinal (Inbound sess) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (RecvFinal (Inbound sess))
channelRecvFinal (RecvFinal (Inbound sess) -> STM ())
-> RecvFinal (Inbound sess) -> STM ()
forall a b. (a -> b) -> a -> b
$ CallStack -> RecvFinal (Inbound sess)
forall {k} (flow :: k). CallStack -> RecvFinal flow
RecvFinal CallStack
HasCallStack => CallStack
callStack
                    b -> STM b
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> STM b) -> b -> STM b
forall a b. (a -> b) -> a -> b
$ Trailers (Inbound sess) -> b
trailersWithoutMessage Trailers (Inbound sess)
trailers
              FlowStateNoMessages NoMessages (Inbound sess)
trailers -> do
                TVar (RecvFinal (Inbound sess))
-> RecvFinal (Inbound sess) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (RecvFinal (Inbound sess))
channelRecvFinal (RecvFinal (Inbound sess) -> STM ())
-> RecvFinal (Inbound sess) -> STM ()
forall a b. (a -> b) -> a -> b
$ CallStack -> RecvFinal (Inbound sess)
forall {k} (flow :: k). CallStack -> RecvFinal flow
RecvFinal CallStack
HasCallStack => CallStack
callStack
                Either (NoMessages (Inbound sess)) b
-> STM (Either (NoMessages (Inbound sess)) b)
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either (NoMessages (Inbound sess)) b
 -> STM (Either (NoMessages (Inbound sess)) b))
-> Either (NoMessages (Inbound sess)) b
-> STM (Either (NoMessages (Inbound sess)) b)
forall a b. (a -> b) -> a -> b
$ NoMessages (Inbound sess) -> Either (NoMessages (Inbound sess)) b
forall a b. a -> Either a b
Left NoMessages (Inbound sess)
trailers
          RecvWithoutTrailers Trailers (Inbound sess)
trailers -> do
            TVar (RecvFinal (Inbound sess))
-> RecvFinal (Inbound sess) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (RecvFinal (Inbound sess))
channelRecvFinal (RecvFinal (Inbound sess) -> STM ())
-> RecvFinal (Inbound sess) -> STM ()
forall a b. (a -> b) -> a -> b
$ CallStack -> RecvFinal (Inbound sess)
forall {k} (flow :: k). CallStack -> RecvFinal flow
RecvFinal CallStack
HasCallStack => CallStack
callStack
            Either (NoMessages (Inbound sess)) b
-> STM (Either (NoMessages (Inbound sess)) b)
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either (NoMessages (Inbound sess)) b
 -> STM (Either (NoMessages (Inbound sess)) b))
-> Either (NoMessages (Inbound sess)) b
-> STM (Either (NoMessages (Inbound sess)) b)
forall a b. (a -> b) -> a -> b
$ b -> Either (NoMessages (Inbound sess)) b
forall a b. b -> Either a b
Right (b -> Either (NoMessages (Inbound sess)) b)
-> b -> Either (NoMessages (Inbound sess)) b
forall a b. (a -> b) -> a -> b
$ Trailers (Inbound sess) -> b
trailersWithoutMessage Trailers (Inbound sess)
trailers
          RecvFinal CallStack
cs ->
            RecvAfterFinal -> STM (Either (NoMessages (Inbound sess)) b)
forall e a. Exception e => e -> STM a
throwSTM (RecvAfterFinal -> STM (Either (NoMessages (Inbound sess)) b))
-> RecvAfterFinal -> STM (Either (NoMessages (Inbound sess)) b)
forall a b. (a -> b) -> a -> b
$ CallStack -> RecvAfterFinal
RecvAfterFinal CallStack
cs

-- | Thrown by 'send'
--
-- The 'CallStack' is the callstack of the final call to 'send'.
--
-- See 'send' for additional discussion.
data SendAfterFinal =
    -- | Call to 'send' after the final message was sent
    SendAfterFinal CallStack

    -- | Call to 'send', but we are in the Trailers-Only case
  | SendButTrailersOnly
  deriving stock (Int -> SendAfterFinal -> ShowS
[SendAfterFinal] -> ShowS
SendAfterFinal -> String
(Int -> SendAfterFinal -> ShowS)
-> (SendAfterFinal -> String)
-> ([SendAfterFinal] -> ShowS)
-> Show SendAfterFinal
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SendAfterFinal -> ShowS
showsPrec :: Int -> SendAfterFinal -> ShowS
$cshow :: SendAfterFinal -> String
show :: SendAfterFinal -> String
$cshowList :: [SendAfterFinal] -> ShowS
showList :: [SendAfterFinal] -> ShowS
Show)
  deriving anyclass (Show SendAfterFinal
Typeable SendAfterFinal
(Typeable SendAfterFinal, Show SendAfterFinal) =>
(SendAfterFinal -> SomeException)
-> (SomeException -> Maybe SendAfterFinal)
-> (SendAfterFinal -> String)
-> (SendAfterFinal -> Bool)
-> Exception SendAfterFinal
SomeException -> Maybe SendAfterFinal
SendAfterFinal -> Bool
SendAfterFinal -> String
SendAfterFinal -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> (e -> Bool)
-> Exception e
$ctoException :: SendAfterFinal -> SomeException
toException :: SendAfterFinal -> SomeException
$cfromException :: SomeException -> Maybe SendAfterFinal
fromException :: SomeException -> Maybe SendAfterFinal
$cdisplayException :: SendAfterFinal -> String
displayException :: SendAfterFinal -> String
$cbacktraceDesired :: SendAfterFinal -> Bool
backtraceDesired :: SendAfterFinal -> Bool
Exception)

-- | Thrown by 'recv'
--
-- The 'CallStack' is the callstack of the final call to 'recv'.
--
-- See 'recv' for additional discussion.
data RecvAfterFinal =
     -- | Call to 'recv' after the final message was already received
     RecvAfterFinal CallStack
  deriving stock (Int -> RecvAfterFinal -> ShowS
[RecvAfterFinal] -> ShowS
RecvAfterFinal -> String
(Int -> RecvAfterFinal -> ShowS)
-> (RecvAfterFinal -> String)
-> ([RecvAfterFinal] -> ShowS)
-> Show RecvAfterFinal
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> RecvAfterFinal -> ShowS
showsPrec :: Int -> RecvAfterFinal -> ShowS
$cshow :: RecvAfterFinal -> String
show :: RecvAfterFinal -> String
$cshowList :: [RecvAfterFinal] -> ShowS
showList :: [RecvAfterFinal] -> ShowS
Show)
  deriving anyclass (Show RecvAfterFinal
Typeable RecvAfterFinal
(Typeable RecvAfterFinal, Show RecvAfterFinal) =>
(RecvAfterFinal -> SomeException)
-> (SomeException -> Maybe RecvAfterFinal)
-> (RecvAfterFinal -> String)
-> (RecvAfterFinal -> Bool)
-> Exception RecvAfterFinal
SomeException -> Maybe RecvAfterFinal
RecvAfterFinal -> Bool
RecvAfterFinal -> String
RecvAfterFinal -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> (e -> Bool)
-> Exception e
$ctoException :: RecvAfterFinal -> SomeException
toException :: RecvAfterFinal -> SomeException
$cfromException :: SomeException -> Maybe RecvAfterFinal
fromException :: SomeException -> Maybe RecvAfterFinal
$cdisplayException :: RecvAfterFinal -> String
displayException :: RecvAfterFinal -> String
$cbacktraceDesired :: RecvAfterFinal -> Bool
backtraceDesired :: RecvAfterFinal -> Bool
Exception)

{-------------------------------------------------------------------------------
  Closing
-------------------------------------------------------------------------------}

-- | Wait for the outbound thread to terminate
--
-- See 'close' for discussion.
waitForOutbound :: Channel sess -> IO ()
waitForOutbound :: forall sess. Channel sess -> IO ()
waitForOutbound Channel{TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: forall sess.
Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound} = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
    TVar (ThreadState (FlowState (Outbound sess))) -> STM ()
forall a. TVar (ThreadState a) -> STM ()
waitForNormalThreadTermination TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound

-- | Close the channel
--
-- Before a channel can be closed, you should 'send' the final outbound message
-- and then 'waitForOutbound' until all outbound messages have been processed.
-- Not doing so is considered a bug (it is not possible to do this implicitly,
-- because the final call to 'send' involves a choice of trailers, and calling
-- 'waitForOutbound' /without/ a final close to 'send' will result in deadlock).
-- Typically code will also process all /incoming/ messages, but doing so is of
-- course not mandatory.
--
-- Calling 'close' will kill the outbound thread ('sendMessageLoop'), /if/ it is
-- still running. If the thread was terminated with an exception, this could
-- mean one of two things:
--
-- 1. The connection to the peer was lost
-- 2. Proper procedure for outbound messages was not followed (see above)
--
-- In the case of (2) this is bug in the caller, and so 'close' will return an
-- exception. In the case of (1), however, very likely an exception will
-- /already/ have been thrown when a communication attempt was made, and 'close'
-- will return 'Nothing'. This matches the design philosophy in @grapesy@ that
-- exceptions are thrown \"lazily\" rather than \"strictly\".
close ::
     HasCallStack
  => Channel sess
  -> ExitCase a    -- ^ The reason why the channel is being closed
  -> IO (Maybe SomeException)
close :: forall sess a.
HasCallStack =>
Channel sess -> ExitCase a -> IO (Maybe SomeException)
close Channel{TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: forall sess.
Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound} ExitCase a
reason = do
    -- We leave the inbound thread running. Although the channel is closed,
    -- there might still be unprocessed messages in the queue. The inbound
    -- thread will terminate once it reaches the end of the queue.
    outbound <- TVar (ThreadState (FlowState (Outbound sess)))
-> SomeException -> IO (CancelResult (FlowState (Outbound sess)))
forall a.
TVar (ThreadState a) -> SomeException -> IO (CancelResult a)
cancelThread TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound SomeException
channelClosed
    case outbound of
      AlreadyTerminated FlowState (Outbound sess)
_ ->
        Maybe SomeException -> IO (Maybe SomeException)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe SomeException -> IO (Maybe SomeException))
-> Maybe SomeException -> IO (Maybe SomeException)
forall a b. (a -> b) -> a -> b
$ Maybe SomeException
forall a. Maybe a
Nothing
      AlreadyAborted SomeException
_err ->
        -- Connection to the peer was lost prior to closing
        Maybe SomeException -> IO (Maybe SomeException)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe SomeException -> IO (Maybe SomeException))
-> Maybe SomeException -> IO (Maybe SomeException)
forall a b. (a -> b) -> a -> b
$ Maybe SomeException
forall a. Maybe a
Nothing
      CancelResult (FlowState (Outbound sess))
Cancelled ->
        -- Proper procedure for outbound messages was not followed
        Maybe SomeException -> IO (Maybe SomeException)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe SomeException -> IO (Maybe SomeException))
-> Maybe SomeException -> IO (Maybe SomeException)
forall a b. (a -> b) -> a -> b
$ SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
channelClosed
  where
    channelClosed :: SomeException
    channelClosed :: SomeException
channelClosed =
        case ExitCase a
reason of
          ExitCaseSuccess a
_   -> ChannelDiscarded -> SomeException
forall e. Exception e => e -> SomeException
toException (ChannelDiscarded -> SomeException)
-> ChannelDiscarded -> SomeException
forall a b. (a -> b) -> a -> b
$ CallStack -> ChannelDiscarded
ChannelDiscarded CallStack
HasCallStack => CallStack
callStack
          ExitCase a
ExitCaseAbort       -> ChannelAborted -> SomeException
forall e. Exception e => e -> SomeException
toException (ChannelAborted -> SomeException)
-> ChannelAborted -> SomeException
forall a b. (a -> b) -> a -> b
$ CallStack -> ChannelAborted
ChannelAborted   CallStack
HasCallStack => CallStack
callStack
          ExitCaseException SomeException
e -> SomeException
e

-- | Channel was closed because it was discarded
--
-- This typically corresponds to leaving the scope of 'runHandler' or
-- 'withRPC' (without throwing an exception).
data ChannelDiscarded = ChannelDiscarded CallStack
  deriving stock (Int -> ChannelDiscarded -> ShowS
[ChannelDiscarded] -> ShowS
ChannelDiscarded -> String
(Int -> ChannelDiscarded -> ShowS)
-> (ChannelDiscarded -> String)
-> ([ChannelDiscarded] -> ShowS)
-> Show ChannelDiscarded
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ChannelDiscarded -> ShowS
showsPrec :: Int -> ChannelDiscarded -> ShowS
$cshow :: ChannelDiscarded -> String
show :: ChannelDiscarded -> String
$cshowList :: [ChannelDiscarded] -> ShowS
showList :: [ChannelDiscarded] -> ShowS
Show)
  deriving anyclass (Show ChannelDiscarded
Typeable ChannelDiscarded
(Typeable ChannelDiscarded, Show ChannelDiscarded) =>
(ChannelDiscarded -> SomeException)
-> (SomeException -> Maybe ChannelDiscarded)
-> (ChannelDiscarded -> String)
-> (ChannelDiscarded -> Bool)
-> Exception ChannelDiscarded
SomeException -> Maybe ChannelDiscarded
ChannelDiscarded -> Bool
ChannelDiscarded -> String
ChannelDiscarded -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> (e -> Bool)
-> Exception e
$ctoException :: ChannelDiscarded -> SomeException
toException :: ChannelDiscarded -> SomeException
$cfromException :: SomeException -> Maybe ChannelDiscarded
fromException :: SomeException -> Maybe ChannelDiscarded
$cdisplayException :: ChannelDiscarded -> String
displayException :: ChannelDiscarded -> String
$cbacktraceDesired :: ChannelDiscarded -> Bool
backtraceDesired :: ChannelDiscarded -> Bool
Exception)

-- | Channel was closed for an unknown reason
--
-- This will only be used in monad stacks that have error mechanisms other
-- than exceptions.
data ChannelAborted = ChannelAborted CallStack
  deriving stock (Int -> ChannelAborted -> ShowS
[ChannelAborted] -> ShowS
ChannelAborted -> String
(Int -> ChannelAborted -> ShowS)
-> (ChannelAborted -> String)
-> ([ChannelAborted] -> ShowS)
-> Show ChannelAborted
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ChannelAborted -> ShowS
showsPrec :: Int -> ChannelAborted -> ShowS
$cshow :: ChannelAborted -> String
show :: ChannelAborted -> String
$cshowList :: [ChannelAborted] -> ShowS
showList :: [ChannelAborted] -> ShowS
Show)
  deriving anyclass (Show ChannelAborted
Typeable ChannelAborted
(Typeable ChannelAborted, Show ChannelAborted) =>
(ChannelAborted -> SomeException)
-> (SomeException -> Maybe ChannelAborted)
-> (ChannelAborted -> String)
-> (ChannelAborted -> Bool)
-> Exception ChannelAborted
SomeException -> Maybe ChannelAborted
ChannelAborted -> Bool
ChannelAborted -> String
ChannelAborted -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> (e -> Bool)
-> Exception e
$ctoException :: ChannelAborted -> SomeException
toException :: ChannelAborted -> SomeException
$cfromException :: SomeException -> Maybe ChannelAborted
fromException :: SomeException -> Maybe ChannelAborted
$cdisplayException :: ChannelAborted -> String
displayException :: ChannelAborted -> String
$cbacktraceDesired :: ChannelAborted -> Bool
backtraceDesired :: ChannelAborted -> Bool
Exception)

{-------------------------------------------------------------------------------
  Support for half-closing
-------------------------------------------------------------------------------}

type InboundResult sess =
       Either (NoMessages (Inbound sess))
              (Trailers   (Inbound sess))

-- | Should we allow for a half-clsoed connection state?
--
-- In HTTP2, streams are bidirectional and can be half-closed in either
-- direction. This is however not true for all applications /of/ HTTP2. For
-- example, in gRPC the stream can be half-closed from the client to the server
-- (indicating that the client will not send any more messages), but not from
-- the server to the client: when the server half-closes their connection, it
-- sends the gRPC trailers and this terminates the call.
data AllowHalfClosed sess =
    ContinueWhenInboundClosed
  | TerminateWhenInboundClosed (InboundResult sess -> SomeException)

-- | Link outbound thread to the inbound thread
--
-- This should be wrapped around the body of the inbound thread. It ensures that
-- when the inbound thread throws an exception, the outbound thread dies also.
-- This improves predictability of exceptions: the inbound thread spends most of
-- its time blocked on messages from the peer, and will therefore notice when
-- the connection is lost. This is not true for the outbound thread, which
-- spends most of its time blocked waiting for messages to send to the peer.
linkOutboundToInbound :: forall sess.
     IsSession sess
  => AllowHalfClosed sess
  -> Channel sess
  -> IO (InboundResult sess)
  -> IO ()
linkOutboundToInbound :: forall sess.
IsSession sess =>
AllowHalfClosed sess
-> Channel sess -> IO (InboundResult sess) -> IO ()
linkOutboundToInbound AllowHalfClosed sess
allowHalfClosed Channel sess
channel IO (InboundResult sess)
inbound = do
    mResult <- IO (InboundResult sess)
-> IO (Either SomeException (InboundResult sess))
forall e a. Exception e => IO a -> IO (Either e a)
try IO (InboundResult sess)
inbound

    -- Implementation note: After cancelThread returns, 'channelOutbound' has
    -- been updated, and considered dead, even if perhaps the thread is still
    -- cleaning up.

    case (mResult, allowHalfClosed) of
      (Right InboundResult sess
_result, AllowHalfClosed sess
ContinueWhenInboundClosed) ->
        () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
      (Right InboundResult sess
result, TerminateWhenInboundClosed InboundResult sess -> SomeException
f) ->
        IO (CancelResult (FlowState (Outbound sess))) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (CancelResult (FlowState (Outbound sess))) -> IO ())
-> IO (CancelResult (FlowState (Outbound sess))) -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (ThreadState (FlowState (Outbound sess)))
-> SomeException -> IO (CancelResult (FlowState (Outbound sess)))
forall a.
TVar (ThreadState a) -> SomeException -> IO (CancelResult a)
cancelThread (Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
forall sess.
Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound Channel sess
channel) (InboundResult sess -> SomeException
f InboundResult sess
result)
      (Left (SomeException
exception :: SomeException), AllowHalfClosed sess
_) -> do
        IO (CancelResult (FlowState (Outbound sess))) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (CancelResult (FlowState (Outbound sess))) -> IO ())
-> IO (CancelResult (FlowState (Outbound sess))) -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (ThreadState (FlowState (Outbound sess)))
-> SomeException -> IO (CancelResult (FlowState (Outbound sess)))
forall a.
TVar (ThreadState a) -> SomeException -> IO (CancelResult a)
cancelThread (Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
forall sess.
Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound Channel sess
channel) SomeException
exception
        SomeException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO SomeException
exception
  where
    ()
_ = forall (c :: Constraint). c => ()
addConstraint @(IsSession sess)

{-------------------------------------------------------------------------------
  Constructing channels

  Both 'sendMessageLoop' and 'recvMessageLoop' will be run in newly forked
  threads, using the 'Thread' API from "Network.GRPC.Util.Thread". We are
  therefore not particularly worried about these loops being interrupted by
  asynchronous exceptions: this only happens if the threads are explicitly
  terminated (when the corresponding channels are closed), in which case any
  attempt to interact with them after they have been killed will be handled by
  'getThreadInterface' throwing 'ThreadInterfaceUnavailable'.
-------------------------------------------------------------------------------}

-- | Send all messages to the node's peer
sendMessageLoop :: forall sess.
     IsSession sess
  => sess
  -> RegularFlowState (Outbound sess)
  -> OutputStream
  -> IO ()
sendMessageLoop :: forall sess.
IsSession sess =>
sess -> RegularFlowState (Outbound sess) -> OutputStream -> IO ()
sendMessageLoop sess
sess RegularFlowState (Outbound sess)
st OutputStream
stream = do
    trailers <- IO (Trailers (Outbound sess))
loop
    atomically $ putTMVar (flowTerminated st) trailers
  where
    build :: (Message (Outbound sess) -> Builder)
    build :: Message (Outbound sess) -> Builder
build = sess
-> Headers (Outbound sess) -> Message (Outbound sess) -> Builder
forall sess.
IsSession sess =>
sess
-> Headers (Outbound sess) -> Message (Outbound sess) -> Builder
buildMsg sess
sess (RegularFlowState (Outbound sess) -> Headers (Outbound sess)
forall {k} (flow :: k). RegularFlowState flow -> Headers flow
flowHeaders RegularFlowState (Outbound sess)
st)

    loop :: IO (Trailers (Outbound sess))
    loop :: IO (Trailers (Outbound sess))
loop = do
        msg <- STM
  (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
-> IO
     (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
forall a. STM a -> IO a
atomically (STM
   (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
 -> IO
      (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))))
-> STM
     (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
-> IO
     (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
forall a b. (a -> b) -> a -> b
$ TMVar
  (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
-> STM
     (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
forall a. TMVar a -> STM a
takeTMVar (RegularFlowState (Outbound sess)
-> TMVar
     (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
forall {k} (flow :: k).
RegularFlowState flow
-> TMVar (StreamElem (Trailers flow) (Message flow))
flowMsg RegularFlowState (Outbound sess)
st)
        case msg of
          StreamElem Message (Outbound sess)
x -> do
            HasCallStack => OutputStream -> Builder -> IO ()
OutputStream -> Builder -> IO ()
writeChunk OutputStream
stream (Builder -> IO ()) -> Builder -> IO ()
forall a b. (a -> b) -> a -> b
$ Message (Outbound sess) -> Builder
build Message (Outbound sess)
x
            HasCallStack => OutputStream -> IO ()
OutputStream -> IO ()
flush OutputStream
stream
            IO (Trailers (Outbound sess))
loop
          FinalElem Message (Outbound sess)
x Trailers (Outbound sess)
trailers -> do
            HasCallStack => OutputStream -> Builder -> IO ()
OutputStream -> Builder -> IO ()
writeChunkFinal OutputStream
stream (Builder -> IO ()) -> Builder -> IO ()
forall a b. (a -> b) -> a -> b
$ Message (Outbound sess) -> Builder
build Message (Outbound sess)
x
            Trailers (Outbound sess) -> IO (Trailers (Outbound sess))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Trailers (Outbound sess)
trailers
          NoMoreElems Trailers (Outbound sess)
trailers -> do
            -- It is crucial to still 'writeChunkFinal' here to guarantee that
            -- cancellation is a no-op. Without it, cancellation may result in a
            -- @RST_STREAM@ frame may being sent to the peer.
            --
            -- This does not necessarily write a DATA frame, since http2 avoids
            -- writing empty data frames unless they are marked @END_OF_STREAM@.
            HasCallStack => OutputStream -> Builder -> IO ()
OutputStream -> Builder -> IO ()
writeChunkFinal OutputStream
stream (Builder -> IO ()) -> Builder -> IO ()
forall a b. (a -> b) -> a -> b
$ Builder
forall a. Monoid a => a
mempty
            Trailers (Outbound sess) -> IO (Trailers (Outbound sess))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Trailers (Outbound sess)
trailers

-- | Receive all messages sent by the node's peer
recvMessageLoop :: forall sess.
     IsSession sess
  => sess
  -> RegularFlowState (Inbound sess)
  -> InputStream
  -> IO (Trailers (Inbound sess))
recvMessageLoop :: forall sess.
IsSession sess =>
sess
-> RegularFlowState (Inbound sess)
-> InputStream
-> IO (Trailers (Inbound sess))
recvMessageLoop sess
sess RegularFlowState (Inbound sess)
st InputStream
stream =
    Parser String (Message (Inbound sess))
-> IO (Trailers (Inbound sess))
go (Parser String (Message (Inbound sess))
 -> IO (Trailers (Inbound sess)))
-> Parser String (Message (Inbound sess))
-> IO (Trailers (Inbound sess))
forall a b. (a -> b) -> a -> b
$ sess
-> Headers (Inbound sess) -> Parser String (Message (Inbound sess))
forall sess.
IsSession sess =>
sess
-> Headers (Inbound sess) -> Parser String (Message (Inbound sess))
parseMsg sess
sess (RegularFlowState (Inbound sess) -> Headers (Inbound sess)
forall {k} (flow :: k). RegularFlowState flow -> Headers flow
flowHeaders RegularFlowState (Inbound sess)
st)
  where
    go :: Parser String (Message (Inbound sess)) -> IO (Trailers (Inbound sess))
    go :: Parser String (Message (Inbound sess))
-> IO (Trailers (Inbound sess))
go Parser String (Message (Inbound sess))
parser = do
        mProcessedFinal <- ProcessResult String (Trailers (Inbound sess))
-> IO (Maybe (Trailers (Inbound sess)))
forall b. ProcessResult String b -> IO (Maybe b)
throwParseErrors (ProcessResult String (Trailers (Inbound sess))
 -> IO (Maybe (Trailers (Inbound sess))))
-> IO (ProcessResult String (Trailers (Inbound sess)))
-> IO (Maybe (Trailers (Inbound sess)))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO (ByteString, Bool)
-> (Message (Inbound sess) -> IO ())
-> (Message (Inbound sess) -> IO (Trailers (Inbound sess)))
-> Parser String (Message (Inbound sess))
-> IO (ProcessResult String (Trailers (Inbound sess)))
forall (m :: * -> *) e a b.
Monad m =>
m (ByteString, Bool)
-> (a -> m ()) -> (a -> m b) -> Parser e a -> m (ProcessResult e b)
Parser.processAll
          (HasCallStack => InputStream -> IO (ByteString, Bool)
InputStream -> IO (ByteString, Bool)
getChunk InputStream
stream)
          Message (Inbound sess) -> IO ()
processOne
          Message (Inbound sess) -> IO (Trailers (Inbound sess))
processFinal
          Parser String (Message (Inbound sess))
parser
        case mProcessedFinal of
          Just Trailers (Inbound sess)
trailers ->
            Trailers (Inbound sess) -> IO (Trailers (Inbound sess))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Trailers (Inbound sess)
trailers
          Maybe (Trailers (Inbound sess))
Nothing -> do
            trailers <- IO (Trailers (Inbound sess))
processTrailers
            atomically $ putTMVar (flowMsg st) $ NoMoreElems trailers
            return trailers

    processOne :: Message (Inbound sess) -> IO ()
    processOne :: Message (Inbound sess) -> IO ()
processOne Message (Inbound sess)
msg = do
        STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar
  (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
-> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar (RegularFlowState (Inbound sess)
-> TMVar
     (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
forall {k} (flow :: k).
RegularFlowState flow
-> TMVar (StreamElem (Trailers flow) (Message flow))
flowMsg RegularFlowState (Inbound sess)
st) (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
 -> STM ())
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
-> STM ()
forall a b. (a -> b) -> a -> b
$ Message (Inbound sess)
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
forall b a. a -> StreamElem b a
StreamElem Message (Inbound sess)
msg

    processFinal :: Message (Inbound sess) -> IO (Trailers (Inbound sess))
    processFinal :: Message (Inbound sess) -> IO (Trailers (Inbound sess))
processFinal Message (Inbound sess)
msg = do
        trailers <- IO (Trailers (Inbound sess))
processTrailers
        atomically $ putTMVar (flowMsg st) $ FinalElem msg trailers
        return trailers

    processTrailers :: IO (Trailers (Inbound sess))
    processTrailers :: IO (Trailers (Inbound sess))
processTrailers = do
        trailers <- sess -> [Header] -> IO (Trailers (Inbound sess))
forall sess.
IsSession sess =>
sess -> [Header] -> IO (Trailers (Inbound sess))
parseInboundTrailers sess
sess ([Header] -> IO (Trailers (Inbound sess)))
-> IO [Header] -> IO (Trailers (Inbound sess))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< HasCallStack => InputStream -> IO [Header]
InputStream -> IO [Header]
getTrailers InputStream
stream
        atomically $ putTMVar (flowTerminated st) $ trailers
        return trailers

    throwParseErrors :: Parser.ProcessResult String b -> IO (Maybe b)
    throwParseErrors :: forall b. ProcessResult String b -> IO (Maybe b)
throwParseErrors (Parser.ProcessError String
err) =
        PeerException -> IO (Maybe b)
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (PeerException -> IO (Maybe b)) -> PeerException -> IO (Maybe b)
forall a b. (a -> b) -> a -> b
$ String -> PeerException
PeerSentMalformedMessage String
err
    throwParseErrors (Parser.ProcessedWithFinal b
b Leftover
leftover) = do
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Leftover -> Bool
BS.Lazy.null Leftover
leftover) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ PeerException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO PeerException
PeerSentIncompleteMessage
        Maybe b -> IO (Maybe b)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe b -> IO (Maybe b)) -> Maybe b -> IO (Maybe b)
forall a b. (a -> b) -> a -> b
$ b -> Maybe b
forall a. a -> Maybe a
Just b
b
    throwParseErrors (Parser.ProcessedWithoutFinal Leftover
leftover) = do
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Leftover -> Bool
BS.Lazy.null Leftover
leftover) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ PeerException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO PeerException
PeerSentIncompleteMessage
        Maybe b -> IO (Maybe b)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe b -> IO (Maybe b)) -> Maybe b -> IO (Maybe b)
forall a b. (a -> b) -> a -> b
$ Maybe b
forall a. Maybe a
Nothing

outboundTrailersMaker :: forall sess.
     IsSession sess
  => sess
  -> Channel sess
  -> RegularFlowState (Outbound sess)
  -> HTTP2.TrailersMaker
outboundTrailersMaker :: forall sess.
IsSession sess =>
sess
-> Channel sess
-> RegularFlowState (Outbound sess)
-> TrailersMaker
outboundTrailersMaker sess
sess Channel{TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: forall sess.
Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound} RegularFlowState (Outbound sess)
regular = TrailersMaker
go
  where
    go :: HTTP2.TrailersMaker
    go :: TrailersMaker
go (Just ByteString
_) = NextTrailersMaker -> IO NextTrailersMaker
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (NextTrailersMaker -> IO NextTrailersMaker)
-> NextTrailersMaker -> IO NextTrailersMaker
forall a b. (a -> b) -> a -> b
$ TrailersMaker -> NextTrailersMaker
HTTP2.NextTrailersMaker TrailersMaker
go
    go Maybe ByteString
Nothing  = do
        mFlowState <- STM (Either SomeException (Trailers (Outbound sess)))
-> IO (Either SomeException (Trailers (Outbound sess)))
forall a. STM a -> IO a
atomically (STM (Either SomeException (Trailers (Outbound sess)))
 -> IO (Either SomeException (Trailers (Outbound sess))))
-> STM (Either SomeException (Trailers (Outbound sess)))
-> IO (Either SomeException (Trailers (Outbound sess)))
forall a b. (a -> b) -> a -> b
$
          TVar (ThreadState (FlowState (Outbound sess)))
-> STM (Trailers (Outbound sess))
-> STM (Either SomeException (Trailers (Outbound sess)))
forall a b.
TVar (ThreadState a) -> STM b -> STM (Either SomeException b)
unlessAbnormallyTerminated TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound (STM (Trailers (Outbound sess))
 -> STM (Either SomeException (Trailers (Outbound sess))))
-> STM (Trailers (Outbound sess))
-> STM (Either SomeException (Trailers (Outbound sess)))
forall a b. (a -> b) -> a -> b
$
            TMVar (Trailers (Outbound sess)) -> STM (Trailers (Outbound sess))
forall a. TMVar a -> STM a
readTMVar (RegularFlowState (Outbound sess)
-> TMVar (Trailers (Outbound sess))
forall {k} (flow :: k).
RegularFlowState flow -> TMVar (Trailers flow)
flowTerminated RegularFlowState (Outbound sess)
regular)
        case mFlowState of
            Right Trailers (Outbound sess)
trailers ->
              NextTrailersMaker -> IO NextTrailersMaker
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (NextTrailersMaker -> IO NextTrailersMaker)
-> NextTrailersMaker -> IO NextTrailersMaker
forall a b. (a -> b) -> a -> b
$ [Header] -> NextTrailersMaker
HTTP2.Trailers ([Header] -> NextTrailersMaker) -> [Header] -> NextTrailersMaker
forall a b. (a -> b) -> a -> b
$ sess -> Trailers (Outbound sess) -> [Header]
forall sess.
IsSession sess =>
sess -> Trailers (Outbound sess) -> [Header]
buildOutboundTrailers sess
sess Trailers (Outbound sess)
trailers
            Left SomeException
_exception ->
              NextTrailersMaker -> IO NextTrailersMaker
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (NextTrailersMaker -> IO NextTrailersMaker)
-> NextTrailersMaker -> IO NextTrailersMaker
forall a b. (a -> b) -> a -> b
$ [Header] -> NextTrailersMaker
HTTP2.Trailers []