{-# LANGUAGE OverloadedStrings #-}

-- | Open (ongoing) RPC call
--
-- Intended for unqualified import.
module Network.GRPC.Client.Call (
    -- * Construction
    Call -- opaque
  , withRPC

    -- * Open (ongoing) call
  , sendInput
  , recvOutput
  , recvResponseMetadata

    -- ** Protocol specific wrappers
  , sendNextInput
  , sendFinalInput
  , sendEndOfInput
  , recvResponseInitialMetadata
  , recvNextOutput
  , recvFinalOutput
  , recvTrailers

    -- ** Low-level\/specialized API
  , sendInputWithMeta
  , recvNextOutputElem
  , recvOutputWithMeta
  , recvInitialResponse
  ) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.Thread.Delay qualified as UnboundedDelays
import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
import Data.Bifunctor
import Data.Bitraversable
import Data.ByteString.Char8 qualified as BS.Strict.C8
import Data.Foldable (asum)
import Data.List (intersperse)
import Data.Maybe (fromMaybe)
import Data.Proxy
import Data.Text qualified as Text
import Data.Version
import GHC.Stack

import Network.GRPC.Client.Connection (Connection, ConnParams(..))
import Network.GRPC.Client.Connection qualified as Connection
import Network.GRPC.Client.Session
import Network.GRPC.Common
import Network.GRPC.Common.Compression qualified as Compression
import Network.GRPC.Common.StreamElem qualified as StreamElem
import Network.GRPC.Spec
import Network.GRPC.Spec.Util.HKD qualified as HKD
import Network.GRPC.Util.GHC
import Network.GRPC.Util.HTTP2.Stream (ServerDisconnected(..))
import Network.GRPC.Util.Session qualified as Session
import Network.GRPC.Util.Thread qualified as Thread

import Paths_grapesy qualified as Grapesy

{-------------------------------------------------------------------------------
  Open a call
-------------------------------------------------------------------------------}

-- | State of the call
--
-- This type is kept abstract (opaque) in the public facing API.
data Call rpc = SupportsClientRpc rpc => Call {
      forall {k} (rpc :: k). Call rpc -> Channel (ClientSession rpc)
callChannel :: Session.Channel (ClientSession rpc)
    }

-- | Scoped RPC call
--
-- This is the low-level API for making RPC calls, providing full flexibility.
-- You may wish to consider using the infrastructure from
-- "Network.GRPC.Client.StreamType.IO" instead.
--
-- Typical usage:
--
-- > withRPC conn def (Proxy @ListFeatures) $ \call -> do
-- >   .. use 'call' to send and receive messages
--
-- for some previously established connection 'conn'
-- (see 'Network.GRPC.Client.withConnection') and where @ListFeatures@ is some
-- kind of RPC.
--
-- The call is setup in the background, and might not yet have been established
-- when the body is run. If you want to be sure that the call has been setup,
-- you can call 'recvResponseMetadata'.
--
-- Leaving the scope of 'withRPC' before the client informs the server that they
-- have sent their last message (using 'sendInput' or 'sendEndOfInput') is
-- considered a cancellation, and accordingly throws a 'GrpcException' with
-- 'GrpcCancelled' (see also <https://grpc.io/docs/guides/cancellation/>).
--
-- There is one exception to this rule: if the server unilaterally closes the
-- RPC (that is, the server already sent the trailers), then the call is
-- considered closed and the cancellation exception is not raised. Under normal
-- circumstances (with well-behaved server handlers) this should not arise.
-- (The gRPC specification itself is not very specific about this case; see
-- discussion at <https://stackoverflow.com/questions/55511528/should-grpc-server-side-half-closing-implicitly-terminate-the-client>.)
--
-- If there are still /inbound/ messages upon leaving the scope of 'withRPC' no
-- exception is raised (but the call is nonetheless still closed, and the server
-- handler will be informed that the client has disappeared).
--
-- Note on timeouts: if a timeout is specified for the call (either through
-- 'callTimeout' or through 'connDefaultTimeout'), when the timeout is reached
-- the RPC is cancelled; any further attempts to receive or send messages will
-- result in a 'GrpcException' with 'GrpcDeadlineExceeded'. As per the gRPC
-- specification, this does /not/ rely on the server; this does mean that the
-- same deadline also applies if the /client/ is slow (rather than the server).
withRPC :: forall rpc m a.
     (MonadMask m, MonadIO m, SupportsClientRpc rpc, HasCallStack)
  => Connection -> CallParams rpc -> Proxy rpc -> (Call rpc -> m a) -> m a
withRPC :: forall {k} (rpc :: k) (m :: * -> *) a.
(MonadMask m, MonadIO m, SupportsClientRpc rpc, HasCallStack) =>
Connection
-> CallParams rpc -> Proxy rpc -> (Call rpc -> m a) -> m a
withRPC Connection
conn CallParams rpc
callParams Proxy rpc
proxy Call rpc -> m a
k = ((a, ()) -> a) -> m (a, ()) -> m a
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (a, ()) -> a
forall a b. (a, b) -> a
fst (m (a, ()) -> m a) -> m (a, ()) -> m a
forall a b. (a -> b) -> a -> b
$
    m (Call rpc, CancelRequest)
-> ((Call rpc, CancelRequest) -> ExitCase a -> m ())
-> ((Call rpc, CancelRequest) -> m a)
-> m (a, ())
forall a b c.
HasCallStack =>
m a -> (a -> ExitCase b -> m c) -> (a -> m b) -> m (b, c)
forall (m :: * -> *) a b c.
(MonadMask m, HasCallStack) =>
m a -> (a -> ExitCase b -> m c) -> (a -> m b) -> m (b, c)
generalBracket
      (IO (Call rpc, CancelRequest) -> m (Call rpc, CancelRequest)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Call rpc, CancelRequest) -> m (Call rpc, CancelRequest))
-> IO (Call rpc, CancelRequest) -> m (Call rpc, CancelRequest)
forall a b. (a -> b) -> a -> b
$
         Connection
-> Proxy rpc -> CallParams rpc -> IO (Call rpc, CancelRequest)
forall {k} (rpc :: k).
(SupportsClientRpc rpc, HasCallStack) =>
Connection
-> Proxy rpc -> CallParams rpc -> IO (Call rpc, CancelRequest)
startRPC Connection
conn Proxy rpc
proxy CallParams rpc
callParams)
      (\(Call{Channel (ClientSession rpc)
callChannel :: forall {k} (rpc :: k). Call rpc -> Channel (ClientSession rpc)
callChannel :: Channel (ClientSession rpc)
callChannel}, CancelRequest
cancelRequest) ExitCase a
exitCase -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$
         Channel (ClientSession rpc) -> CancelRequest -> ExitCase a -> IO ()
forall rpc a. Channel rpc -> CancelRequest -> ExitCase a -> IO ()
closeRPC Channel (ClientSession rpc)
callChannel CancelRequest
cancelRequest ExitCase a
exitCase)
      (Call rpc -> m a
k (Call rpc -> m a)
-> ((Call rpc, CancelRequest) -> Call rpc)
-> (Call rpc, CancelRequest)
-> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Call rpc, CancelRequest) -> Call rpc
forall a b. (a, b) -> a
fst)

-- | Open new channel to the server
--
-- This is a non-blocking call; the connection will be set up in a
-- background thread; if this takes time, then the first call to
-- 'sendInput' or 'recvOutput' will block, but the call to 'startRPC'
-- itself will not block. This non-blocking nature makes this safe to use
-- in 'bracket' patterns.
startRPC :: forall rpc.
     (SupportsClientRpc rpc, HasCallStack)
  => Connection
  -> Proxy rpc
  -> CallParams rpc
  -> IO (Call rpc, Session.CancelRequest)
startRPC :: forall {k} (rpc :: k).
(SupportsClientRpc rpc, HasCallStack) =>
Connection
-> Proxy rpc -> CallParams rpc -> IO (Call rpc, CancelRequest)
startRPC Connection
conn Proxy rpc
_ CallParams rpc
callParams = do
    (connClosed, connToServer) <- HasCallStack =>
Connection -> IO (TMVar (Maybe SomeException), ConnectionToServer)
Connection -> IO (TMVar (Maybe SomeException), ConnectionToServer)
Connection.getConnectionToServer Connection
conn
    cOut     <- Connection.getOutboundCompression conn
    metadata <- buildMetadataIO $ callRequestMetadata callParams
    let flowStart :: Session.FlowStart (ClientOutbound rpc)
        flowStart = Headers (ClientOutbound rpc) -> FlowStart (ClientOutbound rpc)
