{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

module Network.QUIC.Stream.Reass (
    takeRecvStreamQwithSize,
    putRxStreamData,
    FlowCntl (..),
    tryReassemble,
) where

import qualified Data.ByteString as BS
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq

import Network.QUIC.Imports
import Network.QUIC.Stream.Frag
import Network.QUIC.Stream.Misc
import Network.QUIC.Stream.Queue
import qualified Network.QUIC.Stream.Skew as Skew
import Network.QUIC.Stream.Types
import Network.QUIC.Types

----------------------------------------------------------------

-- | Read the end-of-stream flag kept in the 'endOfStream' reference of
-- the stream's receive queue ('streamRecvQ').
getEndOfStream :: Stream -> IO Bool
getEndOfStream Stream{..} = readIORef $ endOfStream streamRecvQ

-- | Set the end-of-stream flag kept in the 'endOfStream' reference of
-- the stream's receive queue to 'True'.
setEndOfStream :: Stream -> IO ()
setEndOfStream Stream{..} = writeIORef (endOfStream streamRecvQ) True

-- | Read the contents of the 'pendingData' reference of the stream's
-- receive queue: 'Just' the stashed bytes, or 'Nothing' when nothing is
-- stashed.
readPendingData :: Stream -> IO (Maybe ByteString)
readPendingData Stream{..} = readIORef $ pendingData streamRecvQ

-- | Stash @bs@ in the 'pendingData' reference of the stream's receive
-- queue, overwriting whatever was stored there before.
writePendingData :: Stream -> ByteString -> IO ()
writePendingData Stream{..} bs = writeIORef (pendingData streamRecvQ) $ Just bs

-- | Reset the 'pendingData' reference of the stream's receive queue to
-- 'Nothing'.
clearPendingData :: Stream -> IO ()
clearPendingData Stream{..} = writeIORef (pendingData streamRecvQ) Nothing

----------------------------------------------------------------

-- | Take at most the requested number of bytes from the receive side of
-- a stream.
--
-- Once the end-of-stream flag is set the result is always @""@.
-- Otherwise the first chunk comes from the pending-data slot if it is
-- filled, or else from 'takeRecvStreamQ'.  An empty chunk taken from the
-- queue marks the end of the stream: the flag is set and whatever has
-- been collected so far is returned.  If the first chunk is shorter than
-- the request, further chunks are pulled with 'tryTakeRecvStreamQ' until
-- the request is satisfied or that call yields 'Nothing'.  Bytes beyond
-- the requested size are stashed as pending data for the next call.
takeRecvStreamQwithSize
    :: Stream
    -> Int
    -- ^ Number of bytes to receive.
    -> IO ByteString
takeRecvStreamQwithSize strm siz0 = do
    eos <- getEndOfStream strm
    if eos
        then return ""
        else do
            mb <- readPendingData strm
            case mb of
                Nothing -> do
                    b0 <- takeRecvStreamQ strm
                    if BS.null b0
                        then do
                            -- An empty chunk is the end-of-stream marker.
                            setEndOfStream strm
                            return ""
                        else handleBytes b0
                Just b0 -> do
                    clearPendingData strm
                    handleBytes b0
  where
    -- Decide what to do with the first chunk relative to the request.
    handleBytes :: ByteString -> IO ByteString
    handleBytes b0 =
        let len = BS.length b0
         in case len `compare` siz0 of
                LT -> tryRead (siz0 - len) (b0 :)
                EQ -> return b0
                GT -> do
                    let (b1, b2) = BS.splitAt siz0 b0
                    writePendingData strm b2
                    return b1

    -- Collect further chunks.  @siz@ is the number of bytes still
    -- wanted; @build@ is a difference list of the chunks gathered so far.
    tryRead :: Int -> ([ByteString] -> [ByteString]) -> IO ByteString
    tryRead siz build = do
        mb <- tryTakeRecvStreamQ strm
        case mb of
            Nothing -> return $ BS.concat $ build []
            Just b
                | BS.null b -> do
                    -- End-of-stream marker: remember it and return what
                    -- has been gathered.
                    setEndOfStream strm
                    return $ BS.concat $ build []
                | otherwise -> do
                    let len = BS.length b
                    case len `compare` siz of
                        LT -> tryRead (siz - len) (build . (b :))
                        EQ -> return $ BS.concat $ build [b]
                        GT -> do
                            let (b1, b2) = BS.splitAt siz b
                            writePendingData strm b2
                            return $ BS.concat $ build [b1]

----------------------------------------------------------------
----------------------------------------------------------------

-- | Result of handing received stream data to 'putRxStreamData'.
data FlowCntl
    = -- | The data would extend past the limit obtained from
      -- 'getRxMaxStreamData'.
      OverLimit
    | -- | 'tryReassemble' reported the data as a duplicate.
      Duplicated
    | -- | The data was accepted by 'tryReassemble'.
      Reassembled

-- | Feed received stream data into the stream's reassembly machinery.
--
-- Returns 'OverLimit' (without touching the stream) when the end of the
-- data, @off + len@, exceeds the limit reported by 'getRxMaxStreamData'.
-- Otherwise the data is passed to 'tryReassemble'; the result is
-- 'Duplicated' when that reports a duplicate and 'Reassembled' otherwise.
putRxStreamData :: Stream -> RxStreamData -> IO FlowCntl
putRxStreamData s rx@(RxStreamData _ off len _) = do
    lim <- getRxMaxStreamData s
    if len + off > lim
        then return OverLimit
        else do
            dup <- tryReassemble s rx put putFin
            return $ if dup then Duplicated else Reassembled
  where
    -- Deliver in-order data: account for its length with
    -- 'addRxStreamData' and enqueue it.  Empty data is skipped because an
    -- empty chunk in the receive queue is reserved for FIN (see 'putFin').
    put :: ByteString -> IO ()
    put "" = return ()
    put d = do
        addRxStreamData s $ BS.length d
        putRecvStreamQ s d

    -- Signal FIN by enqueueing an empty chunk.
    putFin :: IO ()
    putFin = putRecvStreamQ s ""

-- | Try to deliver a received fragment in order, buffering it when it
-- arrives ahead of the current receive offset.
--
-- The 'StreamState' in 'streamStateRx' holds the next expected offset
-- and a FIN flag; a set flag means that a FIN has already been seen.
--
-- The third argument is called with each piece of data that becomes
-- deliverable in order; the fourth is called when the FIN becomes
-- deliverable.
--
-- The result indicates duplication.  'True' is returned, and nothing is
-- delivered or stored, for an empty fragment without FIN, for a FIN
-- fragment when a FIN has already been seen, and for a fragment whose
-- offset is below the expected offset.  'False' is returned when the
-- fragment was delivered (its offset equals the expected offset) or
-- inserted into 'streamReass' (its offset lies beyond it).
tryReassemble
    :: Stream -> RxStreamData -> (StreamData -> IO ()) -> IO () -> IO Bool
-- Empty data without FIN carries no information.
tryReassemble Stream{} (RxStreamData "" _ _ False) _ _ = return True
-- Empty data with FIN.
tryReassemble Stream{..} x@(RxStreamData "" off _ True) _ putFin = do
    si0@(StreamState off0 fin0) <- readIORef streamStateRx
    let si1 = si0{streamFin = True}
    if fin0
        then do
            -- stdoutLogger "Illegal Fin" -- fixme
            return True
        else case off `compare` off0 of
            LT -> return True
            EQ -> do
                writeIORef streamStateRx si1
                putFin
                return False
            GT -> do
                -- FIN is ahead of the expected offset: remember that it
                -- was seen and buffer it until the gap is filled.
                writeIORef streamStateRx si1
                atomicModifyIORef'' streamReass (Skew.insert x)
                return False
-- Non-empty data without FIN.
tryReassemble Stream{..} x@(RxStreamData dat off len False) put putFin = do
    si0@(StreamState off0 _) <- readIORef streamStateRx
    case off `compare` off0 of
        LT -> return True
        EQ -> do
            put dat
            loop si0 (off0 + len)
            return False
        GT -> do
            atomicModifyIORef'' streamReass (Skew.insert x)
            return False
  where
    -- Drain buffered fragments that have become contiguous with @xff@,
    -- the offset just past the data delivered so far.
    loop si0 xff = do
        mrxs <- atomicModifyIORef' streamReass (Skew.deleteMinIf xff)
        case mrxs of
            Nothing -> writeIORef streamStateRx si0{streamOffset = xff}
            Just rxs -> do
                mapM_ (put . rxstrmData) rxs
                let xff1 = nextOff rxs
                if hasFin rxs
                    then do
                        -- Record the final offset before signalling FIN,
                        -- as the in-order FIN equation below does.
                        -- Without this the state keeps the old offset and
                        -- a retransmission of the fragment at that offset
                        -- would be delivered a second time.
                        writeIORef
                            streamStateRx
                            si0{streamOffset = xff1, streamFin = True}
                        putFin
                    else loop si0 xff1
-- Non-empty data with FIN.
tryReassemble Stream{..} x@(RxStreamData dat off len True) put putFin = do
    si0@(StreamState off0 fin0) <- readIORef streamStateRx
    let si1 = si0{streamFin = True}
    if fin0
        then return True
        else case off `compare` off0 of
            LT -> return True
            EQ -> do
                let off1 = off0 + len
                writeIORef streamStateRx si1{streamOffset = off1}
                put dat
                putFin
                return False
            GT -> do
                -- FIN is ahead of the expected offset: remember that it
                -- was seen and buffer the fragment.
                writeIORef streamStateRx si1
                atomicModifyIORef'' streamReass (Skew.insert x)
                return False

-- | Whether the last fragment of the sequence has its FIN flag
-- ('rxstrmFin') set.  An empty sequence yields 'False'.
hasFin :: Seq RxStreamData -> Bool
hasFin s = case Seq.viewr s of
    Seq.EmptyR -> False
    _ Seq.:> x -> rxstrmFin x