module Network.GRPC.Util.Session.Channel (
Channel(..)
, initChannel
, FlowState(..)
, RegularFlowState(..)
, initFlowStateRegular
, getInboundHeaders
, send
, recvBoth
, recvEither
, RecvFinal(..)
, RecvAfterFinal(..)
, SendAfterFinal(..)
, waitForOutbound
, close
, ChannelDiscarded(..)
, ChannelAborted(..)
, InboundResult
, AllowHalfClosed(..)
, linkOutboundToInbound
, sendMessageLoop
, recvMessageLoop
, outboundTrailersMaker
) where
import Control.Concurrent.STM
import Control.DeepSeq (NFData, force)
import Control.Exception
import Control.Monad
import Control.Monad.Catch (ExitCase(..))
import Data.Bifunctor
import Data.ByteString.Builder (Builder)
import Data.ByteString.Lazy qualified as BS.Lazy
import GHC.Stack
import Network.HTTP2.Client qualified as HTTP2 (
TrailersMaker
, NextTrailersMaker(..)
)
import Network.GRPC.Common.StreamElem (StreamElem(..))
import Network.GRPC.Common.StreamElem qualified as StreamElem
import Network.GRPC.Spec.Util.Parser (Parser)
import Network.GRPC.Spec.Util.Parser qualified as Parser
import Network.GRPC.Util.HTTP2.Stream
import Network.GRPC.Util.RedundantConstraint
import Network.GRPC.Util.Session.API
import Network.GRPC.Util.Thread
data Channel sess = Channel {
forall sess.
Channel sess -> TVar (ThreadState (FlowState (Inbound sess)))
channelInbound :: TVar (ThreadState (FlowState (Inbound sess)))
, forall sess.
Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: TVar (ThreadState (FlowState (Outbound sess)))
, forall sess. Channel sess -> TVar (Maybe CallStack)
channelSentFinal :: TVar (Maybe CallStack)
, forall sess. Channel sess -> TVar (RecvFinal (Inbound sess))
channelRecvFinal :: TVar (RecvFinal (Inbound sess))
}
data RecvFinal flow =
RecvNotFinal
| RecvWithoutTrailers (Trailers flow)
| RecvFinal CallStack
data FlowState flow =
FlowStateRegular (RegularFlowState flow)
| FlowStateNoMessages (NoMessages flow)
data RegularFlowState flow = RegularFlowState {
:: Headers flow
, forall {k} (flow :: k).
RegularFlowState flow
-> TMVar (StreamElem (Trailers flow) (Message flow))
flowMsg :: TMVar (StreamElem (Trailers flow) (Message flow))
, forall {k} (flow :: k).
RegularFlowState flow -> TMVar (Trailers flow)
flowTerminated :: TMVar (Trailers flow)
}
deriving instance (
Show (Headers flow)
, Show (TMVar (StreamElem (Trailers flow) (Message flow)))
, Show (TMVar (Trailers flow))
) => Show (RegularFlowState flow)
initChannel :: HasCallStack => IO (Channel sess)
initChannel :: forall sess. HasCallStack => IO (Channel sess)
initChannel = do
channelInbound <- IO (TVar (ThreadState (FlowState (Inbound sess))))
forall a. HasCallStack => IO (TVar (ThreadState a))
newThreadState
channelOutbound <- newThreadState
channelSentFinal <- newTVarIO Nothing
channelRecvFinal <- newTVarIO RecvNotFinal
return Channel{
channelInbound
, channelOutbound
, channelSentFinal
, channelRecvFinal
}
initFlowStateRegular :: Headers flow -> IO (RegularFlowState flow)
initFlowStateRegular :: forall {k} (flow :: k). Headers flow -> IO (RegularFlowState flow)
initFlowStateRegular Headers flow
flowHeaders = do
flowMsg <- IO (TMVar (StreamElem (Trailers flow) (Message flow)))
forall a. IO (TMVar a)
newEmptyTMVarIO
flowTerminated <- newEmptyTMVarIO
return RegularFlowState {
flowHeaders
, flowMsg
, flowTerminated
}
getInboundHeaders ::
Channel sess
-> IO (Either (NoMessages (Inbound sess)) (Headers (Inbound sess)))
Channel{TVar (ThreadState (FlowState (Inbound sess)))
channelInbound :: forall sess.
Channel sess -> TVar (ThreadState (FlowState (Inbound sess)))
channelInbound :: TVar (ThreadState (FlowState (Inbound sess)))
channelInbound} =
TVar (ThreadState (FlowState (Inbound sess)))
-> (FlowState (Inbound sess)
-> STM
(Either (NoMessages (Inbound sess)) (Headers (Inbound sess))))
-> IO (Either (NoMessages (Inbound sess)) (Headers (Inbound sess)))
forall a b. TVar (ThreadState a) -> (a -> STM b) -> IO b
withThreadInterface TVar (ThreadState (FlowState (Inbound sess)))
channelInbound (Either (NoMessages (Inbound sess)) (Headers (Inbound sess))
-> STM
(Either (NoMessages (Inbound sess)) (Headers (Inbound sess)))
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either (NoMessages (Inbound sess)) (Headers (Inbound sess))
-> STM
(Either (NoMessages (Inbound sess)) (Headers (Inbound sess))))
-> (FlowState (Inbound sess)
-> Either (NoMessages (Inbound sess)) (Headers (Inbound sess)))
-> FlowState (Inbound sess)
-> STM
(Either (NoMessages (Inbound sess)) (Headers (Inbound sess)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FlowState (Inbound sess)
-> Either (NoMessages (Inbound sess)) (Headers (Inbound sess))
forall {k} (flow :: k).
FlowState flow -> Either (NoMessages flow) (Headers flow)
aux)
where
aux :: forall flow.
FlowState flow
-> Either (NoMessages flow) (Headers flow)
aux :: forall {k} (flow :: k).
FlowState flow -> Either (NoMessages flow) (Headers flow)
aux (FlowStateRegular RegularFlowState flow
regular) = Headers flow -> Either (NoMessages flow) (Headers flow)
forall a b. b -> Either a b
Right (Headers flow -> Either (NoMessages flow) (Headers flow))
-> Headers flow -> Either (NoMessages flow) (Headers flow)
forall a b. (a -> b) -> a -> b
$ RegularFlowState flow -> Headers flow
forall {k} (flow :: k). RegularFlowState flow -> Headers flow
flowHeaders RegularFlowState flow
regular
aux (FlowStateNoMessages NoMessages flow
trailers) = NoMessages flow -> Either (NoMessages flow) (Headers flow)
forall a b. a -> Either a b
Left NoMessages flow
trailers
send :: forall sess.
(HasCallStack, NFData (Message (Outbound sess)))
=> Channel sess
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> IO ()
send :: forall sess.
(HasCallStack, NFData (Message (Outbound sess))) =>
Channel sess
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> IO ()
send Channel{TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: forall sess.
Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound, TVar (Maybe CallStack)
channelSentFinal :: forall sess. Channel sess -> TVar (Maybe CallStack)
channelSentFinal :: TVar (Maybe CallStack)
channelSentFinal} = \StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
msg -> do
msg' <- StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> IO
(StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
forall a. a -> IO a
evaluate (StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> IO
(StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))))
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> IO
(StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
forall a b. (a -> b) -> a -> b
$ Message (Outbound sess) -> Message (Outbound sess)
forall a. NFData a => a -> a
force (Message (Outbound sess) -> Message (Outbound sess))
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
msg
withThreadInterface channelOutbound $ aux msg'
where
aux ::
StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> FlowState (Outbound sess)
-> STM ()
aux :: StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> FlowState (Outbound sess) -> STM ()
aux StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
msg FlowState (Outbound sess)
st = do
sentFinal <- TVar (Maybe CallStack) -> STM (Maybe CallStack)
forall a. TVar a -> STM a
readTVar TVar (Maybe CallStack)
channelSentFinal
case sentFinal of
Just CallStack
cs -> SendAfterFinal -> STM ()
forall e a. Exception e => e -> STM a
throwSTM (SendAfterFinal -> STM ()) -> SendAfterFinal -> STM ()
forall a b. (a -> b) -> a -> b
$ CallStack -> SendAfterFinal
SendAfterFinal CallStack
cs
Maybe CallStack
Nothing -> () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
case st of
FlowStateRegular RegularFlowState (Outbound sess)
regular -> do
StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> (Trailers (Outbound sess) -> STM ()) -> STM ()
forall (m :: * -> *) b a.
Applicative m =>
StreamElem b a -> (b -> m ()) -> m ()
StreamElem.whenDefinitelyFinal StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
msg ((Trailers (Outbound sess) -> STM ()) -> STM ())
-> (Trailers (Outbound sess) -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \Trailers (Outbound sess)
_trailers ->
TVar (Maybe CallStack) -> Maybe CallStack -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe CallStack)
channelSentFinal (Maybe CallStack -> STM ()) -> Maybe CallStack -> STM ()
forall a b. (a -> b) -> a -> b
$ CallStack -> Maybe CallStack
forall a. a -> Maybe a
Just CallStack
HasCallStack => CallStack
callStack
TMVar
(StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar (RegularFlowState (Outbound sess)
-> TMVar
(StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
forall {k} (flow :: k).
RegularFlowState flow
-> TMVar (StreamElem (Trailers flow) (Message flow))
flowMsg RegularFlowState (Outbound sess)
regular) StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
msg
FlowStateNoMessages NoMessages (Outbound sess)
_ ->
SendAfterFinal -> STM ()
forall e a. Exception e => e -> STM a
throwSTM (SendAfterFinal -> STM ()) -> SendAfterFinal -> STM ()
forall a b. (a -> b) -> a -> b
$ SendAfterFinal
SendButTrailersOnly
recvBoth :: forall sess.
HasCallStack
=> Channel sess
-> IO ( Either
(NoMessages (Inbound sess))
(StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
)
recvBoth :: forall sess.
HasCallStack =>
Channel sess
-> IO
(Either
(NoMessages (Inbound sess))
(StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))))
recvBoth =
(Message (Inbound sess)
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
-> (Trailers (Inbound sess)
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
-> ((Message (Inbound sess), Trailers (Inbound sess))
-> (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)),
Maybe (Trailers (Inbound sess))))
-> Channel sess
-> IO
(Either
(NoMessages (Inbound sess))
(StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))))
forall sess b.
HasCallStack =>
(Message (Inbound sess) -> b)
-> (Trailers (Inbound sess) -> b)
-> ((Message (Inbound sess), Trailers (Inbound sess))
-> (b, Maybe (Trailers (Inbound sess))))
-> Channel sess
-> IO (Either (NoMessages (Inbound sess)) b)
recv'
Message (Inbound sess)
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
forall b a. a -> StreamElem b a
StreamElem
Trailers (Inbound sess)
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
forall b a. b -> StreamElem b a
NoMoreElems
((,Maybe (Trailers (Inbound sess))
forall a. Maybe a
Nothing) (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
-> (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)),
Maybe (Trailers (Inbound sess))))
-> ((Message (Inbound sess), Trailers (Inbound sess))
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
-> (Message (Inbound sess), Trailers (Inbound sess))
-> (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)),
Maybe (Trailers (Inbound sess)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Message (Inbound sess)
-> Trailers (Inbound sess)
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
-> (Message (Inbound sess), Trailers (Inbound sess))
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry Message (Inbound sess)
-> Trailers (Inbound sess)
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
forall b a. a -> b -> StreamElem b a
FinalElem)
recvEither ::
HasCallStack
=> Channel sess
-> IO ( Either
(NoMessages (Inbound sess))
(Either (Trailers (Inbound sess)) (Message (Inbound sess)))
)
recvEither :: forall sess.
HasCallStack =>
Channel sess
-> IO
(Either
(NoMessages (Inbound sess))
(Either (Trailers (Inbound sess)) (Message (Inbound sess))))
recvEither =
(Message (Inbound sess)
-> Either (Trailers (Inbound sess)) (Message (Inbound sess)))
-> (Trailers (Inbound sess)
-> Either (Trailers (Inbound sess)) (Message (Inbound sess)))
-> ((Message (Inbound sess), Trailers (Inbound sess))
-> (Either (Trailers (Inbound sess)) (Message (Inbound sess)),
Maybe (Trailers (Inbound sess))))
-> Channel sess
-> IO
(Either
(NoMessages (Inbound sess))
(Either (Trailers (Inbound sess)) (Message (Inbound sess))))
forall sess b.
HasCallStack =>
(Message (Inbound sess) -> b)
-> (Trailers (Inbound sess) -> b)
-> ((Message (Inbound sess), Trailers (Inbound sess))
-> (b, Maybe (Trailers (Inbound sess))))
-> Channel sess
-> IO (Either (NoMessages (Inbound sess)) b)
recv'
Message (Inbound sess)
-> Either (Trailers (Inbound sess)) (Message (Inbound sess))
forall a b. b -> Either a b
Right
Trailers (Inbound sess)
-> Either (Trailers (Inbound sess)) (Message (Inbound sess))
forall a b. a -> Either a b
Left
((Message (Inbound sess)
-> Either (Trailers (Inbound sess)) (Message (Inbound sess)))
-> (Trailers (Inbound sess) -> Maybe (Trailers (Inbound sess)))
-> (Message (Inbound sess), Trailers (Inbound sess))
-> (Either (Trailers (Inbound sess)) (Message (Inbound sess)),
Maybe (Trailers (Inbound sess)))
forall a b c d. (a -> b) -> (c -> d) -> (a, c) -> (b, d)
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap Message (Inbound sess)
-> Either (Trailers (Inbound sess)) (Message (Inbound sess))
forall a b. b -> Either a b
Right Trailers (Inbound sess) -> Maybe (Trailers (Inbound sess))
forall a. a -> Maybe a
Just)
recv' :: forall sess b.
HasCallStack
=> (Message (Inbound sess) -> b)
-> (Trailers (Inbound sess) -> b)
-> ( (Message (Inbound sess), Trailers (Inbound sess))
-> (b, Maybe (Trailers (Inbound sess)))
)
-> Channel sess
-> IO (Either (NoMessages (Inbound sess)) b)
recv' :: forall sess b.
HasCallStack =>
(Message (Inbound sess) -> b)
-> (Trailers (Inbound sess) -> b)
-> ((Message (Inbound sess), Trailers (Inbound sess))
-> (b, Maybe (Trailers (Inbound sess))))
-> Channel sess
-> IO (Either (NoMessages (Inbound sess)) b)
recv' Message (Inbound sess) -> b
messageWithoutTrailers
Trailers (Inbound sess) -> b
trailersWithoutMessage
(Message (Inbound sess), Trailers (Inbound sess))
-> (b, Maybe (Trailers (Inbound sess)))
messageWithTrailers
Channel{TVar (ThreadState (FlowState (Inbound sess)))
channelInbound :: forall sess.
Channel sess -> TVar (ThreadState (FlowState (Inbound sess)))
channelInbound :: TVar (ThreadState (FlowState (Inbound sess)))
channelInbound, TVar (RecvFinal (Inbound sess))
channelRecvFinal :: forall sess. Channel sess -> TVar (RecvFinal (Inbound sess))
channelRecvFinal :: TVar (RecvFinal (Inbound sess))
channelRecvFinal} =
TVar (ThreadState (FlowState (Inbound sess)))
-> (FlowState (Inbound sess)
-> STM (Either (NoMessages (Inbound sess)) b))
-> IO (Either (NoMessages (Inbound sess)) b)
forall a b. TVar (ThreadState a) -> (a -> STM b) -> IO b
withThreadInterface TVar (ThreadState (FlowState (Inbound sess)))
channelInbound FlowState (Inbound sess)
-> STM (Either (NoMessages (Inbound sess)) b)
aux
where
aux ::
FlowState (Inbound sess)
-> STM (Either (NoMessages (Inbound sess)) b)
aux :: FlowState (Inbound sess)
-> STM (Either (NoMessages (Inbound sess)) b)
aux FlowState (Inbound sess)
st = do
readFinal <- TVar (RecvFinal (Inbound sess)) -> STM (RecvFinal (Inbound sess))
forall a. TVar a -> STM a
readTVar TVar (RecvFinal (Inbound sess))
channelRecvFinal
case readFinal of
RecvFinal (Inbound sess)
RecvNotFinal ->
case FlowState (Inbound sess)
st of
FlowStateRegular RegularFlowState (Inbound sess)
regular -> b -> Either (NoMessages (Inbound sess)) b
forall a b. b -> Either a b
Right (b -> Either (NoMessages (Inbound sess)) b)
-> STM b -> STM (Either (NoMessages (Inbound sess)) b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> do
streamElem <- TMVar
(StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
-> STM
(StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
forall a. TMVar a -> STM a
takeTMVar (RegularFlowState (Inbound sess)
-> TMVar
(StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
forall {k} (flow :: k).
RegularFlowState flow
-> TMVar (StreamElem (Trailers flow) (Message flow))
flowMsg RegularFlowState (Inbound sess)
regular)
case streamElem of
StreamElem Message (Inbound sess)
msg ->
b -> STM b
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> STM b) -> b -> STM b
forall a b. (a -> b) -> a -> b
$ Message (Inbound sess) -> b
messageWithoutTrailers Message (Inbound sess)
msg
FinalElem Message (Inbound sess)
msg Trailers (Inbound sess)
trailers -> do
let (b
b, Maybe (Trailers (Inbound sess))
mTrailers) = (Message (Inbound sess), Trailers (Inbound sess))
-> (b, Maybe (Trailers (Inbound sess)))
messageWithTrailers (Message (Inbound sess)
msg, Trailers (Inbound sess)
trailers)
TVar (RecvFinal (Inbound sess))
-> RecvFinal (Inbound sess) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (RecvFinal (Inbound sess))
channelRecvFinal (RecvFinal (Inbound sess) -> STM ())
-> RecvFinal (Inbound sess) -> STM ()
forall a b. (a -> b) -> a -> b
$
RecvFinal (Inbound sess)
-> (Trailers (Inbound sess) -> RecvFinal (Inbound sess))
-> Maybe (Trailers (Inbound sess))
-> RecvFinal (Inbound sess)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (CallStack -> RecvFinal (Inbound sess)
forall {k} (flow :: k). CallStack -> RecvFinal flow
RecvFinal CallStack
HasCallStack => CallStack
callStack) Trailers (Inbound sess) -> RecvFinal (Inbound sess)
forall {k} (flow :: k). Trailers flow -> RecvFinal flow
RecvWithoutTrailers Maybe (Trailers (Inbound sess))
mTrailers
b -> STM b
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> STM b) -> b -> STM b
forall a b. (a -> b) -> a -> b
$ b
b
NoMoreElems Trailers (Inbound sess)
trailers -> do
TVar (RecvFinal (Inbound sess))
-> RecvFinal (Inbound sess) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (RecvFinal (Inbound sess))
channelRecvFinal (RecvFinal (Inbound sess) -> STM ())
-> RecvFinal (Inbound sess) -> STM ()
forall a b. (a -> b) -> a -> b
$ CallStack -> RecvFinal (Inbound sess)
forall {k} (flow :: k). CallStack -> RecvFinal flow
RecvFinal CallStack
HasCallStack => CallStack
callStack
b -> STM b
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> STM b) -> b -> STM b
forall a b. (a -> b) -> a -> b
$ Trailers (Inbound sess) -> b
trailersWithoutMessage Trailers (Inbound sess)
trailers
FlowStateNoMessages NoMessages (Inbound sess)
trailers -> do
TVar (RecvFinal (Inbound sess))
-> RecvFinal (Inbound sess) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (RecvFinal (Inbound sess))
channelRecvFinal (RecvFinal (Inbound sess) -> STM ())
-> RecvFinal (Inbound sess) -> STM ()
forall a b. (a -> b) -> a -> b
$ CallStack -> RecvFinal (Inbound sess)
forall {k} (flow :: k). CallStack -> RecvFinal flow
RecvFinal CallStack
HasCallStack => CallStack
callStack
Either (NoMessages (Inbound sess)) b
-> STM (Either (NoMessages (Inbound sess)) b)
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either (NoMessages (Inbound sess)) b
-> STM (Either (NoMessages (Inbound sess)) b))
-> Either (NoMessages (Inbound sess)) b
-> STM (Either (NoMessages (Inbound sess)) b)
forall a b. (a -> b) -> a -> b
$ NoMessages (Inbound sess) -> Either (NoMessages (Inbound sess)) b
forall a b. a -> Either a b
Left NoMessages (Inbound sess)
trailers
RecvWithoutTrailers Trailers (Inbound sess)
trailers -> do
TVar (RecvFinal (Inbound sess))
-> RecvFinal (Inbound sess) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (RecvFinal (Inbound sess))
channelRecvFinal (RecvFinal (Inbound sess) -> STM ())
-> RecvFinal (Inbound sess) -> STM ()
forall a b. (a -> b) -> a -> b
$ CallStack -> RecvFinal (Inbound sess)
forall {k} (flow :: k). CallStack -> RecvFinal flow
RecvFinal CallStack
HasCallStack => CallStack
callStack
Either (NoMessages (Inbound sess)) b
-> STM (Either (NoMessages (Inbound sess)) b)
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either (NoMessages (Inbound sess)) b
-> STM (Either (NoMessages (Inbound sess)) b))
-> Either (NoMessages (Inbound sess)) b
-> STM (Either (NoMessages (Inbound sess)) b)
forall a b. (a -> b) -> a -> b
$ b -> Either (NoMessages (Inbound sess)) b
forall a b. b -> Either a b
Right (b -> Either (NoMessages (Inbound sess)) b)
-> b -> Either (NoMessages (Inbound sess)) b
forall a b. (a -> b) -> a -> b
$ Trailers (Inbound sess) -> b
trailersWithoutMessage Trailers (Inbound sess)
trailers
RecvFinal CallStack
cs ->
RecvAfterFinal -> STM (Either (NoMessages (Inbound sess)) b)
forall e a. Exception e => e -> STM a
throwSTM (RecvAfterFinal -> STM (Either (NoMessages (Inbound sess)) b))
-> RecvAfterFinal -> STM (Either (NoMessages (Inbound sess)) b)
forall a b. (a -> b) -> a -> b
$ CallStack -> RecvAfterFinal
RecvAfterFinal CallStack
cs
data SendAfterFinal =
SendAfterFinal CallStack
| SendButTrailersOnly
deriving stock (Int -> SendAfterFinal -> ShowS
[SendAfterFinal] -> ShowS
SendAfterFinal -> String
(Int -> SendAfterFinal -> ShowS)
-> (SendAfterFinal -> String)
-> ([SendAfterFinal] -> ShowS)
-> Show SendAfterFinal
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SendAfterFinal -> ShowS
showsPrec :: Int -> SendAfterFinal -> ShowS
$cshow :: SendAfterFinal -> String
show :: SendAfterFinal -> String
$cshowList :: [SendAfterFinal] -> ShowS
showList :: [SendAfterFinal] -> ShowS
Show)
deriving anyclass (Show SendAfterFinal
Typeable SendAfterFinal
(Typeable SendAfterFinal, Show SendAfterFinal) =>
(SendAfterFinal -> SomeException)
-> (SomeException -> Maybe SendAfterFinal)
-> (SendAfterFinal -> String)
-> (SendAfterFinal -> Bool)
-> Exception SendAfterFinal
SomeException -> Maybe SendAfterFinal
SendAfterFinal -> Bool
SendAfterFinal -> String
SendAfterFinal -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> (e -> Bool)
-> Exception e
$ctoException :: SendAfterFinal -> SomeException
toException :: SendAfterFinal -> SomeException
$cfromException :: SomeException -> Maybe SendAfterFinal
fromException :: SomeException -> Maybe SendAfterFinal
$cdisplayException :: SendAfterFinal -> String
displayException :: SendAfterFinal -> String
$cbacktraceDesired :: SendAfterFinal -> Bool
backtraceDesired :: SendAfterFinal -> Bool
Exception)
data RecvAfterFinal =
RecvAfterFinal CallStack
deriving stock (Int -> RecvAfterFinal -> ShowS
[RecvAfterFinal] -> ShowS
RecvAfterFinal -> String
(Int -> RecvAfterFinal -> ShowS)
-> (RecvAfterFinal -> String)
-> ([RecvAfterFinal] -> ShowS)
-> Show RecvAfterFinal
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> RecvAfterFinal -> ShowS
showsPrec :: Int -> RecvAfterFinal -> ShowS
$cshow :: RecvAfterFinal -> String
show :: RecvAfterFinal -> String
$cshowList :: [RecvAfterFinal] -> ShowS
showList :: [RecvAfterFinal] -> ShowS
Show)
deriving anyclass (Show RecvAfterFinal
Typeable RecvAfterFinal
(Typeable RecvAfterFinal, Show RecvAfterFinal) =>
(RecvAfterFinal -> SomeException)
-> (SomeException -> Maybe RecvAfterFinal)
-> (RecvAfterFinal -> String)
-> (RecvAfterFinal -> Bool)
-> Exception RecvAfterFinal
SomeException -> Maybe RecvAfterFinal
RecvAfterFinal -> Bool
RecvAfterFinal -> String
RecvAfterFinal -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> (e -> Bool)
-> Exception e
$ctoException :: RecvAfterFinal -> SomeException
toException :: RecvAfterFinal -> SomeException
$cfromException :: SomeException -> Maybe RecvAfterFinal
fromException :: SomeException -> Maybe RecvAfterFinal
$cdisplayException :: RecvAfterFinal -> String
displayException :: RecvAfterFinal -> String
$cbacktraceDesired :: RecvAfterFinal -> Bool
backtraceDesired :: RecvAfterFinal -> Bool
Exception)
waitForOutbound :: Channel sess -> IO ()
waitForOutbound :: forall sess. Channel sess -> IO ()
waitForOutbound Channel{TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: forall sess.
Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound} = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
TVar (ThreadState (FlowState (Outbound sess))) -> STM ()
forall a. TVar (ThreadState a) -> STM ()
waitForNormalThreadTermination TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound
close ::
HasCallStack
=> Channel sess
-> ExitCase a
-> IO (Maybe SomeException)
close :: forall sess a.
HasCallStack =>
Channel sess -> ExitCase a -> IO (Maybe SomeException)
close Channel{TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: forall sess.
Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound} ExitCase a
reason = do
outbound <- TVar (ThreadState (FlowState (Outbound sess)))
-> SomeException -> IO (CancelResult (FlowState (Outbound sess)))
forall a.
TVar (ThreadState a) -> SomeException -> IO (CancelResult a)
cancelThread TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound SomeException
channelClosed
case outbound of
AlreadyTerminated FlowState (Outbound sess)
_ ->
Maybe SomeException -> IO (Maybe SomeException)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe SomeException -> IO (Maybe SomeException))
-> Maybe SomeException -> IO (Maybe SomeException)
forall a b. (a -> b) -> a -> b
$ Maybe SomeException
forall a. Maybe a
Nothing
AlreadyAborted SomeException
_err ->
Maybe SomeException -> IO (Maybe SomeException)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe SomeException -> IO (Maybe SomeException))
-> Maybe SomeException -> IO (Maybe SomeException)
forall a b. (a -> b) -> a -> b
$ Maybe SomeException
forall a. Maybe a
Nothing
CancelResult (FlowState (Outbound sess))
Cancelled ->
Maybe SomeException -> IO (Maybe SomeException)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe SomeException -> IO (Maybe SomeException))
-> Maybe SomeException -> IO (Maybe SomeException)
forall a b. (a -> b) -> a -> b
$ SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
channelClosed
where
channelClosed :: SomeException
channelClosed :: SomeException
channelClosed =
case ExitCase a
reason of
ExitCaseSuccess a
_ -> ChannelDiscarded -> SomeException
forall e. Exception e => e -> SomeException
toException (ChannelDiscarded -> SomeException)
-> ChannelDiscarded -> SomeException
forall a b. (a -> b) -> a -> b
$ CallStack -> ChannelDiscarded
ChannelDiscarded CallStack
HasCallStack => CallStack
callStack
ExitCase a
ExitCaseAbort -> ChannelAborted -> SomeException
forall e. Exception e => e -> SomeException
toException (ChannelAborted -> SomeException)
-> ChannelAborted -> SomeException
forall a b. (a -> b) -> a -> b
$ CallStack -> ChannelAborted
ChannelAborted CallStack
HasCallStack => CallStack
callStack
ExitCaseException SomeException
e -> SomeException
e
data ChannelDiscarded = ChannelDiscarded CallStack
deriving stock (Int -> ChannelDiscarded -> ShowS
[ChannelDiscarded] -> ShowS
ChannelDiscarded -> String
(Int -> ChannelDiscarded -> ShowS)
-> (ChannelDiscarded -> String)
-> ([ChannelDiscarded] -> ShowS)
-> Show ChannelDiscarded
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ChannelDiscarded -> ShowS
showsPrec :: Int -> ChannelDiscarded -> ShowS
$cshow :: ChannelDiscarded -> String
show :: ChannelDiscarded -> String
$cshowList :: [ChannelDiscarded] -> ShowS
showList :: [ChannelDiscarded] -> ShowS
Show)
deriving anyclass (Show ChannelDiscarded
Typeable ChannelDiscarded
(Typeable ChannelDiscarded, Show ChannelDiscarded) =>
(ChannelDiscarded -> SomeException)
-> (SomeException -> Maybe ChannelDiscarded)
-> (ChannelDiscarded -> String)
-> (ChannelDiscarded -> Bool)
-> Exception ChannelDiscarded
SomeException -> Maybe ChannelDiscarded
ChannelDiscarded -> Bool
ChannelDiscarded -> String
ChannelDiscarded -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> (e -> Bool)
-> Exception e
$ctoException :: ChannelDiscarded -> SomeException
toException :: ChannelDiscarded -> SomeException
$cfromException :: SomeException -> Maybe ChannelDiscarded
fromException :: SomeException -> Maybe ChannelDiscarded
$cdisplayException :: ChannelDiscarded -> String
displayException :: ChannelDiscarded -> String
$cbacktraceDesired :: ChannelDiscarded -> Bool
backtraceDesired :: ChannelDiscarded -> Bool
Exception)
data ChannelAborted = ChannelAborted CallStack
deriving stock (Int -> ChannelAborted -> ShowS
[ChannelAborted] -> ShowS
ChannelAborted -> String
(Int -> ChannelAborted -> ShowS)
-> (ChannelAborted -> String)
-> ([ChannelAborted] -> ShowS)
-> Show ChannelAborted
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ChannelAborted -> ShowS
showsPrec :: Int -> ChannelAborted -> ShowS
$cshow :: ChannelAborted -> String
show :: ChannelAborted -> String
$cshowList :: [ChannelAborted] -> ShowS
showList :: [ChannelAborted] -> ShowS
Show)
deriving anyclass (Show ChannelAborted
Typeable ChannelAborted
(Typeable ChannelAborted, Show ChannelAborted) =>
(ChannelAborted -> SomeException)
-> (SomeException -> Maybe ChannelAborted)
-> (ChannelAborted -> String)
-> (ChannelAborted -> Bool)
-> Exception ChannelAborted
SomeException -> Maybe ChannelAborted
ChannelAborted -> Bool
ChannelAborted -> String
ChannelAborted -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> (e -> Bool)
-> Exception e
$ctoException :: ChannelAborted -> SomeException
toException :: ChannelAborted -> SomeException
$cfromException :: SomeException -> Maybe ChannelAborted
fromException :: SomeException -> Maybe ChannelAborted
$cdisplayException :: ChannelAborted -> String
displayException :: ChannelAborted -> String
$cbacktraceDesired :: ChannelAborted -> Bool
backtraceDesired :: ChannelAborted -> Bool
Exception)
type InboundResult sess =
Either (NoMessages (Inbound sess))
(Trailers (Inbound sess))
data AllowHalfClosed sess =
ContinueWhenInboundClosed
| TerminateWhenInboundClosed (InboundResult sess -> SomeException)
linkOutboundToInbound :: forall sess.
IsSession sess
=> AllowHalfClosed sess
-> Channel sess
-> IO (InboundResult sess)
-> IO ()
linkOutboundToInbound :: forall sess.
IsSession sess =>
AllowHalfClosed sess
-> Channel sess -> IO (InboundResult sess) -> IO ()
linkOutboundToInbound AllowHalfClosed sess
allowHalfClosed Channel sess
channel IO (InboundResult sess)
inbound = do
mResult <- IO (InboundResult sess)
-> IO (Either SomeException (InboundResult sess))
forall e a. Exception e => IO a -> IO (Either e a)
try IO (InboundResult sess)
inbound
case (mResult, allowHalfClosed) of
(Right InboundResult sess
_result, AllowHalfClosed sess
ContinueWhenInboundClosed) ->
() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
(Right InboundResult sess
result, TerminateWhenInboundClosed InboundResult sess -> SomeException
f) ->
IO (CancelResult (FlowState (Outbound sess))) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (CancelResult (FlowState (Outbound sess))) -> IO ())
-> IO (CancelResult (FlowState (Outbound sess))) -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (ThreadState (FlowState (Outbound sess)))
-> SomeException -> IO (CancelResult (FlowState (Outbound sess)))
forall a.
TVar (ThreadState a) -> SomeException -> IO (CancelResult a)
cancelThread (Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
forall sess.
Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound Channel sess
channel) (InboundResult sess -> SomeException
f InboundResult sess
result)
(Left (SomeException
exception :: SomeException), AllowHalfClosed sess
_) -> do
IO (CancelResult (FlowState (Outbound sess))) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (CancelResult (FlowState (Outbound sess))) -> IO ())
-> IO (CancelResult (FlowState (Outbound sess))) -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (ThreadState (FlowState (Outbound sess)))
-> SomeException -> IO (CancelResult (FlowState (Outbound sess)))
forall a.
TVar (ThreadState a) -> SomeException -> IO (CancelResult a)
cancelThread (Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
forall sess.
Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound Channel sess
channel) SomeException
exception
SomeException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO SomeException
exception
where
()
_ = forall (c :: Constraint). c => ()
addConstraint @(IsSession sess)
sendMessageLoop :: forall sess.
IsSession sess
=> sess
-> RegularFlowState (Outbound sess)
-> OutputStream
-> IO ()
sendMessageLoop :: forall sess.
IsSession sess =>
sess -> RegularFlowState (Outbound sess) -> OutputStream -> IO ()
sendMessageLoop sess
sess RegularFlowState (Outbound sess)
st OutputStream
stream = do
trailers <- IO (Trailers (Outbound sess))
loop
atomically $ putTMVar (flowTerminated st) trailers
where
build :: (Message (Outbound sess) -> Builder)
build :: Message (Outbound sess) -> Builder
build = sess
-> Headers (Outbound sess) -> Message (Outbound sess) -> Builder
forall sess.
IsSession sess =>
sess
-> Headers (Outbound sess) -> Message (Outbound sess) -> Builder
buildMsg sess
sess (RegularFlowState (Outbound sess) -> Headers (Outbound sess)
forall {k} (flow :: k). RegularFlowState flow -> Headers flow
flowHeaders RegularFlowState (Outbound sess)
st)
loop :: IO (Trailers (Outbound sess))
loop :: IO (Trailers (Outbound sess))
loop = do
msg <- STM
(StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
-> IO
(StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
forall a. STM a -> IO a
atomically (STM
(StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
-> IO
(StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))))
-> STM
(StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
-> IO
(StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
forall a b. (a -> b) -> a -> b
$ TMVar
(StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
-> STM
(StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
forall a. TMVar a -> STM a
takeTMVar (RegularFlowState (Outbound sess)
-> TMVar
(StreamElem (Trailers (Outbound sess)) (Message (Outbound sess)))
forall {k} (flow :: k).
RegularFlowState flow
-> TMVar (StreamElem (Trailers flow) (Message flow))
flowMsg RegularFlowState (Outbound sess)
st)
case msg of
StreamElem Message (Outbound sess)
x -> do
HasCallStack => OutputStream -> Builder -> IO ()
OutputStream -> Builder -> IO ()
writeChunk OutputStream
stream (Builder -> IO ()) -> Builder -> IO ()
forall a b. (a -> b) -> a -> b
$ Message (Outbound sess) -> Builder
build Message (Outbound sess)
x
HasCallStack => OutputStream -> IO ()
OutputStream -> IO ()
flush OutputStream
stream
IO (Trailers (Outbound sess))
loop
FinalElem Message (Outbound sess)
x Trailers (Outbound sess)
trailers -> do
HasCallStack => OutputStream -> Builder -> IO ()
OutputStream -> Builder -> IO ()
writeChunkFinal OutputStream
stream (Builder -> IO ()) -> Builder -> IO ()
forall a b. (a -> b) -> a -> b
$ Message (Outbound sess) -> Builder
build Message (Outbound sess)
x
Trailers (Outbound sess) -> IO (Trailers (Outbound sess))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Trailers (Outbound sess)
trailers
NoMoreElems Trailers (Outbound sess)
trailers -> do
HasCallStack => OutputStream -> Builder -> IO ()
OutputStream -> Builder -> IO ()
writeChunkFinal OutputStream
stream (Builder -> IO ()) -> Builder -> IO ()
forall a b. (a -> b) -> a -> b
$ Builder
forall a. Monoid a => a
mempty
Trailers (Outbound sess) -> IO (Trailers (Outbound sess))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Trailers (Outbound sess)
trailers
recvMessageLoop :: forall sess.
IsSession sess
=> sess
-> RegularFlowState (Inbound sess)
-> InputStream
-> IO (Trailers (Inbound sess))
recvMessageLoop :: forall sess.
IsSession sess =>
sess
-> RegularFlowState (Inbound sess)
-> InputStream
-> IO (Trailers (Inbound sess))
recvMessageLoop sess
sess RegularFlowState (Inbound sess)
st InputStream
stream =
Parser String (Message (Inbound sess))
-> IO (Trailers (Inbound sess))
go (Parser String (Message (Inbound sess))
-> IO (Trailers (Inbound sess)))
-> Parser String (Message (Inbound sess))
-> IO (Trailers (Inbound sess))
forall a b. (a -> b) -> a -> b
$ sess
-> Headers (Inbound sess) -> Parser String (Message (Inbound sess))
forall sess.
IsSession sess =>
sess
-> Headers (Inbound sess) -> Parser String (Message (Inbound sess))
parseMsg sess
sess (RegularFlowState (Inbound sess) -> Headers (Inbound sess)
forall {k} (flow :: k). RegularFlowState flow -> Headers flow
flowHeaders RegularFlowState (Inbound sess)
st)
where
go :: Parser String (Message (Inbound sess)) -> IO (Trailers (Inbound sess))
go :: Parser String (Message (Inbound sess))
-> IO (Trailers (Inbound sess))
go Parser String (Message (Inbound sess))
parser = do
mProcessedFinal <- ProcessResult String (Trailers (Inbound sess))
-> IO (Maybe (Trailers (Inbound sess)))
forall b. ProcessResult String b -> IO (Maybe b)
throwParseErrors (ProcessResult String (Trailers (Inbound sess))
-> IO (Maybe (Trailers (Inbound sess))))
-> IO (ProcessResult String (Trailers (Inbound sess)))
-> IO (Maybe (Trailers (Inbound sess)))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO (ByteString, Bool)
-> (Message (Inbound sess) -> IO ())
-> (Message (Inbound sess) -> IO (Trailers (Inbound sess)))
-> Parser String (Message (Inbound sess))
-> IO (ProcessResult String (Trailers (Inbound sess)))
forall (m :: * -> *) e a b.
Monad m =>
m (ByteString, Bool)
-> (a -> m ()) -> (a -> m b) -> Parser e a -> m (ProcessResult e b)
Parser.processAll
(HasCallStack => InputStream -> IO (ByteString, Bool)
InputStream -> IO (ByteString, Bool)
getChunk InputStream
stream)
Message (Inbound sess) -> IO ()
processOne
Message (Inbound sess) -> IO (Trailers (Inbound sess))
processFinal
Parser String (Message (Inbound sess))
parser
case mProcessedFinal of
Just Trailers (Inbound sess)
trailers ->
Trailers (Inbound sess) -> IO (Trailers (Inbound sess))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Trailers (Inbound sess)
trailers
Maybe (Trailers (Inbound sess))
Nothing -> do
trailers <- IO (Trailers (Inbound sess))
processTrailers
atomically $ putTMVar (flowMsg st) $ NoMoreElems trailers
return trailers
processOne :: Message (Inbound sess) -> IO ()
processOne :: Message (Inbound sess) -> IO ()
processOne Message (Inbound sess)
msg = do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar
(StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
-> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar (RegularFlowState (Inbound sess)
-> TMVar
(StreamElem (Trailers (Inbound sess)) (Message (Inbound sess)))
forall {k} (flow :: k).
RegularFlowState flow
-> TMVar (StreamElem (Trailers flow) (Message flow))
flowMsg RegularFlowState (Inbound sess)
st) (StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
-> STM ())
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
-> STM ()
forall a b. (a -> b) -> a -> b
$ Message (Inbound sess)
-> StreamElem (Trailers (Inbound sess)) (Message (Inbound sess))
forall b a. a -> StreamElem b a
StreamElem Message (Inbound sess)
msg
processFinal :: Message (Inbound sess) -> IO (Trailers (Inbound sess))
processFinal :: Message (Inbound sess) -> IO (Trailers (Inbound sess))
processFinal Message (Inbound sess)
msg = do
trailers <- IO (Trailers (Inbound sess))
processTrailers
atomically $ putTMVar (flowMsg st) $ FinalElem msg trailers
return trailers
processTrailers :: IO (Trailers (Inbound sess))
processTrailers :: IO (Trailers (Inbound sess))
processTrailers = do
trailers <- sess -> [Header] -> IO (Trailers (Inbound sess))
forall sess.
IsSession sess =>
sess -> [Header] -> IO (Trailers (Inbound sess))
parseInboundTrailers sess
sess ([Header] -> IO (Trailers (Inbound sess)))
-> IO [Header] -> IO (Trailers (Inbound sess))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< HasCallStack => InputStream -> IO [Header]
InputStream -> IO [Header]
getTrailers InputStream
stream
atomically $ putTMVar (flowTerminated st) $ trailers
return trailers
throwParseErrors :: Parser.ProcessResult String b -> IO (Maybe b)
throwParseErrors :: forall b. ProcessResult String b -> IO (Maybe b)
throwParseErrors (Parser.ProcessError String
err) =
PeerException -> IO (Maybe b)
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (PeerException -> IO (Maybe b)) -> PeerException -> IO (Maybe b)
forall a b. (a -> b) -> a -> b
$ String -> PeerException
PeerSentMalformedMessage String
err
throwParseErrors (Parser.ProcessedWithFinal b
b Leftover
leftover) = do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Leftover -> Bool
BS.Lazy.null Leftover
leftover) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ PeerException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO PeerException
PeerSentIncompleteMessage
Maybe b -> IO (Maybe b)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe b -> IO (Maybe b)) -> Maybe b -> IO (Maybe b)
forall a b. (a -> b) -> a -> b
$ b -> Maybe b
forall a. a -> Maybe a
Just b
b
throwParseErrors (Parser.ProcessedWithoutFinal Leftover
leftover) = do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Leftover -> Bool
BS.Lazy.null Leftover
leftover) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ PeerException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO PeerException
PeerSentIncompleteMessage
Maybe b -> IO (Maybe b)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe b -> IO (Maybe b)) -> Maybe b -> IO (Maybe b)
forall a b. (a -> b) -> a -> b
$ Maybe b
forall a. Maybe a
Nothing
outboundTrailersMaker :: forall sess.
IsSession sess
=> sess
-> Channel sess
-> RegularFlowState (Outbound sess)
-> HTTP2.TrailersMaker
outboundTrailersMaker :: forall sess.
IsSession sess =>
sess
-> Channel sess
-> RegularFlowState (Outbound sess)
-> TrailersMaker
outboundTrailersMaker sess
sess Channel{TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: forall sess.
Channel sess -> TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound :: TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound} RegularFlowState (Outbound sess)
regular = TrailersMaker
go
where
go :: HTTP2.TrailersMaker
go :: TrailersMaker
go (Just ByteString
_) = NextTrailersMaker -> IO NextTrailersMaker
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (NextTrailersMaker -> IO NextTrailersMaker)
-> NextTrailersMaker -> IO NextTrailersMaker
forall a b. (a -> b) -> a -> b
$ TrailersMaker -> NextTrailersMaker
HTTP2.NextTrailersMaker TrailersMaker
go
go Maybe ByteString
Nothing = do
mFlowState <- STM (Either SomeException (Trailers (Outbound sess)))
-> IO (Either SomeException (Trailers (Outbound sess)))
forall a. STM a -> IO a
atomically (STM (Either SomeException (Trailers (Outbound sess)))
-> IO (Either SomeException (Trailers (Outbound sess))))
-> STM (Either SomeException (Trailers (Outbound sess)))
-> IO (Either SomeException (Trailers (Outbound sess)))
forall a b. (a -> b) -> a -> b
$
TVar (ThreadState (FlowState (Outbound sess)))
-> STM (Trailers (Outbound sess))
-> STM (Either SomeException (Trailers (Outbound sess)))
forall a b.
TVar (ThreadState a) -> STM b -> STM (Either SomeException b)
unlessAbnormallyTerminated TVar (ThreadState (FlowState (Outbound sess)))
channelOutbound (STM (Trailers (Outbound sess))
-> STM (Either SomeException (Trailers (Outbound sess))))
-> STM (Trailers (Outbound sess))
-> STM (Either SomeException (Trailers (Outbound sess)))
forall a b. (a -> b) -> a -> b
$
TMVar (Trailers (Outbound sess)) -> STM (Trailers (Outbound sess))
forall a. TMVar a -> STM a
readTMVar (RegularFlowState (Outbound sess)
-> TMVar (Trailers (Outbound sess))
forall {k} (flow :: k).
RegularFlowState flow -> TMVar (Trailers flow)
flowTerminated RegularFlowState (Outbound sess)
regular)
case mFlowState of
Right Trailers (Outbound sess)
trailers ->
NextTrailersMaker -> IO NextTrailersMaker
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (NextTrailersMaker -> IO NextTrailersMaker)
-> NextTrailersMaker -> IO NextTrailersMaker
forall a b. (a -> b) -> a -> b
$ [Header] -> NextTrailersMaker
HTTP2.Trailers ([Header] -> NextTrailersMaker) -> [Header] -> NextTrailersMaker
forall a b. (a -> b) -> a -> b
$ sess -> Trailers (Outbound sess) -> [Header]
forall sess.
IsSession sess =>
sess -> Trailers (Outbound sess) -> [Header]
buildOutboundTrailers sess
sess Trailers (Outbound sess)
trailers
Left SomeException
_exception ->
NextTrailersMaker -> IO NextTrailersMaker
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (NextTrailersMaker -> IO NextTrailersMaker)
-> NextTrailersMaker -> IO NextTrailersMaker
forall a b. (a -> b) -> a -> b
$ [Header] -> NextTrailersMaker
HTTP2.Trailers []