{-# LANGUAGE RecordWildCards #-}
module Network.QUIC.Stream.Types (
Stream (..),
newStream,
TxStreamData (..),
StreamState (..),
RecvStreamQ (..),
RxStreamData (..),
Length,
syncFinTx,
waitFinTx,
) where
import Control.Concurrent
import Control.Concurrent.STM
import qualified Data.ByteString as BS
import Network.Control
import {-# SOURCE #-} Network.QUIC.Connection.Types
import Network.QUIC.Imports
import Network.QUIC.Stream.Frag
import Network.QUIC.Stream.Skew
import qualified Network.QUIC.Stream.Skew as Skew
import Network.QUIC.Types
data Stream = Stream
{ Stream -> StreamId
streamId :: StreamId
, Stream -> Connection
streamConnection :: Connection
,
Stream -> TVar TxFlow
streamFlowTx :: TVar TxFlow
, Stream -> IORef RxFlow
streamFlowRx :: IORef RxFlow
, Stream -> IORef StreamState
streamStateTx :: IORef StreamState
, Stream -> IORef StreamState
streamStateRx :: IORef StreamState
, Stream -> RecvStreamQ
streamRecvQ :: RecvStreamQ
, Stream -> IORef (Skew RxStreamData)
streamReass :: IORef (Skew RxStreamData)
, Stream -> MVar ()
streamSyncFinTx :: MVar ()
}
instance Show Stream where
show :: Stream -> String
show Stream
s = StreamId -> String
forall a. Show a => a -> String
show (StreamId -> String) -> StreamId -> String
forall a b. (a -> b) -> a -> b
$ Stream -> StreamId
streamId Stream
s
newStream :: Connection -> Int -> Int -> StreamId -> IO Stream
newStream :: Connection -> StreamId -> StreamId -> StreamId -> IO Stream
newStream Connection
streamConnection StreamId
streamId StreamId
txLim StreamId
rxLim = do
TVar TxFlow
streamFlowTx <- TxFlow -> IO (TVar TxFlow)
forall a. a -> IO (TVar a)
newTVarIO (TxFlow -> IO (TVar TxFlow)) -> TxFlow -> IO (TVar TxFlow)
forall a b. (a -> b) -> a -> b
$ StreamId -> TxFlow
newTxFlow StreamId
txLim
IORef RxFlow
streamFlowRx <- RxFlow -> IO (IORef RxFlow)
forall a. a -> IO (IORef a)
newIORef (RxFlow -> IO (IORef RxFlow)) -> RxFlow -> IO (IORef RxFlow)
forall a b. (a -> b) -> a -> b
$ StreamId -> RxFlow
newRxFlow StreamId
rxLim
IORef StreamState
streamStateTx <- StreamState -> IO (IORef StreamState)
forall a. a -> IO (IORef a)
newIORef StreamState
emptyStreamState
IORef StreamState
streamStateRx <- StreamState -> IO (IORef StreamState)
forall a. a -> IO (IORef a)
newIORef StreamState
emptyStreamState
RecvStreamQ
streamRecvQ <- IO RecvStreamQ
newRecvStreamQ
IORef (Skew RxStreamData)
streamReass <- Skew RxStreamData -> IO (IORef (Skew RxStreamData))
forall a. a -> IO (IORef a)
newIORef Skew RxStreamData
forall a. Skew a
Skew.empty
MVar ()
streamSyncFinTx <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
Stream -> IO Stream
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Stream{StreamId
MVar ()
TVar TxFlow
IORef RxFlow
IORef (Skew RxStreamData)
IORef StreamState
Connection
RecvStreamQ
streamId :: StreamId
streamConnection :: Connection
streamFlowTx :: TVar TxFlow
streamFlowRx :: IORef RxFlow
streamStateTx :: IORef StreamState
streamStateRx :: IORef StreamState
streamRecvQ :: RecvStreamQ
streamReass :: IORef (Skew RxStreamData)
streamSyncFinTx :: MVar ()
streamConnection :: Connection
streamId :: StreamId
streamFlowTx :: TVar TxFlow
streamFlowRx :: IORef RxFlow
streamStateTx :: IORef StreamState
streamStateRx :: IORef StreamState
streamRecvQ :: RecvStreamQ
streamReass :: IORef (Skew RxStreamData)
streamSyncFinTx :: MVar ()
..}
syncFinTx :: Stream -> IO ()
syncFinTx :: Stream -> IO ()
syncFinTx Stream
s = IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar (Stream -> MVar ()
streamSyncFinTx Stream
s) ()
waitFinTx :: Stream -> IO ()
waitFinTx :: Stream -> IO ()
waitFinTx Stream
s = MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (MVar () -> IO ()) -> MVar () -> IO ()
forall a b. (a -> b) -> a -> b
$ Stream -> MVar ()
streamSyncFinTx Stream
s
type Length = Int
data TxStreamData = TxStreamData Stream [StreamData] Length Fin
data RxStreamData = RxStreamData
{ RxStreamData -> StreamData
rxstrmData :: StreamData
, RxStreamData -> StreamId
rxstrmOff :: Offset
, RxStreamData -> StreamId
rxstrmLen :: Length
, RxStreamData -> Bool
rxstrmFin :: Fin
}
deriving (RxStreamData -> RxStreamData -> Bool
(RxStreamData -> RxStreamData -> Bool)
-> (RxStreamData -> RxStreamData -> Bool) -> Eq RxStreamData
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: RxStreamData -> RxStreamData -> Bool
== :: RxStreamData -> RxStreamData -> Bool
$c/= :: RxStreamData -> RxStreamData -> Bool
/= :: RxStreamData -> RxStreamData -> Bool
Eq, StreamId -> RxStreamData -> ShowS
[RxStreamData] -> ShowS
RxStreamData -> String
(StreamId -> RxStreamData -> ShowS)
-> (RxStreamData -> String)
-> ([RxStreamData] -> ShowS)
-> Show RxStreamData
forall a.
(StreamId -> a -> ShowS)
-> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: StreamId -> RxStreamData -> ShowS
showsPrec :: StreamId -> RxStreamData -> ShowS
$cshow :: RxStreamData -> String
show :: RxStreamData -> String
$cshowList :: [RxStreamData] -> ShowS
showList :: [RxStreamData] -> ShowS
Show)
instance Frag RxStreamData where
currOff :: RxStreamData -> StreamId
currOff RxStreamData
r = RxStreamData -> StreamId
rxstrmOff RxStreamData
r
nextOff :: RxStreamData -> StreamId
nextOff RxStreamData
r = RxStreamData -> StreamId
rxstrmOff RxStreamData
r StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
+ RxStreamData -> StreamId
rxstrmLen RxStreamData
r
shrink :: StreamId -> RxStreamData -> RxStreamData
shrink StreamId
off' (RxStreamData StreamData
bs StreamId
off StreamId
len Bool
fin) =
let n :: StreamId
n = StreamId
off' StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
- StreamId
off
bs' :: StreamData
bs' = StreamId -> StreamData -> StreamData
BS.drop StreamId
n StreamData
bs
len' :: StreamId
len' = StreamId
len StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
- StreamId
n
in RxStreamData
{ rxstrmData :: StreamData
rxstrmData = StreamData
bs'
, rxstrmOff :: StreamId
rxstrmOff = StreamId
off'
, rxstrmLen :: StreamId
rxstrmLen = StreamId
len'
, rxstrmFin :: Bool
rxstrmFin = Bool
fin
}
data StreamState = StreamState
{ StreamState -> StreamId
streamOffset :: Offset
, StreamState -> Bool
streamFin :: Fin
}
deriving (StreamState -> StreamState -> Bool
(StreamState -> StreamState -> Bool)
-> (StreamState -> StreamState -> Bool) -> Eq StreamState
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: StreamState -> StreamState -> Bool
== :: StreamState -> StreamState -> Bool
$c/= :: StreamState -> StreamState -> Bool
/= :: StreamState -> StreamState -> Bool
Eq, StreamId -> StreamState -> ShowS
[StreamState] -> ShowS
StreamState -> String
(StreamId -> StreamState -> ShowS)
-> (StreamState -> String)
-> ([StreamState] -> ShowS)
-> Show StreamState
forall a.
(StreamId -> a -> ShowS)
-> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: StreamId -> StreamState -> ShowS
showsPrec :: StreamId -> StreamState -> ShowS
$cshow :: StreamState -> String
show :: StreamState -> String
$cshowList :: [StreamState] -> ShowS
showList :: [StreamState] -> ShowS
Show)
emptyStreamState :: StreamState
emptyStreamState :: StreamState
emptyStreamState =
StreamState
{ streamOffset :: StreamId
streamOffset = StreamId
0
, streamFin :: Bool
streamFin = Bool
False
}
data RecvStreamQ = RecvStreamQ
{ RecvStreamQ -> TQueue StreamData
recvStreamQ :: TQueue ByteString
, RecvStreamQ -> IORef (Maybe StreamData)
pendingData :: IORef (Maybe ByteString)
, RecvStreamQ -> IORef Bool
endOfStream :: IORef Bool
}
newRecvStreamQ :: IO RecvStreamQ
newRecvStreamQ :: IO RecvStreamQ
newRecvStreamQ = do
TQueue StreamData
recvStreamQ <- IO (TQueue StreamData)
forall a. IO (TQueue a)
newTQueueIO
IORef (Maybe StreamData)
pendingData <- Maybe StreamData -> IO (IORef (Maybe StreamData))
forall a. a -> IO (IORef a)
newIORef Maybe StreamData
forall a. Maybe a
Nothing
IORef Bool
endOfStream <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
RecvStreamQ -> IO RecvStreamQ
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return RecvStreamQ{IORef Bool
IORef (Maybe StreamData)
TQueue StreamData
recvStreamQ :: TQueue StreamData
pendingData :: IORef (Maybe StreamData)
endOfStream :: IORef Bool
recvStreamQ :: TQueue StreamData
pendingData :: IORef (Maybe StreamData)
endOfStream :: IORef Bool
..}