forall {k} (flow :: k). Headers flow -> FlowStart flow
Session.FlowStartRegular (Headers (ClientOutbound rpc) -> FlowStart (ClientOutbound rpc))
-> Headers (ClientOutbound rpc) -> FlowStart (ClientOutbound rpc)
forall a b. (a -> b) -> a -> b
$ OutboundHeaders {
            outHeaders :: RequestHeaders
outHeaders     = Maybe Compression -> [CustomMetadata] -> RequestHeaders
requestHeaders Maybe Compression
cOut [CustomMetadata]
metadata
          , outCompression :: Compression
outCompression = Compression -> Maybe Compression -> Compression
forall a. a -> Maybe a -> a
fromMaybe Compression
noCompression Maybe Compression
cOut
          }

    let serverClosedConnection ::
             Either (TrailersOnly' HandledSynthesized) ProperTrailers'
          -> SomeException
        serverClosedConnection =
              (GrpcException -> SomeException)
-> (GrpcNormalTermination -> SomeException)
-> Either GrpcException GrpcNormalTermination
-> SomeException
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either GrpcException -> SomeException
forall e. Exception e => e -> SomeException
toException GrpcNormalTermination -> SomeException
forall e. Exception e => e -> SomeException
toException
            (Either GrpcException GrpcNormalTermination -> SomeException)
-> (Either (TrailersOnly' HandledSynthesized) ProperTrailers'
    -> Either GrpcException GrpcNormalTermination)
-> Either (TrailersOnly' HandledSynthesized) ProperTrailers'
-> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProperTrailers' -> Either GrpcException GrpcNormalTermination
grpcClassifyTermination
            (ProperTrailers' -> Either GrpcException GrpcNormalTermination)
-> (Either (TrailersOnly' HandledSynthesized) ProperTrailers'
    -> ProperTrailers')
-> Either (TrailersOnly' HandledSynthesized) ProperTrailers'
-> Either GrpcException GrpcNormalTermination
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (TrailersOnly' HandledSynthesized -> ProperTrailers')
-> (ProperTrailers' -> ProperTrailers')
-> Either (TrailersOnly' HandledSynthesized) ProperTrailers'
-> ProperTrailers'
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either TrailersOnly' HandledSynthesized -> ProperTrailers'
trailersOnlyToProperTrailers' ProperTrailers' -> ProperTrailers'
forall a. a -> a
id

    (channel, cancelRequest) <-
      Session.setupRequestChannel
        session
        connToServer
        serverClosedConnection
        flowStart

    -- The spec mandates that
    --
    -- > If a server has gone past the deadline when processing a request, the
    -- > client will give up and fail the RPC with the DEADLINE_EXCEEDED status.
    --
    -- and also that the deadline applies when when wait-for-ready semantics is
    -- used.
    --
    -- We have to be careful implementing this. In particular, we definitely
    -- don't want to impose the timeout on the /client/ (that is, we should not
    -- force the client to exit the scope of 'withRPC' within the timeout).
    -- Instead, we work a thread that cancels the RPC after the timeout expires;
    -- this means that /if/ the client that attempts to communicate with the
    -- server after the timeout, only then will it receive an exception.
    --
    -- The thread we spawn here is cleaned up by the monitor thread (below).
    --
    -- See
    --
    -- o <https://grpc.io/docs/guides/deadlines/>
    -- o <https://grpc.io/docs/guides/wait-for-ready/>
    mClientSideTimeout <-
      case callTimeout callParams of
        Maybe Timeout
Nothing -> Maybe ThreadId -> IO (Maybe ThreadId)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ThreadId
forall a. Maybe a
Nothing
        Just Timeout
t  -> (ThreadId -> Maybe ThreadId) -> IO ThreadId -> IO (Maybe ThreadId)
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ThreadId -> Maybe ThreadId
forall a. a -> Maybe a
Just (IO ThreadId -> IO (Maybe ThreadId))
-> IO ThreadId -> IO (Maybe ThreadId)
forall a b. (a -> b) -> a -> b
$ ThreadLabel -> IO () -> IO ThreadId
forkLabelled ThreadLabel
"grapesy:clientSideTimeout" (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
          Integer -> IO ()
UnboundedDelays.delay (Timeout -> Integer
timeoutToMicro Timeout
t)
          let timeout :: SomeException
              timeout :: SomeException
timeout = GrpcException -> SomeException
forall e. Exception e => e -> SomeException
toException (GrpcException -> SomeException) -> GrpcException -> SomeException
forall a b. (a -> b) -> a -> b
$ GrpcException {
                    grpcError :: GrpcError
grpcError         = GrpcError
GrpcDeadlineExceeded
                  , grpcErrorMessage :: Maybe Text
grpcErrorMessage  = Maybe Text
forall a. Maybe a
Nothing
                  , grpcErrorDetails :: Maybe ByteString
grpcErrorDetails  = Maybe ByteString
forall a. Maybe a
Nothing
                  , grpcErrorMetadata :: [CustomMetadata]
grpcErrorMetadata = []
                  }

          -- We recognized client-side that the timeout we imposed on the server
          -- has passed. Acting on this is however tricky:
          --
          -- o A call to 'closeRPC' will only terminate the /outbound/ thread;
          --   the idea is the inbound thread might still be reading in-flight
          --   messages, and it will terminate once the last message is read or
          --   the thread notices a broken connection.
          -- o Unfortunately, this does not work in the timeout case: /if/ the
          --   outbound thread has not yet terminated (that is, the client has
          --   not yet sent their final message), then calling 'closeRPC' will
          --   result in a RST_STREAM being sent to the server, which /should/
          --   result in the inbound connection being closed also, but may not,
          --   in the case of a non-compliant server.
          -- o Worse, if the client /did/ already send their final message, the
          --   outbound thread has already terminated, no RST_STREAM will be
          --   sent, and the we will continue to wait for messages from the
          --   server.
          --
          -- Ideally we'd inform the receiving thread that a timeout has been
          -- reached and to "continue until it would block", but that is hard
          -- to do. So instead we just kill the receiving thread, which means
          -- that once the timeout is reached, the client will not be able to
          -- receive any further messages (even if that is because the /client/
          -- was slow, rather than the server).

          IO (CancelResult (FlowState (ClientInbound rpc))) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (CancelResult (FlowState (ClientInbound rpc))) -> IO ())
-> IO (CancelResult (FlowState (ClientInbound rpc))) -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (ThreadState (FlowState (ClientInbound rpc)))
-> SomeException
-> IO (CancelResult (FlowState (ClientInbound rpc)))
forall a.
TVar (ThreadState a) -> SomeException -> IO (CancelResult a)
Thread.cancelThread (Channel (ClientSession rpc)
-> TVar (ThreadState (FlowState (Inbound (ClientSession rpc))))
forall sess.
Channel sess -> TVar (ThreadState (FlowState (Inbound sess)))
Session.channelInbound Channel (ClientSession rpc)
channel) SomeException
timeout
          Channel (ClientSession rpc)
-> CancelRequest -> ExitCase Any -> IO ()
forall rpc a. Channel rpc -> CancelRequest -> ExitCase a -> IO ()
closeRPC Channel (ClientSession rpc)
channel CancelRequest
cancelRequest (ExitCase Any -> IO ()) -> ExitCase Any -> IO ()
forall a b. (a -> b) -> a -> b
$ SomeException -> ExitCase Any
forall a. SomeException -> ExitCase a
ExitCaseException SomeException
timeout

    -- Spawn a thread to monitor the connection, and close the new channel when
    -- the connection is closed. To prevent a memory leak by hanging on to the
    -- channel for the lifetime of the connection, the thread also terminates in
    -- the (normal) case that the channel is closed before the connection is.
    _ <- forkLabelled "grapesy:monitorConnection" $ do
      status <- atomically $ do
          (Left <$> Thread.waitForNormalOrAbnormalThreadTermination
                      (Session.channelInbound channel))
        `orElse`
          (Right <$> readTMVar connClosed)
      forM_ mClientSideTimeout killThread
      case status of
        Left Maybe SomeException
_ -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return () -- Channel closed before the connection
        Right Maybe SomeException
mErr -> do
          let exitReason :: ExitCase ()
              exitReason :: ExitCase ()
exitReason =
                case Maybe SomeException
mErr of
                  Maybe SomeException
Nothing -> () -> ExitCase ()
forall a. a -> ExitCase a
ExitCaseSuccess ()
                  Just SomeException
exitWithException ->
                    SomeException -> ExitCase ()
forall a. SomeException -> ExitCase a
ExitCaseException (SomeException -> ExitCase ())
-> (ServerDisconnected -> SomeException)
-> ServerDisconnected
-> ExitCase ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ServerDisconnected -> SomeException
forall e. Exception e => e -> SomeException
toException (ServerDisconnected -> ExitCase ())
-> ServerDisconnected -> ExitCase ()
forall a b. (a -> b) -> a -> b
$
                      SomeException -> CallStack -> ServerDisconnected
ServerDisconnected SomeException
exitWithException CallStack
HasCallStack => CallStack
callStack
          _mAlreadyClosed <- Channel (ClientSession rpc)
-> ExitCase () -> IO (Maybe SomeException)
forall sess a.
HasCallStack =>
Channel sess -> ExitCase a -> IO (Maybe SomeException)
Session.close Channel (ClientSession rpc)
channel ExitCase ()
exitReason
          return ()

    return (Call channel, cancelRequest)
  where
    connParams :: ConnParams
    connParams :: ConnParams
connParams = Connection -> ConnParams
Connection.connParams Connection
conn

    requestHeaders :: Maybe Compression -> [CustomMetadata] -> RequestHeaders
    requestHeaders :: Maybe Compression -> [CustomMetadata] -> RequestHeaders
requestHeaders Maybe Compression
cOut [CustomMetadata]
metadata = RequestHeaders{
          requestTimeout :: HKD Undecorated (Maybe Timeout)
requestTimeout =
            [Maybe Timeout] -> Maybe Timeout
forall (t :: * -> *) (f :: * -> *) a.
(Foldable t, Alternative f) =>
t (f a) -> f a
asum [
                CallParams rpc -> Maybe Timeout
forall {k} (rpc :: k). CallParams rpc -> Maybe Timeout
callTimeout CallParams rpc
callParams
              , ConnParams -> Maybe Timeout
connDefaultTimeout ConnParams
connParams
              ]
        , requestMetadata :: CustomMetadataMap
requestMetadata =
            [CustomMetadata] -> CustomMetadataMap
customMetadataMapFromList [CustomMetadata]
metadata
        , requestCompression :: HKD Undecorated (Maybe CompressionId)
requestCompression =
            Compression -> CompressionId
compressionId (Compression -> CompressionId)
-> Maybe Compression -> Maybe CompressionId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe Compression
cOut
        , requestAcceptCompression :: HKD Undecorated (Maybe (NonEmpty CompressionId))
requestAcceptCompression = NonEmpty CompressionId -> Maybe (NonEmpty CompressionId)
forall a. a -> Maybe a
Just (NonEmpty CompressionId -> Maybe (NonEmpty CompressionId))
-> NonEmpty CompressionId -> Maybe (NonEmpty CompressionId)
forall a b. (a -> b) -> a -> b
$
            Negotation -> NonEmpty CompressionId
Compression.offer (Negotation -> NonEmpty CompressionId)
-> Negotation -> NonEmpty CompressionId
forall a b. (a -> b) -> a -> b
$ ConnParams -> Negotation
connCompression ConnParams
connParams
        , requestContentType :: HKD Undecorated (Maybe ContentType)
requestContentType =
            ConnParams -> Maybe ContentType
connContentType ConnParams
connParams
        , requestMessageType :: HKD Undecorated (Maybe MessageType)
requestMessageType =
            MessageType -> Maybe MessageType
forall a. a -> Maybe a
Just MessageType
MessageTypeDefault
        , requestUserAgent :: HKD Undecorated (Maybe ByteString)
requestUserAgent = ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just (ByteString -> Maybe ByteString) -> ByteString -> Maybe ByteString
forall a b. (a -> b) -> a -> b
$
            [ByteString] -> ByteString
forall a. Monoid a => [a] -> a
mconcat [
                ByteString
"grpc-haskell-grapesy/"
              , [ByteString] -> ByteString
forall a. Monoid a => [a] -> a
mconcat ([ByteString] -> ByteString)
-> ([ByteString] -> [ByteString]) -> [ByteString] -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
intersperse ByteString
"." ([ByteString] -> ByteString) -> [ByteString] -> ByteString
forall a b. (a -> b) -> a -> b
$
                  (Int -> ByteString) -> [Int] -> [ByteString]
forall a b. (a -> b) -> [a] -> [b]
map (ThreadLabel -> ByteString
BS.Strict.C8.pack (ThreadLabel -> ByteString)
-> (Int -> ThreadLabel) -> Int -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> ThreadLabel
forall a. Show a => a -> ThreadLabel
show) ([Int] -> [ByteString]) -> [Int] -> [ByteString]
forall a b. (a -> b) -> a -> b
$
                    Version -> [Int]
versionBranch Version
Grapesy.version
              ]
        , requestIncludeTE :: HKD Undecorated Bool
requestIncludeTE =
            Bool
HKD Undecorated Bool
True
        , requestTraceContext :: HKD Undecorated (Maybe TraceContext)
requestTraceContext =
            Maybe TraceContext
HKD Undecorated (Maybe TraceContext)
forall a. Maybe a
Nothing
        , requestPreviousRpcAttempts :: HKD Undecorated (Maybe Int)
requestPreviousRpcAttempts =
            Maybe Int
HKD Undecorated (Maybe Int)
forall a. Maybe a
Nothing
        , requestUnrecognized :: HKD Undecorated ()
requestUnrecognized =
            ()
        }

    session :: ClientSession rpc
    session :: ClientSession rpc
session = ClientSession {
          clientConnection :: Connection
clientConnection = Connection
conn
        }

-- | Close the RPC (internal API only)
--
-- This is more subtle than one might think. The spec mandates that when a
-- client cancels a request (which in grapesy means exiting the scope of
-- withRPC), the client receives a CANCELLED exception. We need to deal with the
-- edge case mentioned in 'withRPC', however: the server might have already
-- closed the connection. The client must have evidence that this is the case,
-- which could mean one of two things:
--
-- o The client received the final message from the server
-- o The server threw an exception (and the client saw this)
--
-- We can check for the former using 'channelRecvFinal', and the latter using
-- 'hasThreadTerminated'. By checking both, we avoid race conditions:
--
-- o If the client received the final message, 'channelRecvFinal' /will/ have
--   been updated (we update this in the same transaction that returns the
--   actual element; see 'Network.GRPC.Util.Session.Channel.recv').
-- o If the server threw an exception, and the client observed this, then the
--   inbound thread state /must/ have changed to 'ThreadException'.
--
-- Note that it is /not/ sufficient to check if the inbound thread has
-- terminated: we might have received the final message, but the thread might
-- still be /about/ to terminate, but not /actually/ have terminated.
--
-- See also:
--
-- o <https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md#cancel_after_begin>
-- o <https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md#cancel_after_first_response>
closeRPC ::
     Session.Channel rpc
  -> Session.CancelRequest
  -> ExitCase a
  -> IO ()
closeRPC :: forall rpc a. Channel rpc -> CancelRequest -> ExitCase a -> IO ()
closeRPC Channel rpc
callChannel CancelRequest
cancelRequest ExitCase a
exitCase = IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    -- /Before/ we do anything else (see below), check if we have evidence
    -- that we can discard the connection.
    canDiscard <- IO Bool
checkCanDiscard

    -- Send the RST_STREAM frame /before/ closing the outbound thread.
    --
    -- When we call 'Session.close', we will terminate the
    -- 'sendMessageLoop', @http2@ will interpret this as a clean termination
    -- of the stream. We must therefore cancel this stream before calling
    -- 'Session.close'. /If/ the final message has already been sent,
    -- @http2@ guarantees (as a postcondition of @outBodyPushFinal@) that
    -- cancellation will be a no-op.
    sendResetFrame

    -- Now close the /outbound/ thread, see docs of 'Session.close' for
    -- details.
    mException <- liftIO $ Session.close callChannel exitCase
    case mException of
      Maybe SomeException
Nothing ->
        -- The outbound thread had already terminated
        () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
      Just SomeException
ex ->
        case SomeException -> Maybe ChannelDiscarded
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
ex of
          Maybe ChannelDiscarded
Nothing ->
            -- We are leaving the scope of 'withRPC' because of an exception
            -- in the client, just rethrow that exception.
            SomeException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM SomeException
ex
          Just ChannelDiscarded
discarded ->
            -- We are leaving the scope of 'withRPC' without having sent the
            -- final message.
            --
            -- If the server was closed before we cancelled the stream, this
            -- means that the server unilaterally closed the connection.
            -- This should be regarded as normal termination of the RPC (see
            -- the docs for 'withRPC')
            --
            -- Otherwise, the client left the scope of 'withRPC' before the
            -- RPC was complete, which the gRPC spec mandates to result in a
            -- 'GrpcCancelled' exception. See docs of 'throwCancelled'.
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
canDiscard (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
              ChannelDiscarded -> IO ()
throwCancelled ChannelDiscarded
discarded
  where
    -- Send a @RST_STREAM@ frame if necessary
    sendResetFrame :: IO ()
    sendResetFrame :: IO ()
sendResetFrame =
        CancelRequest
cancelRequest CancelRequest -> CancelRequest
forall a b. (a -> b) -> a -> b
$
          case ExitCase a
exitCase of
            ExitCaseSuccess a
_ ->
              -- Error code will be CANCEL
              Maybe SomeException
forall a. Maybe a
Nothing
            ExitCase a
ExitCaseAbort ->
              -- Error code will be INTERNAL_ERROR. The client aborted with an
              -- error that we don't have access to. We want to tell the server
              -- that something has gone wrong (i.e. INTERNAL_ERROR), so we must
              -- pass an exception, however the exact nature of the exception is
              -- not particularly important as it is only recorded locally.
              SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just (SomeException -> Maybe SomeException)
-> (ChannelAborted -> SomeException)
-> ChannelAborted
-> Maybe SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ChannelAborted -> SomeException
forall e. Exception e => e -> SomeException
toException (ChannelAborted -> Maybe SomeException)
-> ChannelAborted -> Maybe SomeException
forall a b. (a -> b) -> a -> b
$ CallStack -> ChannelAborted
Session.ChannelAborted CallStack
HasCallStack => CallStack
callStack
            ExitCaseException SomeException
e ->
              -- Error code will be INTERNAL_ERROR
              SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e

    throwCancelled :: ChannelDiscarded -> IO ()
    throwCancelled :: ChannelDiscarded -> IO ()
throwCancelled (ChannelDiscarded CallStack
cs) = do
        GrpcException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (GrpcException -> IO ()) -> GrpcException -> IO ()
forall a b. (a -> b) -> a -> b
$ GrpcException {
            grpcError :: GrpcError
grpcError         = GrpcError
GrpcCancelled
          , grpcErrorMessage :: Maybe Text
grpcErrorMessage  = Text -> Maybe Text
forall a. a -> Maybe a
Just (Text -> Maybe Text) -> Text -> Maybe Text
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
forall a. Monoid a => [a] -> a
mconcat [
                                    Text
"Channel discarded by client at "
                                  , ThreadLabel -> Text
Text.pack (ThreadLabel -> Text) -> ThreadLabel -> Text
forall a b. (a -> b) -> a -> b
$ CallStack -> ThreadLabel
prettyCallStack CallStack
cs
                                  ]
          , grpcErrorDetails :: Maybe ByteString
grpcErrorDetails  = Maybe ByteString
forall a. Maybe a
Nothing
          , grpcErrorMetadata :: [CustomMetadata]
grpcErrorMetadata = []
          }

    checkCanDiscard :: IO Bool
    checkCanDiscard :: IO Bool
checkCanDiscard = do
        mRecvFinal  <- STM (RecvFinal (Inbound rpc)) -> IO (RecvFinal (Inbound rpc))
forall a. STM a -> IO a
atomically (STM (RecvFinal (Inbound rpc)) -> IO (RecvFinal (Inbound rpc)))
-> STM (RecvFinal (Inbound rpc)) -> IO (RecvFinal (Inbound rpc))
forall a b. (a -> b) -> a -> b
$
          TVar (RecvFinal (Inbound rpc)) -> STM (RecvFinal (Inbound rpc))
forall a. TVar a -> STM a
readTVar (TVar (RecvFinal (Inbound rpc)) -> STM (RecvFinal (Inbound rpc)))
-> TVar (RecvFinal (Inbound rpc)) -> STM (RecvFinal (Inbound rpc))
forall a b. (a -> b) -> a -> b
$ Channel rpc -> TVar (RecvFinal (Inbound rpc))
forall sess. Channel sess -> TVar (RecvFinal (Inbound sess))
Session.channelRecvFinal Channel rpc
callChannel
        let onNotRunning :: STM ()
            onNotRunning = () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        mTerminated <- atomically $
          Thread.getThreadState_
            (Session.channelInbound callChannel)
            onNotRunning
        return $
          or [
              case mRecvFinal of
                RecvFinal (Inbound rpc)
Session.RecvNotFinal          -> Bool
False
                Session.RecvWithoutTrailers Trailers (Inbound rpc)
_ -> Bool
True
                Session.RecvFinal CallStack
_           -> Bool
True

              -- We are checking if we have evidence that we can discard the
              -- channel. If the inbound thread is not yet running, this implies
              -- that the server has not yet initiated their response to us,
              -- which means we have no evidence to believe we can discard the
              -- channel.
            , case mTerminated of
                Thread.ThreadNotYetRunning_ () -> Bool
False
                ThreadState_ ()
Thread.ThreadRunning_          -> Bool
False
                ThreadState_ ()
Thread.ThreadDone_             -> Bool
True
                Thread.ThreadException_ SomeException
_      -> Bool
True
            ]

{-------------------------------------------------------------------------------
  Open (ongoing) call
-------------------------------------------------------------------------------}

-- | Send an input to the peer
--
-- Calling 'sendInput' again after sending the final message is a bug.
sendInput ::
     (HasCallStack, MonadIO m)
  => Call rpc
  -> StreamElem NoMetadata (Input rpc)
  -> m ()
sendInput :: forall {k} (m :: * -> *) (rpc :: k).
(HasCallStack, MonadIO m) =>
Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
sendInput Call rpc
call = Call rpc -> StreamElem NoMetadata (OutboundMeta, Input rpc) -> m ()
forall {k} (m :: * -> *) (rpc :: k).
(HasCallStack, MonadIO m) =>
Call rpc -> StreamElem NoMetadata (OutboundMeta, Input rpc) -> m ()
sendInputWithMeta Call rpc
call (StreamElem NoMetadata (OutboundMeta, Input rpc) -> m ())
-> (StreamElem NoMetadata (Input rpc)
    -> StreamElem NoMetadata (OutboundMeta, Input rpc))
-> StreamElem NoMetadata (Input rpc)
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Input rpc -> (OutboundMeta, Input rpc))
-> StreamElem NoMetadata (Input rpc)
-> StreamElem NoMetadata (OutboundMeta, Input rpc)
forall a b.
(a -> b) -> StreamElem NoMetadata a -> StreamElem NoMetadata b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (OutboundMeta
forall a. Default a => a
def,)

-- | Generalization of 'sendInput', providing additional control
--
-- See also 'Network.GRPC.Server.sendOutputWithMeta'.
--
-- Most applications will never need to use this function.
sendInputWithMeta ::
     (HasCallStack, MonadIO m)
  => Call rpc
  -> StreamElem NoMetadata (OutboundMeta, Input rpc)
  -> m ()
sendInputWithMeta :: forall {k} (m :: * -> *) (rpc :: k).
(HasCallStack, MonadIO m) =>
Call rpc -> StreamElem NoMetadata (OutboundMeta, Input rpc) -> m ()
sendInputWithMeta Call{Channel (ClientSession rpc)
callChannel :: forall {k} (rpc :: k). Call rpc -> Channel (ClientSession rpc)
callChannel :: Channel (ClientSession rpc)
callChannel} StreamElem NoMetadata (OutboundMeta, Input rpc)
msg = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    Channel (ClientSession rpc)
-> StreamElem
     (Trailers (Outbound (ClientSession rpc)))
     (Message (Outbound (ClientSession rpc)))
-> IO ()
forall sess.
(HasCallStack, NFData (Message (Outbound sess))) =>
Channel sess
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> IO ()
Session.send Channel (ClientSession rpc)
callChannel StreamElem NoMetadata (OutboundMeta, Input rpc)
StreamElem
  (Trailers (Outbound (ClientSession rpc)))
  (Message (Outbound (ClientSession rpc)))
msg

    -- This should be called before exiting the scope of 'withRPC'.
    StreamElem NoMetadata (OutboundMeta, Input rpc)
-> (NoMetadata -> IO ()) -> IO ()
forall (m :: * -> *) b a.
Applicative m =>
StreamElem b a -> (b -> m ()) -> m ()
StreamElem.whenDefinitelyFinal StreamElem NoMetadata (OutboundMeta, Input rpc)
msg ((NoMetadata -> IO ()) -> IO ()) -> (NoMetadata -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \NoMetadata
_ ->
      Channel (ClientSession rpc) -> IO ()
forall sess. Channel sess -> IO ()
Session.waitForOutbound Channel (ClientSession rpc)
callChannel

-- | Receive an output from the peer
--
-- After the final 'Output', you will receive any custom metadata (application
-- defined trailers) that the server returns. We do /NOT/ include the
-- 'GrpcStatus' here: a status of 'GrpcOk' carries no information, and any other
-- status will result in a 'GrpcException'. Calling 'recvOutput' again after
-- receiving the trailers is a bug and results in a 'RecvAfterFinal' exception.
recvOutput :: forall rpc m.
     (MonadIO m, HasCallStack)
  => Call rpc
  -> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
recvOutput :: forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
recvOutput call :: Call rpc
call@Call{} = IO (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
 -> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc)))
-> IO (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
forall a b. (a -> b) -> a -> b
$ do
    streamElem <- Call rpc
-> IO (StreamElem ProperTrailers' (InboundMeta, Output rpc))
forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
recvOutputWithMeta Call rpc
call
    bitraverse (responseTrailingMetadata call) (return . snd) streamElem

-- | Receive an output from the peer, if one exists
--
-- If this is the final output, the /next/ call to 'recvNextOutputElem' will
-- return 'NoNextElem'; see also 'Network.GRPC.Server.recvNextInputElem' for
-- detailed discussion.
recvNextOutputElem ::
     (MonadIO m, HasCallStack)
  => Call rpc -> m (NextElem (Output rpc))
recvNextOutputElem :: forall {k} (m :: * -> *) (rpc :: k).
(MonadIO m, HasCallStack) =>
Call rpc -> m (NextElem (Output rpc))
recvNextOutputElem =
      (Either ProperTrailers' (InboundMeta, Output rpc)
 -> NextElem (Output rpc))
-> m (Either ProperTrailers' (InboundMeta, Output rpc))
-> m (NextElem (Output rpc))
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((ProperTrailers' -> NextElem (Output rpc))
-> ((InboundMeta, Output rpc) -> NextElem (Output rpc))
-> Either ProperTrailers' (InboundMeta, Output rpc)
-> NextElem (Output rpc)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (NextElem (Output rpc) -> ProperTrailers' -> NextElem (Output rpc)
forall a b. a -> b -> a
const NextElem (Output rpc)
forall a. NextElem a
NoNextElem) (Output rpc -> NextElem (Output rpc)
forall a. a -> NextElem a
NextElem (Output rpc -> NextElem (Output rpc))
-> ((InboundMeta, Output rpc) -> Output rpc)
-> (InboundMeta, Output rpc)
-> NextElem (Output rpc)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (InboundMeta, Output rpc) -> Output rpc
forall a b. (a, b) -> b
snd))
    (m (Either ProperTrailers' (InboundMeta, Output rpc))
 -> m (NextElem (Output rpc)))
-> (Call rpc
    -> m (Either ProperTrailers' (InboundMeta, Output rpc)))
-> Call rpc
-> m (NextElem (Output rpc))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Call rpc -> m (Either ProperTrailers' (InboundMeta, Output rpc))
forall {k} (rpc :: k) (m :: * -> *).
(HasCallStack, MonadIO m) =>
Call rpc -> m (Either ProperTrailers' (InboundMeta, Output rpc))
recvEither

-- | Generalization of 'recvOutput', providing additional meta-information
--
-- This returns the full set of trailers, /even if those trailers indicate a
-- gRPC failure, or if any trailers fail to parse/. Put another way, gRPC
-- failures are returned as values here, rather than throwing an exception.
--
-- Most applications will never need to use this function.
--
-- See also 'Network.GRPC.Server.recvInputWithMeta'.
recvOutputWithMeta :: forall rpc m.
     (MonadIO m, HasCallStack)
  => Call rpc
  -> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
recvOutputWithMeta :: forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
recvOutputWithMeta = Call rpc
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
forall {k} (rpc :: k) (m :: * -> *).
(HasCallStack, MonadIO m) =>
Call rpc
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
recvBoth

-- | The initial metadata that was included in the response headers
--
-- The server can send two sets of metadata: an initial set of type
-- 'ResponseInitialMetadata' when it first initiates the response, and then a
-- final set of type 'ResponseTrailingMetadata' after the final message (see
-- 'recvOutput').
--
-- It is however possible for the server to send only a /single/ set; this is
-- the gRPC \"Trailers-Only\" case. The server can choose to do so when it knows
-- it will not send any messages; in this case, the initial response metadata is
-- fact of type 'ResponseTrailingMetadata' instead. The 'ResponseMetadata' type
-- distinguishes between these two cases.
--
-- If the \"Trailers-Only\" case can be ruled out (that is, if it would amount
-- to a protocol error), you can use 'recvResponseInitialMetadata' instead.
--
-- This can block: we need to wait until we receive the metadata. The precise
-- communication pattern will depend on the specifics of each server:
--
-- * It might be necessary to send one or more inputs to the server before it
--   returns any replies.
-- * The response metadata /will/ be available before the first output from the
--   server, and may indeed be available /well/ before.
recvResponseMetadata :: forall rpc m.
     MonadIO m
  => Call rpc -> m (ResponseMetadata rpc)
recvResponseMetadata :: forall {k} (rpc :: k) (m :: * -> *).
MonadIO m =>
Call rpc -> m (ResponseMetadata rpc)
recvResponseMetadata call :: Call rpc
call@Call{} = IO (ResponseMetadata rpc) -> m (ResponseMetadata rpc)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (ResponseMetadata rpc) -> m (ResponseMetadata rpc))
-> IO (ResponseMetadata rpc) -> m (ResponseMetadata rpc)
forall a b. (a -> b) -> a -> b
$
    Call rpc
-> IO
     (Either
        (TrailersOnly' HandledSynthesized)
        (ResponseHeaders' HandledSynthesized))
forall {k} (rpc :: k) (m :: * -> *).
MonadIO m =>
Call rpc
-> m (Either
        (TrailersOnly' HandledSynthesized)
        (ResponseHeaders' HandledSynthesized))
recvInitialResponse Call rpc
call IO
  (Either
     (TrailersOnly' HandledSynthesized)
     (ResponseHeaders' HandledSynthesized))
-> (Either
      (TrailersOnly' HandledSynthesized)
      (ResponseHeaders' HandledSynthesized)
    -> IO (ResponseMetadata rpc))
-> IO (ResponseMetadata rpc)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Either
  (TrailersOnly' HandledSynthesized)
  (ResponseHeaders' HandledSynthesized)
-> IO (ResponseMetadata rpc)
aux
  where
    aux ::
         Either (TrailersOnly'    HandledSynthesized)
                (ResponseHeaders' HandledSynthesized)
      -> IO (ResponseMetadata rpc)
    aux :: Either
  (TrailersOnly' HandledSynthesized)
  (ResponseHeaders' HandledSynthesized)
-> IO (ResponseMetadata rpc)
aux (Left TrailersOnly' HandledSynthesized
trailers) =
        case ProperTrailers' -> Either GrpcException GrpcNormalTermination
grpcClassifyTermination ProperTrailers'
properTrailers of
          Left GrpcException
exception ->
            GrpcException -> IO (ResponseMetadata rpc)
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM GrpcException
exception
          Right GrpcNormalTermination
terminatedNormally -> do
            ResponseTrailingMetadata rpc -> ResponseMetadata rpc
forall {k} (rpc :: k).
ResponseTrailingMetadata rpc -> ResponseMetadata rpc
ResponseTrailingMetadata (ResponseTrailingMetadata rpc -> ResponseMetadata rpc)
-> IO (ResponseTrailingMetadata rpc) -> IO (ResponseMetadata rpc)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
              [CustomMetadata] -> IO (ResponseTrailingMetadata rpc)
forall a (m :: * -> *).
(ParseMetadata a, MonadThrow m) =>
[CustomMetadata] -> m a
forall (m :: * -> *).
MonadThrow m =>
[CustomMetadata] -> m (ResponseTrailingMetadata rpc)
parseMetadata (GrpcNormalTermination -> [CustomMetadata]
grpcTerminatedMetadata GrpcNormalTermination
terminatedNormally)
      where
        properTrailers :: ProperTrailers'
properTrailers = TrailersOnly' HandledSynthesized -> ProperTrailers'
trailersOnlyToProperTrailers' TrailersOnly' HandledSynthesized
trailers
    aux (Right ResponseHeaders' HandledSynthesized
headers) =
        ResponseInitialMetadata rpc -> ResponseMetadata rpc
forall {k} (rpc :: k).
ResponseInitialMetadata rpc -> ResponseMetadata rpc
ResponseInitialMetadata (ResponseInitialMetadata rpc -> ResponseMetadata rpc)
-> IO (ResponseInitialMetadata rpc) -> IO (ResponseMetadata rpc)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
          [CustomMetadata] -> IO (ResponseInitialMetadata rpc)
forall a (m :: * -> *).
(ParseMetadata a, MonadThrow m) =>
[CustomMetadata] -> m a
forall (m :: * -> *).
MonadThrow m =>
[CustomMetadata] -> m (ResponseInitialMetadata rpc)
parseMetadata (CustomMetadataMap -> [CustomMetadata]
customMetadataMapToList (CustomMetadataMap -> [CustomMetadata])
-> CustomMetadataMap -> [CustomMetadata]
forall a b. (a -> b) -> a -> b
$ ResponseHeaders' HandledSynthesized -> CustomMetadataMap
forall (f :: * -> *). ResponseHeaders_ f -> CustomMetadataMap
responseMetadata ResponseHeaders' HandledSynthesized
headers)

-- | Return the initial response from the server
--
-- This is a low-level function, and generalizes 'recvResponseInitialMetadata'.
-- If the server returns a gRPC error, that will be returned as a value here
-- rather than thrown as an exception.
--
-- Most applications will never need to use this function.
recvInitialResponse :: forall rpc m.
     MonadIO m
  => Call rpc
  -> m ( Either (TrailersOnly'    HandledSynthesized)
                 (ResponseHeaders' HandledSynthesized)
        )
recvInitialResponse :: forall {k} (rpc :: k) (m :: * -> *).
MonadIO m =>
Call rpc
-> m (Either
        (TrailersOnly' HandledSynthesized)
        (ResponseHeaders' HandledSynthesized))
recvInitialResponse Call{Channel (ClientSession rpc)
callChannel :: forall {k} (rpc :: k). Call rpc -> Channel (ClientSession rpc)
callChannel :: Channel (ClientSession rpc)
callChannel} = IO
  (Either
     (TrailersOnly' HandledSynthesized)
     (ResponseHeaders' HandledSynthesized))
-> m (Either
        (TrailersOnly' HandledSynthesized)
        (ResponseHeaders' HandledSynthesized))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
   (Either
      (TrailersOnly' HandledSynthesized)
      (ResponseHeaders' HandledSynthesized))
 -> m (Either
         (TrailersOnly' HandledSynthesized)
         (ResponseHeaders' HandledSynthesized)))
-> IO
     (Either
        (TrailersOnly' HandledSynthesized)
        (ResponseHeaders' HandledSynthesized))
-> m (Either
        (TrailersOnly' HandledSynthesized)
        (ResponseHeaders' HandledSynthesized))
forall a b. (a -> b) -> a -> b
$
    (Headers (ClientInbound rpc)
 -> ResponseHeaders' HandledSynthesized)
-> Either
     (TrailersOnly' HandledSynthesized) (Headers (ClientInbound rpc))
-> Either
     (TrailersOnly' HandledSynthesized)
     (ResponseHeaders' HandledSynthesized)
forall a b.
(a -> b)
-> Either (TrailersOnly' HandledSynthesized) a
-> Either (TrailersOnly' HandledSynthesized) b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Headers (ClientInbound rpc) -> ResponseHeaders' HandledSynthesized
forall k (rpc :: k).
Headers (ClientInbound rpc) -> ResponseHeaders' HandledSynthesized
inbHeaders (Either
   (TrailersOnly' HandledSynthesized) (Headers (ClientInbound rpc))
 -> Either
      (TrailersOnly' HandledSynthesized)
      (ResponseHeaders' HandledSynthesized))
-> IO
     (Either
        (TrailersOnly' HandledSynthesized) (Headers (ClientInbound rpc)))
-> IO
     (Either
        (TrailersOnly' HandledSynthesized)
        (ResponseHeaders' HandledSynthesized))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Channel (ClientSession rpc)
-> IO
     (Either
        (NoMessages (Inbound (ClientSession rpc)))
        (Headers (Inbound (ClientSession rpc))))
forall sess.
Channel sess
-> IO (Either (NoMessages (Inbound sess)) (Headers (Inbound sess)))
Session.getInboundHeaders Channel (ClientSession rpc)
callChannel

{-------------------------------------------------------------------------------
  Protocol specific wrappers
-------------------------------------------------------------------------------}

-- | Send the next input
--
-- If this is the last input, you should call 'sendFinalInput' instead.
sendNextInput :: MonadIO m => Call rpc -> Input rpc -> m ()
sendNextInput :: forall {k} (m :: * -> *) (rpc :: k).
MonadIO m =>
Call rpc -> Input rpc -> m ()
sendNextInput Call rpc
call = Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
forall {k} (m :: * -> *) (rpc :: k).
(HasCallStack, MonadIO m) =>
Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
sendInput Call rpc
call (StreamElem NoMetadata (Input rpc) -> m ())
-> (Input rpc -> StreamElem NoMetadata (Input rpc))
-> Input rpc
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Input rpc -> StreamElem NoMetadata (Input rpc)
forall b a. a -> StreamElem b a
StreamElem

-- | Send final input
--
-- For some servers it is important that the client marks the final input /when
-- it is sent/. If you really want to send the final input and separately tell
-- the server that no more inputs will be provided, use 'sendEndOfInput' (or
-- 'sendInput').
sendFinalInput ::
     MonadIO m
  => Call rpc
  -> Input rpc
  -> m ()
sendFinalInput :: forall {k} (m :: * -> *) (rpc :: k).
MonadIO m =>
Call rpc -> Input rpc -> m ()
sendFinalInput Call rpc
call Input rpc
input =
    Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
forall {k} (m :: * -> *) (rpc :: k).
(HasCallStack, MonadIO m) =>
Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
sendInput Call rpc
call (Input rpc -> NoMetadata -> StreamElem NoMetadata (Input rpc)
forall b a. a -> b -> StreamElem b a
FinalElem Input rpc
input NoMetadata
NoMetadata)

-- | Indicate that there are no more inputs
--
-- See 'sendFinalInput' for additional discussion.
sendEndOfInput :: MonadIO m => Call rpc -> m ()
sendEndOfInput :: forall {k} (m :: * -> *) (rpc :: k). MonadIO m => Call rpc -> m ()
sendEndOfInput Call rpc
call = Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
forall {k} (m :: * -> *) (rpc :: k).
(HasCallStack, MonadIO m) =>
Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
sendInput Call rpc
call (StreamElem NoMetadata (Input rpc) -> m ())
-> StreamElem NoMetadata (Input rpc) -> m ()
forall a b. (a -> b) -> a -> b
$ NoMetadata -> StreamElem NoMetadata (Input rpc)
forall b a. b -> StreamElem b a
NoMoreElems NoMetadata
NoMetadata

-- | Receive /initial/ metadata
--
-- This is a specialization of 'recvResponseMetadata' which can be used if a use
-- of \"Trailers-Only\" amounts to a protocol error; if the server /does/ use
-- \"Trailers-Only\", this throws a 'ProtoclException'
-- ('UnexpectedTrailersOnly').
recvResponseInitialMetadata :: forall rpc m.
     MonadIO m
  => Call rpc
  -> m (ResponseInitialMetadata rpc)
recvResponseInitialMetadata :: forall {k} (rpc :: k) (m :: * -> *).
MonadIO m =>
Call rpc -> m (ResponseInitialMetadata rpc)
recvResponseInitialMetadata call :: Call rpc
call@Call{} = IO (ResponseInitialMetadata rpc) -> m (ResponseInitialMetadata rpc)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (ResponseInitialMetadata rpc)
 -> m (ResponseInitialMetadata rpc))
-> IO (ResponseInitialMetadata rpc)
-> m (ResponseInitialMetadata rpc)
forall a b. (a -> b) -> a -> b
$ do
    md <- Call rpc -> IO (ResponseMetadata rpc)
forall {k} (rpc :: k) (m :: * -> *).
MonadIO m =>
Call rpc -> m (ResponseMetadata rpc)
recvResponseMetadata Call rpc
call
    case md of
      ResponseInitialMetadata ResponseInitialMetadata rpc
md' ->
        ResponseInitialMetadata rpc -> IO (ResponseInitialMetadata rpc)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ResponseInitialMetadata rpc
md'
      ResponseTrailingMetadata ResponseTrailingMetadata rpc
md' ->
        ProtocolException rpc -> IO (ResponseInitialMetadata rpc)
forall a. ProtocolException rpc -> IO a
err (ProtocolException rpc -> IO (ResponseInitialMetadata rpc))
-> ProtocolException rpc -> IO (ResponseInitialMetadata rpc)
forall a b. (a -> b) -> a -> b
$ ResponseTrailingMetadata rpc -> ProtocolException rpc
forall {k} (rpc :: k).
ResponseTrailingMetadata rpc -> ProtocolException rpc
UnexpectedTrailersOnly ResponseTrailingMetadata rpc
md'
  where
    err :: ProtocolException rpc -> IO a
    err :: forall a. ProtocolException rpc -> IO a
err = SomeProtocolException -> IO a
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (SomeProtocolException -> IO a)
-> (ProtocolException rpc -> SomeProtocolException)
-> ProtocolException rpc
-> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProtocolException rpc -> SomeProtocolException
forall {k} (rpc :: k).
IsRPC rpc =>
ProtocolException rpc -> SomeProtocolException
ProtocolException

-- | Receive the next output
--
-- Throws 'ProtocolException' if there are no more outputs.
recvNextOutput :: forall rpc m.
     (MonadIO m, HasCallStack)
  => Call rpc -> m (Output rpc)
recvNextOutput :: forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc -> m (Output rpc)
recvNextOutput call :: Call rpc
call@Call{} = IO (Output rpc) -> m (Output rpc)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Output rpc) -> m (Output rpc))
-> IO (Output rpc) -> m (Output rpc)
forall a b. (a -> b) -> a -> b
$ do
    mOut <- Call rpc -> IO (Either ProperTrailers' (InboundMeta, Output rpc))
forall {k} (rpc :: k) (m :: * -> *).
(HasCallStack, MonadIO m) =>
Call rpc -> m (Either ProperTrailers' (InboundMeta, Output rpc))
recvEither Call rpc
call
    case mOut of
      Left ProperTrailers'
trailers -> do
        trailingMetadata <- Call rpc -> ProperTrailers' -> IO (ResponseTrailingMetadata rpc)
forall {k} (m :: * -> *) (rpc :: k).
MonadIO m =>
Call rpc -> ProperTrailers' -> m (ResponseTrailingMetadata rpc)
responseTrailingMetadata Call rpc
call ProperTrailers'
trailers
        err $ TooFewOutputs @rpc trailingMetadata
      Right (InboundMeta
_env, Output rpc
out) ->
        Output rpc -> IO (Output rpc)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Output rpc
out
  where
    err :: ProtocolException rpc -> IO a
    err :: forall a. ProtocolException rpc -> IO a
err = SomeProtocolException -> IO a
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (SomeProtocolException -> IO a)
-> (ProtocolException rpc -> SomeProtocolException)
-> ProtocolException rpc
-> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProtocolException rpc -> SomeProtocolException
forall {k} (rpc :: k).
IsRPC rpc =>
ProtocolException rpc -> SomeProtocolException
ProtocolException

-- | Receive output, which we expect to be the /final/ output
--
-- Throws 'ProtocolException' if the output we receive is not final.
--
-- NOTE: If the first output we receive from the server is not marked as final,
-- we will block until we receive the end-of-stream indication.
recvFinalOutput :: forall rpc m.
     (MonadIO m, HasCallStack)
  => Call rpc
  -> m (Output rpc, ResponseTrailingMetadata rpc)
recvFinalOutput :: forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc -> m (Output rpc, ResponseTrailingMetadata rpc)
recvFinalOutput call :: Call rpc
call@Call{} = IO (Output rpc, ResponseTrailingMetadata rpc)
-> m (Output rpc, ResponseTrailingMetadata rpc)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Output rpc, ResponseTrailingMetadata rpc)
 -> m (Output rpc, ResponseTrailingMetadata rpc))
-> IO (Output rpc, ResponseTrailingMetadata rpc)
-> m (Output rpc, ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ do
    out1 <- Call rpc
-> IO (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
recvOutput Call rpc
call
    case out1 of
      NoMoreElems    ResponseTrailingMetadata rpc
ts -> ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a. ProtocolException rpc -> IO a
err (ProtocolException rpc
 -> IO (Output rpc, ResponseTrailingMetadata rpc))
-> ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ forall (rpc :: k).
ResponseTrailingMetadata rpc -> ProtocolException rpc
forall {k} (rpc :: k).
ResponseTrailingMetadata rpc -> ProtocolException rpc
TooFewOutputs @rpc ResponseTrailingMetadata rpc
ts
      FinalElem  Output rpc
out ResponseTrailingMetadata rpc
ts -> (Output rpc, ResponseTrailingMetadata rpc)
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Output rpc
out, ResponseTrailingMetadata rpc
ts)
      StreamElem Output rpc
out    -> do
        out2 <- Call rpc
-> IO (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
recvOutput Call rpc
call
        case out2 of
          NoMoreElems ResponseTrailingMetadata rpc
ts    -> (Output rpc, ResponseTrailingMetadata rpc)
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Output rpc
out, ResponseTrailingMetadata rpc
ts)
          FinalElem  Output rpc
out' ResponseTrailingMetadata rpc
_ -> ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a. ProtocolException rpc -> IO a
err (ProtocolException rpc
 -> IO (Output rpc, ResponseTrailingMetadata rpc))
-> ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ forall (rpc :: k). Output rpc -> ProtocolException rpc
forall {k} (rpc :: k). Output rpc -> ProtocolException rpc
TooManyOutputs @rpc Output rpc
out'
          StreamElem Output rpc
out'   -> ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a. ProtocolException rpc -> IO a
err (ProtocolException rpc
 -> IO (Output rpc, ResponseTrailingMetadata rpc))
-> ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ forall (rpc :: k). Output rpc -> ProtocolException rpc
forall {k} (rpc :: k). Output rpc -> ProtocolException rpc
TooManyOutputs @rpc Output rpc
out'
  where
    err :: ProtocolException rpc -> IO a
    err :: forall a. ProtocolException rpc -> IO a
err = SomeProtocolException -> IO a
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (SomeProtocolException -> IO a)
-> (ProtocolException rpc -> SomeProtocolException)
-> ProtocolException rpc
-> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProtocolException rpc -> SomeProtocolException
forall {k} (rpc :: k).
IsRPC rpc =>
ProtocolException rpc -> SomeProtocolException
ProtocolException

-- | Receive trailers
--
-- Throws 'ProtocolException' if we received an output.
recvTrailers :: forall rpc m.
     (MonadIO m, HasCallStack)
  => Call rpc -> m (ResponseTrailingMetadata rpc)
recvTrailers :: forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc -> m (ResponseTrailingMetadata rpc)
recvTrailers call :: Call rpc
call@Call{} = IO (ResponseTrailingMetadata rpc)
-> m (ResponseTrailingMetadata rpc)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (ResponseTrailingMetadata rpc)
 -> m (ResponseTrailingMetadata rpc))
-> IO (ResponseTrailingMetadata rpc)
-> m (ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ do
    mOut <- Call rpc
-> IO (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
recvOutput Call rpc
call
    case mOut of
      NoMoreElems     ResponseTrailingMetadata rpc
ts -> ResponseTrailingMetadata rpc -> IO (ResponseTrailingMetadata rpc)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ResponseTrailingMetadata rpc
ts
      FinalElem  Output rpc
out ResponseTrailingMetadata rpc
_ts -> ProtocolException rpc -> IO (ResponseTrailingMetadata rpc)
forall a. ProtocolException rpc -> IO a
err (ProtocolException rpc -> IO (ResponseTrailingMetadata rpc))
-> ProtocolException rpc -> IO (ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ forall (rpc :: k). Output rpc -> ProtocolException rpc
forall {k} (rpc :: k). Output rpc -> ProtocolException rpc
TooManyOutputs @rpc Output rpc
out
      StreamElem Output rpc
out     -> ProtocolException rpc -> IO (ResponseTrailingMetadata rpc)
forall a. ProtocolException rpc -> IO a
err (ProtocolException rpc -> IO (ResponseTrailingMetadata rpc))
-> ProtocolException rpc -> IO (ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ forall (rpc :: k). Output rpc -> ProtocolException rpc
forall {k} (rpc :: k). Output rpc -> ProtocolException rpc
TooManyOutputs @rpc Output rpc
out
  where
    err :: ProtocolException rpc -> IO a
    err :: forall a. ProtocolException rpc -> IO a
err = SomeProtocolException -> IO a
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (SomeProtocolException -> IO a)
-> (ProtocolException rpc -> SomeProtocolException)
-> ProtocolException rpc
-> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProtocolException rpc -> SomeProtocolException
forall {k} (rpc :: k).
IsRPC rpc =>
ProtocolException rpc -> SomeProtocolException
ProtocolException

{-------------------------------------------------------------------------------
  Internal auxiliary: deal with final message
-------------------------------------------------------------------------------}

recvBoth :: forall rpc m.
     (HasCallStack, MonadIO m)
  => Call rpc
  -> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
recvBoth :: forall {k} (rpc :: k) (m :: * -> *).
(HasCallStack, MonadIO m) =>
Call rpc
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
recvBoth Call{Channel (ClientSession rpc)
callChannel :: forall {k} (rpc :: k). Call rpc -> Channel (ClientSession rpc)
callChannel :: Channel (ClientSession rpc)
callChannel} = IO (StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StreamElem ProperTrailers' (InboundMeta, Output rpc))
 -> m (StreamElem ProperTrailers' (InboundMeta, Output rpc)))
-> IO (StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
forall a b. (a -> b) -> a -> b
$
    Either
  (TrailersOnly' HandledSynthesized)
  (StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> StreamElem ProperTrailers' (InboundMeta, Output rpc)
flatten (Either
   (TrailersOnly' HandledSynthesized)
   (StreamElem ProperTrailers' (InboundMeta, Output rpc))
 -> StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> IO
     (Either
        (TrailersOnly' HandledSynthesized)
        (StreamElem ProperTrailers' (InboundMeta, Output rpc)))
-> IO (StreamElem ProperTrailers' (InboundMeta, Output rpc))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Channel (ClientSession rpc)
-> IO
     (Either
        (NoMessages (Inbound (ClientSession rpc)))
        (StreamElem
           (Trailers (Inbound (ClientSession rpc)))
           (Message (Inbound (ClientSession rpc)))))
forall sess.
HasCallStack =>
Channel sess
-> IO
     (Either
        (NoMessages (Inbound sess))
        (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))))
Session.recvBoth Channel (ClientSession rpc)
callChannel
  where
    -- We lose type information here: Trailers-Only is no longer visible
    flatten ::
         Either
           (TrailersOnly' HandledSynthesized)
           (StreamElem ProperTrailers' (InboundMeta, Output rpc))
      -> StreamElem ProperTrailers' (InboundMeta, Output rpc)
    flatten :: Either
  (TrailersOnly' HandledSynthesized)
  (StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> StreamElem ProperTrailers' (InboundMeta, Output rpc)
flatten (Left TrailersOnly' HandledSynthesized
trailersOnly) =
        ProperTrailers'
-> StreamElem ProperTrailers' (InboundMeta, Output rpc)
forall b a. b -> StreamElem b a
NoMoreElems (ProperTrailers'
 -> StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> ProperTrailers'
-> StreamElem ProperTrailers' (InboundMeta, Output rpc)
forall a b. (a -> b) -> a -> b
$ TrailersOnly' HandledSynthesized -> ProperTrailers'
trailersOnlyToProperTrailers' TrailersOnly' HandledSynthesized
trailersOnly
    flatten (Right StreamElem ProperTrailers' (InboundMeta, Output rpc)
streamElem) =
        StreamElem ProperTrailers' (InboundMeta, Output rpc)
streamElem

recvEither :: forall rpc m.
     (HasCallStack, MonadIO m)
  => Call rpc
  -> m (Either ProperTrailers' (InboundMeta, Output rpc))
recvEither :: forall {k} (rpc :: k) (m :: * -> *).
(HasCallStack, MonadIO m) =>
Call rpc -> m (Either ProperTrailers' (InboundMeta, Output rpc))
recvEither Call{Channel (ClientSession rpc)
callChannel :: forall {k} (rpc :: k). Call rpc -> Channel (ClientSession rpc)
callChannel :: Channel (ClientSession rpc)
callChannel} = IO (Either ProperTrailers' (InboundMeta, Output rpc))
-> m (Either ProperTrailers' (InboundMeta, Output rpc))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either ProperTrailers' (InboundMeta, Output rpc))
 -> m (Either ProperTrailers' (InboundMeta, Output rpc)))
-> IO (Either ProperTrailers' (InboundMeta, Output rpc))
-> m (Either ProperTrailers' (InboundMeta, Output rpc))
forall a b. (a -> b) -> a -> b
$
    Either
  (TrailersOnly' HandledSynthesized)
  (Either ProperTrailers' (InboundMeta, Output rpc))
-> Either ProperTrailers' (InboundMeta, Output rpc)
flatten (Either
   (TrailersOnly' HandledSynthesized)
   (Either ProperTrailers' (InboundMeta, Output rpc))
 -> Either ProperTrailers' (InboundMeta, Output rpc))
-> IO
     (Either
        (TrailersOnly' HandledSynthesized)
        (Either ProperTrailers' (InboundMeta, Output rpc)))
-> IO (Either ProperTrailers' (InboundMeta, Output rpc))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Channel (ClientSession rpc)
-> IO
     (Either
        (NoMessages (Inbound (ClientSession rpc)))
        (Either
           (Trailers (Inbound (ClientSession rpc)))
           (Message (Inbound (ClientSession rpc)))))
forall sess.
HasCallStack =>
Channel sess
-> IO
     (Either
        (NoMessages (Inbound sess))
        (Either (Trailers (Inbound sess)) (Message (Inbound sess))))
Session.recvEither Channel (ClientSession rpc)
callChannel
  where
    flatten ::
         Either
           (TrailersOnly' HandledSynthesized)
           (Either ProperTrailers' (InboundMeta, Output rpc))
      -> Either ProperTrailers' (InboundMeta, Output rpc)
    flatten :: Either
  (TrailersOnly' HandledSynthesized)
  (Either ProperTrailers' (InboundMeta, Output rpc))
-> Either ProperTrailers' (InboundMeta, Output rpc)
flatten (Left TrailersOnly' HandledSynthesized
trailersOnly) =
        ProperTrailers' -> Either ProperTrailers' (InboundMeta, Output rpc)
forall a b. a -> Either a b
Left (ProperTrailers'
 -> Either ProperTrailers' (InboundMeta, Output rpc))
-> ProperTrailers'
-> Either ProperTrailers' (InboundMeta, Output rpc)
forall a b. (a -> b) -> a -> b
$ TrailersOnly' HandledSynthesized -> ProperTrailers'
trailersOnlyToProperTrailers' TrailersOnly' HandledSynthesized
trailersOnly
    flatten (Right (Left ProperTrailers'
properTrailers)) =
        ProperTrailers' -> Either ProperTrailers' (InboundMeta, Output rpc)
forall a b. a -> Either a b
Left (ProperTrailers'
 -> Either ProperTrailers' (InboundMeta, Output rpc))
-> ProperTrailers'
-> Either ProperTrailers' (InboundMeta, Output rpc)
forall a b. (a -> b) -> a -> b
$ ProperTrailers'
properTrailers
    flatten (Right (Right (InboundMeta, Output rpc)
msg)) =
        (InboundMeta, Output rpc)
-> Either ProperTrailers' (InboundMeta, Output rpc)
forall a b. b -> Either a b
Right ((InboundMeta, Output rpc)
 -> Either ProperTrailers' (InboundMeta, Output rpc))
-> (InboundMeta, Output rpc)
-> Either ProperTrailers' (InboundMeta, Output rpc)
forall a b. (a -> b) -> a -> b
$ (InboundMeta, Output rpc)
msg

responseTrailingMetadata ::
     MonadIO m
  => Call rpc
  -> ProperTrailers' -> m (ResponseTrailingMetadata rpc)
responseTrailingMetadata :: forall {k} (m :: * -> *) (rpc :: k).
MonadIO m =>
Call rpc -> ProperTrailers' -> m (ResponseTrailingMetadata rpc)
responseTrailingMetadata Call{} ProperTrailers'
trailers = IO (ResponseTrailingMetadata rpc)
-> m (ResponseTrailingMetadata rpc)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (ResponseTrailingMetadata rpc)
 -> m (ResponseTrailingMetadata rpc))
-> IO (ResponseTrailingMetadata rpc)
-> m (ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$
    case ProperTrailers' -> Either GrpcException GrpcNormalTermination
grpcClassifyTermination ProperTrailers'
trailers of
      Right GrpcNormalTermination
terminatedNormally -> do
        [CustomMetadata] -> IO (ResponseTrailingMetadata rpc)
forall a (m :: * -> *).
(ParseMetadata a, MonadThrow m) =>
[CustomMetadata] -> m a
forall (m :: * -> *).
MonadThrow m =>
[CustomMetadata] -> m (ResponseTrailingMetadata rpc)
parseMetadata ([CustomMetadata] -> IO (ResponseTrailingMetadata rpc))
-> [CustomMetadata] -> IO (ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ GrpcNormalTermination -> [CustomMetadata]
grpcTerminatedMetadata GrpcNormalTermination
terminatedNormally
      Left GrpcException
exception ->
        GrpcException -> IO (ResponseTrailingMetadata rpc)
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM GrpcException
exception


-- | Forget that we are in the Trailers-Only case
--
-- Error handling is a bit subtle here. If we are in the Trailers-Only case:
--
-- * Any synthesized errors have already been dealt with
--   (the type @TrailersOnly' Void@ tell us this)
-- * If 'connVerifyHeaders' is enabled, /all/ trailers have been verified
--   (unfortunately this we cannot see from type).
--
-- This means that we might only have a (non-synthesized) error for the
-- content-type if 'connVerifyHeaders' is /not/ enabled; since we are not
-- actually interested in the content-type here, we can therefore just ignore
-- these errors.
trailersOnlyToProperTrailers' ::
     TrailersOnly' HandledSynthesized
  -> ProperTrailers'
trailersOnlyToProperTrailers' :: TrailersOnly' HandledSynthesized -> ProperTrailers'
trailersOnlyToProperTrailers' =
      (ProperTrailers',
 Either (InvalidHeaders GrpcException) (Maybe ContentType))
-> ProperTrailers'
forall a b. (a, b) -> a
fst                                   -- justified by the comment above
    ((ProperTrailers',
  Either (InvalidHeaders GrpcException) (Maybe ContentType))
 -> ProperTrailers')
-> (TrailersOnly' HandledSynthesized
    -> (ProperTrailers',
        Either (InvalidHeaders GrpcException) (Maybe ContentType)))
-> TrailersOnly' HandledSynthesized
-> ProperTrailers'
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TrailersOnly_ (Checked (InvalidHeaders GrpcException))
-> (ProperTrailers',
    Either (InvalidHeaders GrpcException) (Maybe ContentType))
TrailersOnly_ (Checked (InvalidHeaders GrpcException))
-> (ProperTrailers',
    HKD (Checked (InvalidHeaders GrpcException)) (Maybe ContentType))
forall (f :: * -> *).
TrailersOnly_ f -> (ProperTrailers_ f, HKD f (Maybe ContentType))
trailersOnlyToProperTrailers
    (TrailersOnly_ (Checked (InvalidHeaders GrpcException))
 -> (ProperTrailers',
     Either (InvalidHeaders GrpcException) (Maybe ContentType)))
-> (TrailersOnly' HandledSynthesized
    -> TrailersOnly_ (Checked (InvalidHeaders GrpcException)))
-> TrailersOnly' HandledSynthesized
-> (ProperTrailers',
    Either (InvalidHeaders GrpcException) (Maybe ContentType))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall a.
 Either (InvalidHeaders HandledSynthesized) a
 -> Either (InvalidHeaders GrpcException) a)
-> TrailersOnly' HandledSynthesized
-> TrailersOnly_ (Checked (InvalidHeaders GrpcException))
forall (t :: (* -> *) -> *) (f :: * -> *) (g :: * -> *).
Traversable t =>
(forall a. f a -> g a)
-> t (DecoratedWith f) -> t (DecoratedWith g)
HKD.map ((InvalidHeaders HandledSynthesized -> InvalidHeaders GrpcException)
-> Either (InvalidHeaders HandledSynthesized) a
-> Either (InvalidHeaders GrpcException) a
forall a b c. (a -> b) -> Either a c -> Either b c
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first ((InvalidHeaders HandledSynthesized
  -> InvalidHeaders GrpcException)
 -> Either (InvalidHeaders HandledSynthesized) a
 -> Either (InvalidHeaders GrpcException) a)
-> (InvalidHeaders HandledSynthesized
    -> InvalidHeaders GrpcException)
-> Either (InvalidHeaders HandledSynthesized) a
-> Either (InvalidHeaders GrpcException) a
forall a b. (a -> b) -> a -> b
$ (HandledSynthesized -> GrpcException)
-> InvalidHeaders HandledSynthesized
-> InvalidHeaders GrpcException
forall e e'. (e -> e') -> InvalidHeaders e -> InvalidHeaders e'
mapSynthesized HandledSynthesized -> GrpcException
forall a. HandledSynthesized -> a
handledSynthesized) -- simple injection