{-# LANGUAGE OverloadedStrings #-}
module Network.GRPC.Client.Call (
Call
, withRPC
, sendInput
, recvOutput
, recvResponseMetadata
, sendNextInput
, sendFinalInput
, sendEndOfInput
, recvResponseInitialMetadata
, recvNextOutput
, recvFinalOutput
, recvTrailers
, sendInputWithMeta
, recvNextOutputElem
, recvOutputWithMeta
, recvInitialResponse
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.Thread.Delay qualified as UnboundedDelays
import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
import Data.Bifunctor
import Data.Bitraversable
import Data.ByteString.Char8 qualified as BS.Strict.C8
import Data.Foldable (asum)
import Data.List (intersperse)
import Data.Maybe (fromMaybe)
import Data.Proxy
import Data.Text qualified as Text
import Data.Version
import GHC.Stack
import Network.GRPC.Client.Connection (Connection, ConnParams(..))
import Network.GRPC.Client.Connection qualified as Connection
import Network.GRPC.Client.Session
import Network.GRPC.Common
import Network.GRPC.Common.Compression qualified as Compression
import Network.GRPC.Common.StreamElem qualified as StreamElem
import Network.GRPC.Spec
import Network.GRPC.Spec.Util.HKD qualified as HKD
import Network.GRPC.Util.GHC
import Network.GRPC.Util.HTTP2.Stream (ServerDisconnected(..))
import Network.GRPC.Util.Session qualified as Session
import Network.GRPC.Util.Thread qualified as Thread
import Paths_grapesy qualified as Grapesy
data Call rpc = SupportsClientRpc rpc => Call {
forall {k} (rpc :: k). Call rpc -> Channel (ClientSession rpc)
callChannel :: Session.Channel (ClientSession rpc)
}
withRPC :: forall rpc m a.
(MonadMask m, MonadIO m, SupportsClientRpc rpc, HasCallStack)
=> Connection -> CallParams rpc -> Proxy rpc -> (Call rpc -> m a) -> m a
withRPC :: forall {k} (rpc :: k) (m :: * -> *) a.
(MonadMask m, MonadIO m, SupportsClientRpc rpc, HasCallStack) =>
Connection
-> CallParams rpc -> Proxy rpc -> (Call rpc -> m a) -> m a
withRPC Connection
conn CallParams rpc
callParams Proxy rpc
proxy Call rpc -> m a
k = ((a, ()) -> a) -> m (a, ()) -> m a
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (a, ()) -> a
forall a b. (a, b) -> a
fst (m (a, ()) -> m a) -> m (a, ()) -> m a
forall a b. (a -> b) -> a -> b
$
m (Call rpc, CancelRequest)
-> ((Call rpc, CancelRequest) -> ExitCase a -> m ())
-> ((Call rpc, CancelRequest) -> m a)
-> m (a, ())
forall a b c.
HasCallStack =>
m a -> (a -> ExitCase b -> m c) -> (a -> m b) -> m (b, c)
forall (m :: * -> *) a b c.
(MonadMask m, HasCallStack) =>
m a -> (a -> ExitCase b -> m c) -> (a -> m b) -> m (b, c)
generalBracket
(IO (Call rpc, CancelRequest) -> m (Call rpc, CancelRequest)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Call rpc, CancelRequest) -> m (Call rpc, CancelRequest))
-> IO (Call rpc, CancelRequest) -> m (Call rpc, CancelRequest)
forall a b. (a -> b) -> a -> b
$
Connection
-> Proxy rpc -> CallParams rpc -> IO (Call rpc, CancelRequest)
forall {k} (rpc :: k).
(SupportsClientRpc rpc, HasCallStack) =>
Connection
-> Proxy rpc -> CallParams rpc -> IO (Call rpc, CancelRequest)
startRPC Connection
conn Proxy rpc
proxy CallParams rpc
callParams)
(\(Call{Channel (ClientSession rpc)
callChannel :: forall {k} (rpc :: k). Call rpc -> Channel (ClientSession rpc)
callChannel :: Channel (ClientSession rpc)
callChannel}, CancelRequest
cancelRequest) ExitCase a
exitCase -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$
Channel (ClientSession rpc) -> CancelRequest -> ExitCase a -> IO ()
forall rpc a. Channel rpc -> CancelRequest -> ExitCase a -> IO ()
closeRPC Channel (ClientSession rpc)
callChannel CancelRequest
cancelRequest ExitCase a
exitCase)
(Call rpc -> m a
k (Call rpc -> m a)
-> ((Call rpc, CancelRequest) -> Call rpc)
-> (Call rpc, CancelRequest)
-> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Call rpc, CancelRequest) -> Call rpc
forall a b. (a, b) -> a
fst)
startRPC :: forall rpc.
(SupportsClientRpc rpc, HasCallStack)
=> Connection
-> Proxy rpc
-> CallParams rpc
-> IO (Call rpc, Session.CancelRequest)
startRPC :: forall {k} (rpc :: k).
(SupportsClientRpc rpc, HasCallStack) =>
Connection
-> Proxy rpc -> CallParams rpc -> IO (Call rpc, CancelRequest)
startRPC Connection
conn Proxy rpc
_ CallParams rpc
callParams = do
(connClosed, connToServer) <- HasCallStack =>
Connection -> IO (TMVar (Maybe SomeException), ConnectionToServer)
Connection -> IO (TMVar (Maybe SomeException), ConnectionToServer)
Connection.getConnectionToServer Connection
conn
cOut <- Connection.getOutboundCompression conn
metadata <- buildMetadataIO $ callRequestMetadata callParams
let flowStart :: Session.FlowStart (ClientOutbound rpc)
flowStart = Headers (ClientOutbound rpc) -> FlowStart (ClientOutbound rpc)
forall {k} (flow :: k). Headers flow -> FlowStart flow
Session.FlowStartRegular (Headers (ClientOutbound rpc) -> FlowStart (ClientOutbound rpc))
-> Headers (ClientOutbound rpc) -> FlowStart (ClientOutbound rpc)
forall a b. (a -> b) -> a -> b
$ OutboundHeaders {
outHeaders :: RequestHeaders
outHeaders = Maybe Compression -> [CustomMetadata] -> RequestHeaders
requestHeaders Maybe Compression
cOut [CustomMetadata]
metadata
, outCompression :: Compression
outCompression = Compression -> Maybe Compression -> Compression
forall a. a -> Maybe a -> a
fromMaybe Compression
noCompression Maybe Compression
cOut
}
let serverClosedConnection ::
Either (TrailersOnly' HandledSynthesized) ProperTrailers'
-> SomeException
serverClosedConnection =
(GrpcException -> SomeException)
-> (GrpcNormalTermination -> SomeException)
-> Either GrpcException GrpcNormalTermination
-> SomeException
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either GrpcException -> SomeException
forall e. Exception e => e -> SomeException
toException GrpcNormalTermination -> SomeException
forall e. Exception e => e -> SomeException
toException
(Either GrpcException GrpcNormalTermination -> SomeException)
-> (Either (TrailersOnly' HandledSynthesized) ProperTrailers'
-> Either GrpcException GrpcNormalTermination)
-> Either (TrailersOnly' HandledSynthesized) ProperTrailers'
-> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProperTrailers' -> Either GrpcException GrpcNormalTermination
grpcClassifyTermination
(ProperTrailers' -> Either GrpcException GrpcNormalTermination)
-> (Either (TrailersOnly' HandledSynthesized) ProperTrailers'
-> ProperTrailers')
-> Either (TrailersOnly' HandledSynthesized) ProperTrailers'
-> Either GrpcException GrpcNormalTermination
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (TrailersOnly' HandledSynthesized -> ProperTrailers')
-> (ProperTrailers' -> ProperTrailers')
-> Either (TrailersOnly' HandledSynthesized) ProperTrailers'
-> ProperTrailers'
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either TrailersOnly' HandledSynthesized -> ProperTrailers'
trailersOnlyToProperTrailers' ProperTrailers' -> ProperTrailers'
forall a. a -> a
id
(channel, cancelRequest) <-
Session.setupRequestChannel
session
connToServer
serverClosedConnection
flowStart
mClientSideTimeout <-
case callTimeout callParams of
Maybe Timeout
Nothing -> Maybe ThreadId -> IO (Maybe ThreadId)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ThreadId
forall a. Maybe a
Nothing
Just Timeout
t -> (ThreadId -> Maybe ThreadId) -> IO ThreadId -> IO (Maybe ThreadId)
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ThreadId -> Maybe ThreadId
forall a. a -> Maybe a
Just (IO ThreadId -> IO (Maybe ThreadId))
-> IO ThreadId -> IO (Maybe ThreadId)
forall a b. (a -> b) -> a -> b
$ ThreadLabel -> IO () -> IO ThreadId
forkLabelled ThreadLabel
"grapesy:clientSideTimeout" (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
Integer -> IO ()
UnboundedDelays.delay (Timeout -> Integer
timeoutToMicro Timeout
t)
let timeout :: SomeException
timeout :: SomeException
timeout = GrpcException -> SomeException
forall e. Exception e => e -> SomeException
toException (GrpcException -> SomeException) -> GrpcException -> SomeException
forall a b. (a -> b) -> a -> b
$ GrpcException {
grpcError :: GrpcError
grpcError = GrpcError
GrpcDeadlineExceeded
, grpcErrorMessage :: Maybe Text
grpcErrorMessage = Maybe Text
forall a. Maybe a
Nothing
, grpcErrorDetails :: Maybe ByteString
grpcErrorDetails = Maybe ByteString
forall a. Maybe a
Nothing
, grpcErrorMetadata :: [CustomMetadata]
grpcErrorMetadata = []
}
IO (CancelResult (FlowState (ClientInbound rpc))) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (CancelResult (FlowState (ClientInbound rpc))) -> IO ())
-> IO (CancelResult (FlowState (ClientInbound rpc))) -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (ThreadState (FlowState (ClientInbound rpc)))
-> SomeException
-> IO (CancelResult (FlowState (ClientInbound rpc)))
forall a.
TVar (ThreadState a) -> SomeException -> IO (CancelResult a)
Thread.cancelThread (Channel (ClientSession rpc)
-> TVar (ThreadState (FlowState (Inbound (ClientSession rpc))))
forall sess.
Channel sess -> TVar (ThreadState (FlowState (Inbound sess)))
Session.channelInbound Channel (ClientSession rpc)
channel) SomeException
timeout
Channel (ClientSession rpc)
-> CancelRequest -> ExitCase Any -> IO ()
forall rpc a. Channel rpc -> CancelRequest -> ExitCase a -> IO ()
closeRPC Channel (ClientSession rpc)
channel CancelRequest
cancelRequest (ExitCase Any -> IO ()) -> ExitCase Any -> IO ()
forall a b. (a -> b) -> a -> b
$ SomeException -> ExitCase Any
forall a. SomeException -> ExitCase a
ExitCaseException SomeException
timeout
_ <- forkLabelled "grapesy:monitorConnection" $ do
status <- atomically $ do
(Left <$> Thread.waitForNormalOrAbnormalThreadTermination
(Session.channelInbound channel))
`orElse`
(Right <$> readTMVar connClosed)
forM_ mClientSideTimeout killThread
case status of
Left Maybe SomeException
_ -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Right Maybe SomeException
mErr -> do
let exitReason :: ExitCase ()
exitReason :: ExitCase ()
exitReason =
case Maybe SomeException
mErr of
Maybe SomeException
Nothing -> () -> ExitCase ()
forall a. a -> ExitCase a
ExitCaseSuccess ()
Just SomeException
exitWithException ->
SomeException -> ExitCase ()
forall a. SomeException -> ExitCase a
ExitCaseException (SomeException -> ExitCase ())
-> (ServerDisconnected -> SomeException)
-> ServerDisconnected
-> ExitCase ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ServerDisconnected -> SomeException
forall e. Exception e => e -> SomeException
toException (ServerDisconnected -> ExitCase ())
-> ServerDisconnected -> ExitCase ()
forall a b. (a -> b) -> a -> b
$
SomeException -> CallStack -> ServerDisconnected
ServerDisconnected SomeException
exitWithException CallStack
HasCallStack => CallStack
callStack
_mAlreadyClosed <- Channel (ClientSession rpc)
-> ExitCase () -> IO (Maybe SomeException)
forall sess a.
HasCallStack =>
Channel sess -> ExitCase a -> IO (Maybe SomeException)
Session.close Channel (ClientSession rpc)
channel ExitCase ()
exitReason
return ()
return (Call channel, cancelRequest)
where
connParams :: ConnParams
connParams :: ConnParams
connParams = Connection -> ConnParams
Connection.connParams Connection
conn
requestHeaders :: Maybe Compression -> [CustomMetadata] -> RequestHeaders
requestHeaders :: Maybe Compression -> [CustomMetadata] -> RequestHeaders
requestHeaders Maybe Compression
cOut [CustomMetadata]
metadata = RequestHeaders{
requestTimeout :: HKD Undecorated (Maybe Timeout)
requestTimeout =
[Maybe Timeout] -> Maybe Timeout
forall (t :: * -> *) (f :: * -> *) a.
(Foldable t, Alternative f) =>
t (f a) -> f a
asum [
CallParams rpc -> Maybe Timeout
forall {k} (rpc :: k). CallParams rpc -> Maybe Timeout
callTimeout CallParams rpc
callParams
, ConnParams -> Maybe Timeout
connDefaultTimeout ConnParams
connParams
]
, requestMetadata :: CustomMetadataMap
requestMetadata =
[CustomMetadata] -> CustomMetadataMap
customMetadataMapFromList [CustomMetadata]
metadata
, requestCompression :: HKD Undecorated (Maybe CompressionId)
requestCompression =
Compression -> CompressionId
compressionId (Compression -> CompressionId)
-> Maybe Compression -> Maybe CompressionId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe Compression
cOut
, requestAcceptCompression :: HKD Undecorated (Maybe (NonEmpty CompressionId))
requestAcceptCompression = NonEmpty CompressionId -> Maybe (NonEmpty CompressionId)
forall a. a -> Maybe a
Just (NonEmpty CompressionId -> Maybe (NonEmpty CompressionId))
-> NonEmpty CompressionId -> Maybe (NonEmpty CompressionId)
forall a b. (a -> b) -> a -> b
$
Negotation -> NonEmpty CompressionId
Compression.offer (Negotation -> NonEmpty CompressionId)
-> Negotation -> NonEmpty CompressionId
forall a b. (a -> b) -> a -> b
$ ConnParams -> Negotation
connCompression ConnParams
connParams
, requestContentType :: HKD Undecorated (Maybe ContentType)
requestContentType =
ConnParams -> Maybe ContentType
connContentType ConnParams
connParams
, requestMessageType :: HKD Undecorated (Maybe MessageType)
requestMessageType =
MessageType -> Maybe MessageType
forall a. a -> Maybe a
Just MessageType
MessageTypeDefault
, requestUserAgent :: HKD Undecorated (Maybe ByteString)
requestUserAgent = ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just (ByteString -> Maybe ByteString) -> ByteString -> Maybe ByteString
forall a b. (a -> b) -> a -> b
$
[ByteString] -> ByteString
forall a. Monoid a => [a] -> a
mconcat [
ByteString
"grpc-haskell-grapesy/"
, [ByteString] -> ByteString
forall a. Monoid a => [a] -> a
mconcat ([ByteString] -> ByteString)
-> ([ByteString] -> [ByteString]) -> [ByteString] -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
intersperse ByteString
"." ([ByteString] -> ByteString) -> [ByteString] -> ByteString
forall a b. (a -> b) -> a -> b
$
(Int -> ByteString) -> [Int] -> [ByteString]
forall a b. (a -> b) -> [a] -> [b]
map (ThreadLabel -> ByteString
BS.Strict.C8.pack (ThreadLabel -> ByteString)
-> (Int -> ThreadLabel) -> Int -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> ThreadLabel
forall a. Show a => a -> ThreadLabel
show) ([Int] -> [ByteString]) -> [Int] -> [ByteString]
forall a b. (a -> b) -> a -> b
$
Version -> [Int]
versionBranch Version
Grapesy.version
]
, requestIncludeTE :: HKD Undecorated Bool
requestIncludeTE =
Bool
HKD Undecorated Bool
True
, requestTraceContext :: HKD Undecorated (Maybe TraceContext)
requestTraceContext =
Maybe TraceContext
HKD Undecorated (Maybe TraceContext)
forall a. Maybe a
Nothing
, requestPreviousRpcAttempts :: HKD Undecorated (Maybe Int)
requestPreviousRpcAttempts =
Maybe Int
HKD Undecorated (Maybe Int)
forall a. Maybe a
Nothing
, requestUnrecognized :: HKD Undecorated ()
requestUnrecognized =
()
}
session :: ClientSession rpc
session :: ClientSession rpc
session = ClientSession {
clientConnection :: Connection
clientConnection = Connection
conn
}
closeRPC ::
Session.Channel rpc
-> Session.CancelRequest
-> ExitCase a
-> IO ()
closeRPC :: forall rpc a. Channel rpc -> CancelRequest -> ExitCase a -> IO ()
closeRPC Channel rpc
callChannel CancelRequest
cancelRequest ExitCase a
exitCase = IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
canDiscard <- IO Bool
checkCanDiscard
sendResetFrame
mException <- liftIO $ Session.close callChannel exitCase
case mException of
Maybe SomeException
Nothing ->
() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just SomeException
ex ->
case SomeException -> Maybe ChannelDiscarded
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
ex of
Maybe ChannelDiscarded
Nothing ->
SomeException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM SomeException
ex
Just ChannelDiscarded
discarded ->
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
canDiscard (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
ChannelDiscarded -> IO ()
throwCancelled ChannelDiscarded
discarded
where
sendResetFrame :: IO ()
sendResetFrame :: IO ()
sendResetFrame =
CancelRequest
cancelRequest CancelRequest -> CancelRequest
forall a b. (a -> b) -> a -> b
$
case ExitCase a
exitCase of
ExitCaseSuccess a
_ ->
Maybe SomeException
forall a. Maybe a
Nothing
ExitCase a
ExitCaseAbort ->
SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just (SomeException -> Maybe SomeException)
-> (ChannelAborted -> SomeException)
-> ChannelAborted
-> Maybe SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ChannelAborted -> SomeException
forall e. Exception e => e -> SomeException
toException (ChannelAborted -> Maybe SomeException)
-> ChannelAborted -> Maybe SomeException
forall a b. (a -> b) -> a -> b
$ CallStack -> ChannelAborted
Session.ChannelAborted CallStack
HasCallStack => CallStack
callStack
ExitCaseException SomeException
e ->
SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e
throwCancelled :: ChannelDiscarded -> IO ()
throwCancelled :: ChannelDiscarded -> IO ()
throwCancelled (ChannelDiscarded CallStack
cs) = do
GrpcException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (GrpcException -> IO ()) -> GrpcException -> IO ()
forall a b. (a -> b) -> a -> b
$ GrpcException {
grpcError :: GrpcError
grpcError = GrpcError
GrpcCancelled
, grpcErrorMessage :: Maybe Text
grpcErrorMessage = Text -> Maybe Text
forall a. a -> Maybe a
Just (Text -> Maybe Text) -> Text -> Maybe Text
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
forall a. Monoid a => [a] -> a
mconcat [
Text
"Channel discarded by client at "
, ThreadLabel -> Text
Text.pack (ThreadLabel -> Text) -> ThreadLabel -> Text
forall a b. (a -> b) -> a -> b
$ CallStack -> ThreadLabel
prettyCallStack CallStack
cs
]
, grpcErrorDetails :: Maybe ByteString
grpcErrorDetails = Maybe ByteString
forall a. Maybe a
Nothing
, grpcErrorMetadata :: [CustomMetadata]
grpcErrorMetadata = []
}
checkCanDiscard :: IO Bool
checkCanDiscard :: IO Bool
checkCanDiscard = do
mRecvFinal <- STM (RecvFinal (Inbound rpc)) -> IO (RecvFinal (Inbound rpc))
forall a. STM a -> IO a
atomically (STM (RecvFinal (Inbound rpc)) -> IO (RecvFinal (Inbound rpc)))
-> STM (RecvFinal (Inbound rpc)) -> IO (RecvFinal (Inbound rpc))
forall a b. (a -> b) -> a -> b
$
TVar (RecvFinal (Inbound rpc)) -> STM (RecvFinal (Inbound rpc))
forall a. TVar a -> STM a
readTVar (TVar (RecvFinal (Inbound rpc)) -> STM (RecvFinal (Inbound rpc)))
-> TVar (RecvFinal (Inbound rpc)) -> STM (RecvFinal (Inbound rpc))
forall a b. (a -> b) -> a -> b
$ Channel rpc -> TVar (RecvFinal (Inbound rpc))
forall sess. Channel sess -> TVar (RecvFinal (Inbound sess))
Session.channelRecvFinal Channel rpc
callChannel
let onNotRunning :: STM ()
onNotRunning = () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
mTerminated <- atomically $
Thread.getThreadState_
(Session.channelInbound callChannel)
onNotRunning
return $
or [
case mRecvFinal of
RecvFinal (Inbound rpc)
Session.RecvNotFinal -> Bool
False
Session.RecvWithoutTrailers Trailers (Inbound rpc)
_ -> Bool
True
Session.RecvFinal CallStack
_ -> Bool
True
, case mTerminated of
Thread.ThreadNotYetRunning_ () -> Bool
False
ThreadState_ ()
Thread.ThreadRunning_ -> Bool
False
ThreadState_ ()
Thread.ThreadDone_ -> Bool
True
Thread.ThreadException_ SomeException
_ -> Bool
True
]
sendInput ::
(HasCallStack, MonadIO m)
=> Call rpc
-> StreamElem NoMetadata (Input rpc)
-> m ()
sendInput :: forall {k} (m :: * -> *) (rpc :: k).
(HasCallStack, MonadIO m) =>
Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
sendInput Call rpc
call = Call rpc -> StreamElem NoMetadata (OutboundMeta, Input rpc) -> m ()
forall {k} (m :: * -> *) (rpc :: k).
(HasCallStack, MonadIO m) =>
Call rpc -> StreamElem NoMetadata (OutboundMeta, Input rpc) -> m ()
sendInputWithMeta Call rpc
call (StreamElem NoMetadata (OutboundMeta, Input rpc) -> m ())
-> (StreamElem NoMetadata (Input rpc)
-> StreamElem NoMetadata (OutboundMeta, Input rpc))
-> StreamElem NoMetadata (Input rpc)
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Input rpc -> (OutboundMeta, Input rpc))
-> StreamElem NoMetadata (Input rpc)
-> StreamElem NoMetadata (OutboundMeta, Input rpc)
forall a b.
(a -> b) -> StreamElem NoMetadata a -> StreamElem NoMetadata b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (OutboundMeta
forall a. Default a => a
def,)
sendInputWithMeta ::
(HasCallStack, MonadIO m)
=> Call rpc
-> StreamElem NoMetadata (OutboundMeta, Input rpc)
-> m ()
sendInputWithMeta :: forall {k} (m :: * -> *) (rpc :: k).
(HasCallStack, MonadIO m) =>
Call rpc -> StreamElem NoMetadata (OutboundMeta, Input rpc) -> m ()
sendInputWithMeta Call{Channel (ClientSession rpc)
callChannel :: forall {k} (rpc :: k). Call rpc -> Channel (ClientSession rpc)
callChannel :: Channel (ClientSession rpc)
callChannel} StreamElem NoMetadata (OutboundMeta, Input rpc)
msg = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Channel (ClientSession rpc)
-> StreamElem
(Trailers (Outbound (ClientSession rpc)))
(Message (Outbound (ClientSession rpc)))
-> IO ()
forall sess.
(HasCallStack, NFData (Message (Outbound sess))) =>
Channel sess
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> IO ()
Session.send Channel (ClientSession rpc)
callChannel StreamElem NoMetadata (OutboundMeta, Input rpc)
StreamElem
(Trailers (Outbound (ClientSession rpc)))
(Message (Outbound (ClientSession rpc)))
msg
StreamElem NoMetadata (OutboundMeta, Input rpc)
-> (NoMetadata -> IO ()) -> IO ()
forall (m :: * -> *) b a.
Applicative m =>
StreamElem b a -> (b -> m ()) -> m ()
StreamElem.whenDefinitelyFinal StreamElem NoMetadata (OutboundMeta, Input rpc)
msg ((NoMetadata -> IO ()) -> IO ()) -> (NoMetadata -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \NoMetadata
_ ->
Channel (ClientSession rpc) -> IO ()
forall sess. Channel sess -> IO ()
Session.waitForOutbound Channel (ClientSession rpc)
callChannel
recvOutput :: forall rpc m.
(MonadIO m, HasCallStack)
=> Call rpc
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
recvOutput :: forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
recvOutput call :: Call rpc
call@Call{} = IO (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc)))
-> IO (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
forall a b. (a -> b) -> a -> b
$ do
streamElem <- Call rpc
-> IO (StreamElem ProperTrailers' (InboundMeta, Output rpc))
forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
recvOutputWithMeta Call rpc
call
bitraverse (responseTrailingMetadata call) (return . snd) streamElem
recvNextOutputElem ::
(MonadIO m, HasCallStack)
=> Call rpc -> m (NextElem (Output rpc))
recvNextOutputElem :: forall {k} (m :: * -> *) (rpc :: k).
(MonadIO m, HasCallStack) =>
Call rpc -> m (NextElem (Output rpc))
recvNextOutputElem =
(Either ProperTrailers' (InboundMeta, Output rpc)
-> NextElem (Output rpc))
-> m (Either ProperTrailers' (InboundMeta, Output rpc))
-> m (NextElem (Output rpc))
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((ProperTrailers' -> NextElem (Output rpc))
-> ((InboundMeta, Output rpc) -> NextElem (Output rpc))
-> Either ProperTrailers' (InboundMeta, Output rpc)
-> NextElem (Output rpc)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (NextElem (Output rpc) -> ProperTrailers' -> NextElem (Output rpc)
forall a b. a -> b -> a
const NextElem (Output rpc)
forall a. NextElem a
NoNextElem) (Output rpc -> NextElem (Output rpc)
forall a. a -> NextElem a
NextElem (Output rpc -> NextElem (Output rpc))
-> ((InboundMeta, Output rpc) -> Output rpc)
-> (InboundMeta, Output rpc)
-> NextElem (Output rpc)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (InboundMeta, Output rpc) -> Output rpc
forall a b. (a, b) -> b
snd))
(m (Either ProperTrailers' (InboundMeta, Output rpc))
-> m (NextElem (Output rpc)))
-> (Call rpc
-> m (Either ProperTrailers' (InboundMeta, Output rpc)))
-> Call rpc
-> m (NextElem (Output rpc))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Call rpc -> m (Either ProperTrailers' (InboundMeta, Output rpc))
forall {k} (rpc :: k) (m :: * -> *).
(HasCallStack, MonadIO m) =>
Call rpc -> m (Either ProperTrailers' (InboundMeta, Output rpc))
recvEither
recvOutputWithMeta :: forall rpc m.
(MonadIO m, HasCallStack)
=> Call rpc
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
recvOutputWithMeta :: forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
recvOutputWithMeta = Call rpc
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
forall {k} (rpc :: k) (m :: * -> *).
(HasCallStack, MonadIO m) =>
Call rpc
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
recvBoth
recvResponseMetadata :: forall rpc m.
MonadIO m
=> Call rpc -> m (ResponseMetadata rpc)
recvResponseMetadata :: forall {k} (rpc :: k) (m :: * -> *).
MonadIO m =>
Call rpc -> m (ResponseMetadata rpc)
recvResponseMetadata call :: Call rpc
call@Call{} = IO (ResponseMetadata rpc) -> m (ResponseMetadata rpc)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (ResponseMetadata rpc) -> m (ResponseMetadata rpc))
-> IO (ResponseMetadata rpc) -> m (ResponseMetadata rpc)
forall a b. (a -> b) -> a -> b
$
Call rpc
-> IO
(Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized))
forall {k} (rpc :: k) (m :: * -> *).
MonadIO m =>
Call rpc
-> m (Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized))
recvInitialResponse Call rpc
call IO
(Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized))
-> (Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized)
-> IO (ResponseMetadata rpc))
-> IO (ResponseMetadata rpc)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized)
-> IO (ResponseMetadata rpc)
aux
where
aux ::
Either (TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized)
-> IO (ResponseMetadata rpc)
aux :: Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized)
-> IO (ResponseMetadata rpc)
aux (Left TrailersOnly' HandledSynthesized
trailers) =
case ProperTrailers' -> Either GrpcException GrpcNormalTermination
grpcClassifyTermination ProperTrailers'
properTrailers of
Left GrpcException
exception ->
GrpcException -> IO (ResponseMetadata rpc)
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM GrpcException
exception
Right GrpcNormalTermination
terminatedNormally -> do
ResponseTrailingMetadata rpc -> ResponseMetadata rpc
forall {k} (rpc :: k).
ResponseTrailingMetadata rpc -> ResponseMetadata rpc
ResponseTrailingMetadata (ResponseTrailingMetadata rpc -> ResponseMetadata rpc)
-> IO (ResponseTrailingMetadata rpc) -> IO (ResponseMetadata rpc)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
[CustomMetadata] -> IO (ResponseTrailingMetadata rpc)
forall a (m :: * -> *).
(ParseMetadata a, MonadThrow m) =>
[CustomMetadata] -> m a
forall (m :: * -> *).
MonadThrow m =>
[CustomMetadata] -> m (ResponseTrailingMetadata rpc)
parseMetadata (GrpcNormalTermination -> [CustomMetadata]
grpcTerminatedMetadata GrpcNormalTermination
terminatedNormally)
where
properTrailers :: ProperTrailers'
properTrailers = TrailersOnly' HandledSynthesized -> ProperTrailers'
trailersOnlyToProperTrailers' TrailersOnly' HandledSynthesized
trailers
aux (Right ResponseHeaders' HandledSynthesized
headers) =
ResponseInitialMetadata rpc -> ResponseMetadata rpc
forall {k} (rpc :: k).
ResponseInitialMetadata rpc -> ResponseMetadata rpc
ResponseInitialMetadata (ResponseInitialMetadata rpc -> ResponseMetadata rpc)
-> IO (ResponseInitialMetadata rpc) -> IO (ResponseMetadata rpc)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
[CustomMetadata] -> IO (ResponseInitialMetadata rpc)
forall a (m :: * -> *).
(ParseMetadata a, MonadThrow m) =>
[CustomMetadata] -> m a
forall (m :: * -> *).
MonadThrow m =>
[CustomMetadata] -> m (ResponseInitialMetadata rpc)
parseMetadata (CustomMetadataMap -> [CustomMetadata]
customMetadataMapToList (CustomMetadataMap -> [CustomMetadata])
-> CustomMetadataMap -> [CustomMetadata]
forall a b. (a -> b) -> a -> b
$ ResponseHeaders' HandledSynthesized -> CustomMetadataMap
forall (f :: * -> *). ResponseHeaders_ f -> CustomMetadataMap
responseMetadata ResponseHeaders' HandledSynthesized
headers)
recvInitialResponse :: forall rpc m.
MonadIO m
=> Call rpc
-> m ( Either (TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized)
)
recvInitialResponse :: forall {k} (rpc :: k) (m :: * -> *).
MonadIO m =>
Call rpc
-> m (Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized))
recvInitialResponse Call{Channel (ClientSession rpc)
callChannel :: forall {k} (rpc :: k). Call rpc -> Channel (ClientSession rpc)
callChannel :: Channel (ClientSession rpc)
callChannel} = IO
(Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized))
-> m (Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
(Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized))
-> m (Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized)))
-> IO
(Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized))
-> m (Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized))
forall a b. (a -> b) -> a -> b
$
(Headers (ClientInbound rpc)
-> ResponseHeaders' HandledSynthesized)
-> Either
(TrailersOnly' HandledSynthesized) (Headers (ClientInbound rpc))
-> Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized)
forall a b.
(a -> b)
-> Either (TrailersOnly' HandledSynthesized) a
-> Either (TrailersOnly' HandledSynthesized) b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Headers (ClientInbound rpc) -> ResponseHeaders' HandledSynthesized
forall k (rpc :: k).
Headers (ClientInbound rpc) -> ResponseHeaders' HandledSynthesized
inbHeaders (Either
(TrailersOnly' HandledSynthesized) (Headers (ClientInbound rpc))
-> Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized))
-> IO
(Either
(TrailersOnly' HandledSynthesized) (Headers (ClientInbound rpc)))
-> IO
(Either
(TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Channel (ClientSession rpc)
-> IO
(Either
(NoMessages (Inbound (ClientSession rpc)))
(Headers (Inbound (ClientSession rpc))))
forall sess.
Channel sess
-> IO (Either (NoMessages (Inbound sess)) (Headers (Inbound sess)))
Session.getInboundHeaders Channel (ClientSession rpc)
callChannel
sendNextInput :: MonadIO m => Call rpc -> Input rpc -> m ()
sendNextInput :: forall {k} (m :: * -> *) (rpc :: k).
MonadIO m =>
Call rpc -> Input rpc -> m ()
sendNextInput Call rpc
call = Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
forall {k} (m :: * -> *) (rpc :: k).
(HasCallStack, MonadIO m) =>
Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
sendInput Call rpc
call (StreamElem NoMetadata (Input rpc) -> m ())
-> (Input rpc -> StreamElem NoMetadata (Input rpc))
-> Input rpc
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Input rpc -> StreamElem NoMetadata (Input rpc)
forall b a. a -> StreamElem b a
StreamElem
sendFinalInput ::
MonadIO m
=> Call rpc
-> Input rpc
-> m ()
sendFinalInput :: forall {k} (m :: * -> *) (rpc :: k).
MonadIO m =>
Call rpc -> Input rpc -> m ()
sendFinalInput Call rpc
call Input rpc
input =
Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
forall {k} (m :: * -> *) (rpc :: k).
(HasCallStack, MonadIO m) =>
Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
sendInput Call rpc
call (Input rpc -> NoMetadata -> StreamElem NoMetadata (Input rpc)
forall b a. a -> b -> StreamElem b a
FinalElem Input rpc
input NoMetadata
NoMetadata)
sendEndOfInput :: MonadIO m => Call rpc -> m ()
sendEndOfInput :: forall {k} (m :: * -> *) (rpc :: k). MonadIO m => Call rpc -> m ()
sendEndOfInput Call rpc
call = Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
forall {k} (m :: * -> *) (rpc :: k).
(HasCallStack, MonadIO m) =>
Call rpc -> StreamElem NoMetadata (Input rpc) -> m ()
sendInput Call rpc
call (StreamElem NoMetadata (Input rpc) -> m ())
-> StreamElem NoMetadata (Input rpc) -> m ()
forall a b. (a -> b) -> a -> b
$ NoMetadata -> StreamElem NoMetadata (Input rpc)
forall b a. b -> StreamElem b a
NoMoreElems NoMetadata
NoMetadata
recvResponseInitialMetadata :: forall rpc m.
MonadIO m
=> Call rpc
-> m (ResponseInitialMetadata rpc)
recvResponseInitialMetadata :: forall {k} (rpc :: k) (m :: * -> *).
MonadIO m =>
Call rpc -> m (ResponseInitialMetadata rpc)
recvResponseInitialMetadata call :: Call rpc
call@Call{} = IO (ResponseInitialMetadata rpc) -> m (ResponseInitialMetadata rpc)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (ResponseInitialMetadata rpc)
-> m (ResponseInitialMetadata rpc))
-> IO (ResponseInitialMetadata rpc)
-> m (ResponseInitialMetadata rpc)
forall a b. (a -> b) -> a -> b
$ do
md <- Call rpc -> IO (ResponseMetadata rpc)
forall {k} (rpc :: k) (m :: * -> *).
MonadIO m =>
Call rpc -> m (ResponseMetadata rpc)
recvResponseMetadata Call rpc
call
case md of
ResponseInitialMetadata ResponseInitialMetadata rpc
md' ->
ResponseInitialMetadata rpc -> IO (ResponseInitialMetadata rpc)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ResponseInitialMetadata rpc
md'
ResponseTrailingMetadata ResponseTrailingMetadata rpc
md' ->
ProtocolException rpc -> IO (ResponseInitialMetadata rpc)
forall a. ProtocolException rpc -> IO a
err (ProtocolException rpc -> IO (ResponseInitialMetadata rpc))
-> ProtocolException rpc -> IO (ResponseInitialMetadata rpc)
forall a b. (a -> b) -> a -> b
$ ResponseTrailingMetadata rpc -> ProtocolException rpc
forall {k} (rpc :: k).
ResponseTrailingMetadata rpc -> ProtocolException rpc
UnexpectedTrailersOnly ResponseTrailingMetadata rpc
md'
where
err :: ProtocolException rpc -> IO a
err :: forall a. ProtocolException rpc -> IO a
err = SomeProtocolException -> IO a
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (SomeProtocolException -> IO a)
-> (ProtocolException rpc -> SomeProtocolException)
-> ProtocolException rpc
-> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProtocolException rpc -> SomeProtocolException
forall {k} (rpc :: k).
IsRPC rpc =>
ProtocolException rpc -> SomeProtocolException
ProtocolException
recvNextOutput :: forall rpc m.
(MonadIO m, HasCallStack)
=> Call rpc -> m (Output rpc)
recvNextOutput :: forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc -> m (Output rpc)
recvNextOutput call :: Call rpc
call@Call{} = IO (Output rpc) -> m (Output rpc)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Output rpc) -> m (Output rpc))
-> IO (Output rpc) -> m (Output rpc)
forall a b. (a -> b) -> a -> b
$ do
mOut <- Call rpc -> IO (Either ProperTrailers' (InboundMeta, Output rpc))
forall {k} (rpc :: k) (m :: * -> *).
(HasCallStack, MonadIO m) =>
Call rpc -> m (Either ProperTrailers' (InboundMeta, Output rpc))
recvEither Call rpc
call
case mOut of
Left ProperTrailers'
trailers -> do
trailingMetadata <- Call rpc -> ProperTrailers' -> IO (ResponseTrailingMetadata rpc)
forall {k} (m :: * -> *) (rpc :: k).
MonadIO m =>
Call rpc -> ProperTrailers' -> m (ResponseTrailingMetadata rpc)
responseTrailingMetadata Call rpc
call ProperTrailers'
trailers
err $ TooFewOutputs @rpc trailingMetadata
Right (InboundMeta
_env, Output rpc
out) ->
Output rpc -> IO (Output rpc)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Output rpc
out
where
err :: ProtocolException rpc -> IO a
err :: forall a. ProtocolException rpc -> IO a
err = SomeProtocolException -> IO a
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (SomeProtocolException -> IO a)
-> (ProtocolException rpc -> SomeProtocolException)
-> ProtocolException rpc
-> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProtocolException rpc -> SomeProtocolException
forall {k} (rpc :: k).
IsRPC rpc =>
ProtocolException rpc -> SomeProtocolException
ProtocolException
recvFinalOutput :: forall rpc m.
(MonadIO m, HasCallStack)
=> Call rpc
-> m (Output rpc, ResponseTrailingMetadata rpc)
recvFinalOutput :: forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc -> m (Output rpc, ResponseTrailingMetadata rpc)
recvFinalOutput call :: Call rpc
call@Call{} = IO (Output rpc, ResponseTrailingMetadata rpc)
-> m (Output rpc, ResponseTrailingMetadata rpc)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Output rpc, ResponseTrailingMetadata rpc)
-> m (Output rpc, ResponseTrailingMetadata rpc))
-> IO (Output rpc, ResponseTrailingMetadata rpc)
-> m (Output rpc, ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ do
out1 <- Call rpc
-> IO (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
recvOutput Call rpc
call
case out1 of
NoMoreElems ResponseTrailingMetadata rpc
ts -> ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a. ProtocolException rpc -> IO a
err (ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc))
-> ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ forall (rpc :: k).
ResponseTrailingMetadata rpc -> ProtocolException rpc
forall {k} (rpc :: k).
ResponseTrailingMetadata rpc -> ProtocolException rpc
TooFewOutputs @rpc ResponseTrailingMetadata rpc
ts
FinalElem Output rpc
out ResponseTrailingMetadata rpc
ts -> (Output rpc, ResponseTrailingMetadata rpc)
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Output rpc
out, ResponseTrailingMetadata rpc
ts)
StreamElem Output rpc
out -> do
out2 <- Call rpc
-> IO (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
recvOutput Call rpc
call
case out2 of
NoMoreElems ResponseTrailingMetadata rpc
ts -> (Output rpc, ResponseTrailingMetadata rpc)
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Output rpc
out, ResponseTrailingMetadata rpc
ts)
FinalElem Output rpc
out' ResponseTrailingMetadata rpc
_ -> ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a. ProtocolException rpc -> IO a
err (ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc))
-> ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ forall (rpc :: k). Output rpc -> ProtocolException rpc
forall {k} (rpc :: k). Output rpc -> ProtocolException rpc
TooManyOutputs @rpc Output rpc
out'
StreamElem Output rpc
out' -> ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a. ProtocolException rpc -> IO a
err (ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc))
-> ProtocolException rpc
-> IO (Output rpc, ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ forall (rpc :: k). Output rpc -> ProtocolException rpc
forall {k} (rpc :: k). Output rpc -> ProtocolException rpc
TooManyOutputs @rpc Output rpc
out'
where
err :: ProtocolException rpc -> IO a
err :: forall a. ProtocolException rpc -> IO a
err = SomeProtocolException -> IO a
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (SomeProtocolException -> IO a)
-> (ProtocolException rpc -> SomeProtocolException)
-> ProtocolException rpc
-> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProtocolException rpc -> SomeProtocolException
forall {k} (rpc :: k).
IsRPC rpc =>
ProtocolException rpc -> SomeProtocolException
ProtocolException
recvTrailers :: forall rpc m.
(MonadIO m, HasCallStack)
=> Call rpc -> m (ResponseTrailingMetadata rpc)
recvTrailers :: forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc -> m (ResponseTrailingMetadata rpc)
recvTrailers call :: Call rpc
call@Call{} = IO (ResponseTrailingMetadata rpc)
-> m (ResponseTrailingMetadata rpc)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (ResponseTrailingMetadata rpc)
-> m (ResponseTrailingMetadata rpc))
-> IO (ResponseTrailingMetadata rpc)
-> m (ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ do
mOut <- Call rpc
-> IO (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
forall {k} (rpc :: k) (m :: * -> *).
(MonadIO m, HasCallStack) =>
Call rpc
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
recvOutput Call rpc
call
case mOut of
NoMoreElems ResponseTrailingMetadata rpc
ts -> ResponseTrailingMetadata rpc -> IO (ResponseTrailingMetadata rpc)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ResponseTrailingMetadata rpc
ts
FinalElem Output rpc
out ResponseTrailingMetadata rpc
_ts -> ProtocolException rpc -> IO (ResponseTrailingMetadata rpc)
forall a. ProtocolException rpc -> IO a
err (ProtocolException rpc -> IO (ResponseTrailingMetadata rpc))
-> ProtocolException rpc -> IO (ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ forall (rpc :: k). Output rpc -> ProtocolException rpc
forall {k} (rpc :: k). Output rpc -> ProtocolException rpc
TooManyOutputs @rpc Output rpc
out
StreamElem Output rpc
out -> ProtocolException rpc -> IO (ResponseTrailingMetadata rpc)
forall a. ProtocolException rpc -> IO a
err (ProtocolException rpc -> IO (ResponseTrailingMetadata rpc))
-> ProtocolException rpc -> IO (ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ forall (rpc :: k). Output rpc -> ProtocolException rpc
forall {k} (rpc :: k). Output rpc -> ProtocolException rpc
TooManyOutputs @rpc Output rpc
out
where
err :: ProtocolException rpc -> IO a
err :: forall a. ProtocolException rpc -> IO a
err = SomeProtocolException -> IO a
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (SomeProtocolException -> IO a)
-> (ProtocolException rpc -> SomeProtocolException)
-> ProtocolException rpc
-> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProtocolException rpc -> SomeProtocolException
forall {k} (rpc :: k).
IsRPC rpc =>
ProtocolException rpc -> SomeProtocolException
ProtocolException
recvBoth :: forall rpc m.
(HasCallStack, MonadIO m)
=> Call rpc
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
recvBoth :: forall {k} (rpc :: k) (m :: * -> *).
(HasCallStack, MonadIO m) =>
Call rpc
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
recvBoth Call{Channel (ClientSession rpc)
callChannel :: forall {k} (rpc :: k). Call rpc -> Channel (ClientSession rpc)
callChannel :: Channel (ClientSession rpc)
callChannel} = IO (StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc)))
-> IO (StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
forall a b. (a -> b) -> a -> b
$
Either
(TrailersOnly' HandledSynthesized)
(StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> StreamElem ProperTrailers' (InboundMeta, Output rpc)
flatten (Either
(TrailersOnly' HandledSynthesized)
(StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> IO
(Either
(TrailersOnly' HandledSynthesized)
(StreamElem ProperTrailers' (InboundMeta, Output rpc)))
-> IO (StreamElem ProperTrailers' (InboundMeta, Output rpc))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Channel (ClientSession rpc)
-> IO
(Either
(NoMessages (Inbound (ClientSession rpc)))
(StreamElem
(Trailers (Inbound (ClientSession rpc)))
(Message (Inbound (ClientSession rpc)))))
forall sess.
HasCallStack =>
Channel sess
-> IO
(Either
(NoMessages (Inbound sess))
(StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))))
Session.recvBoth Channel (ClientSession rpc)
callChannel
where
flatten ::
Either
(TrailersOnly' HandledSynthesized)
(StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> StreamElem ProperTrailers' (InboundMeta, Output rpc)
flatten :: Either
(TrailersOnly' HandledSynthesized)
(StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> StreamElem ProperTrailers' (InboundMeta, Output rpc)
flatten (Left TrailersOnly' HandledSynthesized
trailersOnly) =
ProperTrailers'
-> StreamElem ProperTrailers' (InboundMeta, Output rpc)
forall b a. b -> StreamElem b a
NoMoreElems (ProperTrailers'
-> StreamElem ProperTrailers' (InboundMeta, Output rpc))
-> ProperTrailers'
-> StreamElem ProperTrailers' (InboundMeta, Output rpc)
forall a b. (a -> b) -> a -> b
$ TrailersOnly' HandledSynthesized -> ProperTrailers'
trailersOnlyToProperTrailers' TrailersOnly' HandledSynthesized
trailersOnly
flatten (Right StreamElem ProperTrailers' (InboundMeta, Output rpc)
streamElem) =
StreamElem ProperTrailers' (InboundMeta, Output rpc)
streamElem
recvEither :: forall rpc m.
(HasCallStack, MonadIO m)
=> Call rpc
-> m (Either ProperTrailers' (InboundMeta, Output rpc))
recvEither :: forall {k} (rpc :: k) (m :: * -> *).
(HasCallStack, MonadIO m) =>
Call rpc -> m (Either ProperTrailers' (InboundMeta, Output rpc))
recvEither Call{Channel (ClientSession rpc)
callChannel :: forall {k} (rpc :: k). Call rpc -> Channel (ClientSession rpc)
callChannel :: Channel (ClientSession rpc)
callChannel} = IO (Either ProperTrailers' (InboundMeta, Output rpc))
-> m (Either ProperTrailers' (InboundMeta, Output rpc))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either ProperTrailers' (InboundMeta, Output rpc))
-> m (Either ProperTrailers' (InboundMeta, Output rpc)))
-> IO (Either ProperTrailers' (InboundMeta, Output rpc))
-> m (Either ProperTrailers' (InboundMeta, Output rpc))
forall a b. (a -> b) -> a -> b
$
Either
(TrailersOnly' HandledSynthesized)
(Either ProperTrailers' (InboundMeta, Output rpc))
-> Either ProperTrailers' (InboundMeta, Output rpc)
flatten (Either
(TrailersOnly' HandledSynthesized)
(Either ProperTrailers' (InboundMeta, Output rpc))
-> Either ProperTrailers' (InboundMeta, Output rpc))
-> IO
(Either
(TrailersOnly' HandledSynthesized)
(Either ProperTrailers' (InboundMeta, Output rpc)))
-> IO (Either ProperTrailers' (InboundMeta, Output rpc))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Channel (ClientSession rpc)
-> IO
(Either
(NoMessages (Inbound (ClientSession rpc)))
(Either
(Trailers (Inbound (ClientSession rpc)))
(Message (Inbound (ClientSession rpc)))))
forall sess.
HasCallStack =>
Channel sess
-> IO
(Either
(NoMessages (Inbound sess))
(Either (Trailers (Inbound sess)) (Message (Inbound sess))))
Session.recvEither Channel (ClientSession rpc)
callChannel
where
flatten ::
Either
(TrailersOnly' HandledSynthesized)
(Either ProperTrailers' (InboundMeta, Output rpc))
-> Either ProperTrailers' (InboundMeta, Output rpc)
flatten :: Either
(TrailersOnly' HandledSynthesized)
(Either ProperTrailers' (InboundMeta, Output rpc))
-> Either ProperTrailers' (InboundMeta, Output rpc)
flatten (Left TrailersOnly' HandledSynthesized
trailersOnly) =
ProperTrailers' -> Either ProperTrailers' (InboundMeta, Output rpc)
forall a b. a -> Either a b
Left (ProperTrailers'
-> Either ProperTrailers' (InboundMeta, Output rpc))
-> ProperTrailers'
-> Either ProperTrailers' (InboundMeta, Output rpc)
forall a b. (a -> b) -> a -> b
$ TrailersOnly' HandledSynthesized -> ProperTrailers'
trailersOnlyToProperTrailers' TrailersOnly' HandledSynthesized
trailersOnly
flatten (Right (Left ProperTrailers'
properTrailers)) =
ProperTrailers' -> Either ProperTrailers' (InboundMeta, Output rpc)
forall a b. a -> Either a b
Left (ProperTrailers'
-> Either ProperTrailers' (InboundMeta, Output rpc))
-> ProperTrailers'
-> Either ProperTrailers' (InboundMeta, Output rpc)
forall a b. (a -> b) -> a -> b
$ ProperTrailers'
properTrailers
flatten (Right (Right (InboundMeta, Output rpc)
msg)) =
(InboundMeta, Output rpc)
-> Either ProperTrailers' (InboundMeta, Output rpc)
forall a b. b -> Either a b
Right ((InboundMeta, Output rpc)
-> Either ProperTrailers' (InboundMeta, Output rpc))
-> (InboundMeta, Output rpc)
-> Either ProperTrailers' (InboundMeta, Output rpc)
forall a b. (a -> b) -> a -> b
$ (InboundMeta, Output rpc)
msg
responseTrailingMetadata ::
MonadIO m
=> Call rpc
-> ProperTrailers' -> m (ResponseTrailingMetadata rpc)
responseTrailingMetadata :: forall {k} (m :: * -> *) (rpc :: k).
MonadIO m =>
Call rpc -> ProperTrailers' -> m (ResponseTrailingMetadata rpc)
responseTrailingMetadata Call{} ProperTrailers'
trailers = IO (ResponseTrailingMetadata rpc)
-> m (ResponseTrailingMetadata rpc)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (ResponseTrailingMetadata rpc)
-> m (ResponseTrailingMetadata rpc))
-> IO (ResponseTrailingMetadata rpc)
-> m (ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$
case ProperTrailers' -> Either GrpcException GrpcNormalTermination
grpcClassifyTermination ProperTrailers'
trailers of
Right GrpcNormalTermination
terminatedNormally -> do
[CustomMetadata] -> IO (ResponseTrailingMetadata rpc)
forall a (m :: * -> *).
(ParseMetadata a, MonadThrow m) =>
[CustomMetadata] -> m a
forall (m :: * -> *).
MonadThrow m =>
[CustomMetadata] -> m (ResponseTrailingMetadata rpc)
parseMetadata ([CustomMetadata] -> IO (ResponseTrailingMetadata rpc))
-> [CustomMetadata] -> IO (ResponseTrailingMetadata rpc)
forall a b. (a -> b) -> a -> b
$ GrpcNormalTermination -> [CustomMetadata]
grpcTerminatedMetadata GrpcNormalTermination
terminatedNormally
Left GrpcException
exception ->
GrpcException -> IO (ResponseTrailingMetadata rpc)
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM GrpcException
exception
trailersOnlyToProperTrailers' ::
TrailersOnly' HandledSynthesized
-> ProperTrailers'
trailersOnlyToProperTrailers' :: TrailersOnly' HandledSynthesized -> ProperTrailers'
trailersOnlyToProperTrailers' =
(ProperTrailers',
Either (InvalidHeaders GrpcException) (Maybe ContentType))
-> ProperTrailers'
forall a b. (a, b) -> a
fst
((ProperTrailers',
Either (InvalidHeaders GrpcException) (Maybe ContentType))
-> ProperTrailers')
-> (TrailersOnly' HandledSynthesized
-> (ProperTrailers',
Either (InvalidHeaders GrpcException) (Maybe ContentType)))
-> TrailersOnly' HandledSynthesized
-> ProperTrailers'
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TrailersOnly_ (Checked (InvalidHeaders GrpcException))
-> (ProperTrailers',
Either (InvalidHeaders GrpcException) (Maybe ContentType))
TrailersOnly_ (Checked (InvalidHeaders GrpcException))
-> (ProperTrailers',
HKD (Checked (InvalidHeaders GrpcException)) (Maybe ContentType))
forall (f :: * -> *).
TrailersOnly_ f -> (ProperTrailers_ f, HKD f (Maybe ContentType))
trailersOnlyToProperTrailers
(TrailersOnly_ (Checked (InvalidHeaders GrpcException))
-> (ProperTrailers',
Either (InvalidHeaders GrpcException) (Maybe ContentType)))
-> (TrailersOnly' HandledSynthesized
-> TrailersOnly_ (Checked (InvalidHeaders GrpcException)))
-> TrailersOnly' HandledSynthesized
-> (ProperTrailers',
Either (InvalidHeaders GrpcException) (Maybe ContentType))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall a.
Either (InvalidHeaders HandledSynthesized) a
-> Either (InvalidHeaders GrpcException) a)
-> TrailersOnly' HandledSynthesized
-> TrailersOnly_ (Checked (InvalidHeaders GrpcException))
forall (t :: (* -> *) -> *) (f :: * -> *) (g :: * -> *).
Traversable t =>
(forall a. f a -> g a)
-> t (DecoratedWith f) -> t (DecoratedWith g)
HKD.map ((InvalidHeaders HandledSynthesized -> InvalidHeaders GrpcException)
-> Either (InvalidHeaders HandledSynthesized) a
-> Either (InvalidHeaders GrpcException) a
forall a b c. (a -> b) -> Either a c -> Either b c
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first ((InvalidHeaders HandledSynthesized
-> InvalidHeaders GrpcException)
-> Either (InvalidHeaders HandledSynthesized) a
-> Either (InvalidHeaders GrpcException) a)
-> (InvalidHeaders HandledSynthesized
-> InvalidHeaders GrpcException)
-> Either (InvalidHeaders HandledSynthesized) a
-> Either (InvalidHeaders GrpcException) a
forall a b. (a -> b) -> a -> b
$ (HandledSynthesized -> GrpcException)
-> InvalidHeaders HandledSynthesized
-> InvalidHeaders GrpcException
forall e e'. (e -> e') -> InvalidHeaders e -> InvalidHeaders e'
mapSynthesized HandledSynthesized -> GrpcException
forall a. HandledSynthesized -> a
handledSynthesized)