module Network.HTTP2.Client (
runHttp2Client
, newHttp2Client
, withHttp2Stream
, headers
, sendData
, Http2Client(..)
, PushPromiseHandler
, StreamDefinition(..)
, StreamStarter
, TooMuchConcurrency(..)
, StreamThread
, Http2Stream(..)
, IncomingFlowControl(..)
, OutgoingFlowControl(..)
, linkAsyncs
, RemoteSentGoAwayFrame(..)
, GoAwayHandler
, defaultGoAwayHandler
, FallBackFrameHandler
, ignoreFallbackHandler
, FlagSetter
, Http2ClientAsyncs(..)
, _gtfo
, module Network.HTTP2.Client.FrameConnection
, module Network.Socket
, module Network.TLS
) where
import Control.Concurrent.Async (Async, async, race, withAsync, link)
import Control.Exception (bracket, throwIO, SomeException, catch)
import Control.Concurrent.MVar (newMVar, takeMVar, putMVar)
import Control.Concurrent (threadDelay)
import Control.Monad (forever, when, forM_)
import Data.ByteString (ByteString)
import qualified Data.ByteString as ByteString
import Data.IORef (newIORef, atomicModifyIORef', readIORef)
import Data.Maybe (fromMaybe)
import GHC.Exception (Exception)
import Network.HPACK as HPACK
import Network.HTTP2 as HTTP2
import Network.Socket (HostName, PortNumber)
import Network.TLS (ClientParams)
import Network.HTTP2.Client.Channels
import Network.HTTP2.Client.Dispatch
import Network.HTTP2.Client.FrameConnection
data IncomingFlowControl = IncomingFlowControl {
_addCredit :: WindowSize -> IO ()
, _consumeCredit :: WindowSize -> IO Int
, _updateWindow :: IO Bool
}
data OutgoingFlowControl = OutgoingFlowControl {
_receiveCredit :: WindowSize -> IO ()
, _withdrawCredit :: WindowSize -> IO WindowSize
}
data StreamDefinition a = StreamDefinition {
_initStream :: IO StreamThread
, _handleStream :: IncomingFlowControl -> OutgoingFlowControl -> IO a
}
type StreamStarter a =
(Http2Stream -> StreamDefinition a) -> IO (Either TooMuchConcurrency a)
newtype TooMuchConcurrency = TooMuchConcurrency { _getStreamRoomNeeded :: Int }
deriving Show
data Http2Client = Http2Client {
_ping :: ByteString -> IO (IO (FrameHeader, FramePayload))
, _settings :: SettingsList -> IO (IO (FrameHeader, FramePayload))
, _goaway :: ErrorCodeId -> ByteString -> IO ()
, _startStream :: forall a. StreamStarter a
, _incomingFlowControl :: IncomingFlowControl
, _outgoingFlowControl :: OutgoingFlowControl
, _payloadSplitter :: IO PayloadSplitter
, _asyncs :: !Http2ClientAsyncs
, _close :: IO ()
}
data InitHttp2Client = InitHttp2Client {
_initPing :: ByteString -> IO (IO (FrameHeader, FramePayload))
, _initSettings :: SettingsList -> IO (IO (FrameHeader, FramePayload))
, _initGoaway :: ErrorCodeId -> ByteString -> IO ()
, _initStartStream :: forall a. StreamStarter a
, _initIncomingFlowControl :: IncomingFlowControl
, _initOutgoingFlowControl :: OutgoingFlowControl
, _initPaylodSplitter :: IO PayloadSplitter
, _initClose :: IO ()
}
data Http2ClientAsyncs = Http2ClientAsyncs {
_waitSettingsAsync :: Async (FrameHeader, FramePayload)
, _incomingFramesAsync :: Async ()
}
linkAsyncs :: Http2Client -> IO ()
linkAsyncs client =
let Http2ClientAsyncs{..} = _asyncs client in do
link _waitSettingsAsync
link _incomingFramesAsync
_gtfo :: Http2Client -> ErrorCodeId -> ByteString -> IO ()
_gtfo = _goaway
data StreamThread = CST
data Http2Stream = Http2Stream {
_headers :: HPACK.HeaderList
-> (FrameFlags -> FrameFlags)
-> IO StreamThread
, _prio :: Priority -> IO ()
, _rst :: ErrorCodeId -> IO ()
, _waitHeaders :: IO (FrameHeader, StreamId, Either ErrorCode HeaderList)
, _waitData :: IO (FrameHeader, Either ErrorCode ByteString)
, _sendDataChunk :: (FrameFlags -> FrameFlags) -> ByteString -> IO ()
, _waitPushPromise :: Maybe (PushPromiseHandler -> IO ())
}
type PushPromiseHandler =
StreamId -> Http2Stream -> HeaderList -> IncomingFlowControl -> OutgoingFlowControl -> IO ()
withHttp2Stream :: Http2Client -> StreamStarter a
withHttp2Stream = _startStream
type FlagSetter = FrameFlags -> FrameFlags
headers :: Http2Stream -> HeaderList -> FlagSetter -> IO StreamThread
headers = _headers
runHttp2Client
:: Http2FrameConnection
-> Int
-> Int
-> SettingsList
-> GoAwayHandler
-> FallBackFrameHandler
-> (Http2Client -> IO a)
-> IO a
runHttp2Client conn encoderBufSize decoderBufSize initSettings goAwayHandler fallbackHandler mainHandler = do
(incomingLoop, initClient) <- initHttp2Client conn encoderBufSize decoderBufSize goAwayHandler fallbackHandler
withAsync incomingLoop $ \aIncoming -> do
settsIO <- _initSettings initClient initSettings
withAsync settsIO $ \aSettings -> do
mainHandler $ Http2Client {
_settings = _initSettings initClient
, _ping = _initPing initClient
, _goaway = _initGoaway initClient
, _close = _initClose initClient
, _startStream = _initStartStream initClient
, _incomingFlowControl = _initIncomingFlowControl initClient
, _outgoingFlowControl = _initOutgoingFlowControl initClient
, _payloadSplitter = _initPaylodSplitter initClient
, _asyncs = Http2ClientAsyncs aSettings aIncoming
}
newHttp2Client
:: Http2FrameConnection
-> Int
-> Int
-> SettingsList
-> GoAwayHandler
-> FallBackFrameHandler
-> IO Http2Client
newHttp2Client conn encoderBufSize decoderBufSize initSettings goAwayHandler fallbackHandler = do
(incomingLoop, initClient) <- initHttp2Client conn encoderBufSize decoderBufSize goAwayHandler fallbackHandler
aIncoming <- async incomingLoop
settsIO <- _initSettings initClient initSettings
aSettings <- async settsIO
return $ Http2Client {
_settings = _initSettings initClient
, _ping = _initPing initClient
, _goaway = _initGoaway initClient
, _close = _initClose initClient
, _startStream = _initStartStream initClient
, _incomingFlowControl = _initIncomingFlowControl initClient
, _outgoingFlowControl = _initOutgoingFlowControl initClient
, _payloadSplitter = _initPaylodSplitter initClient
, _asyncs = Http2ClientAsyncs aSettings aIncoming
}
initHttp2Client
:: Http2FrameConnection
-> Int
-> Int
-> GoAwayHandler
-> FallBackFrameHandler
-> IO (IO (), InitHttp2Client)
initHttp2Client conn encoderBufSize decoderBufSize goAwayHandler fallbackHandler = do
let controlStream = makeFrameClientStream conn 0
let ackPing = sendPingFrame controlStream HTTP2.setAck
let ackSettings = sendSettingsFrame controlStream HTTP2.setAck []
dispatch <- newDispatchIO
dispatchControl <- newDispatchControlIO encoderBufSize
ackPing
ackSettings
goAwayHandler
fallbackHandler
creditFrames <- newDispatchReadChanIO dispatch
_initOutgoingFlowControl <- newOutgoingFlowControl dispatchControl 0 creditFrames
_initIncomingFlowControl <- newIncomingFlowControl dispatchControl controlStream
dispatchHPACK <- newDispatchHPACKIO decoderBufSize
let incomingLoop = dispatchLoop conn dispatch dispatchControl _initIncomingFlowControl dispatchHPACK
conccurentStreams <- newIORef 0
clientStreamIdMutex <- newMVar 0
let withClientStreamId h = bracket (takeMVar clientStreamIdMutex)
(putMVar clientStreamIdMutex . succ)
(\k -> h (2 * k + 1))
let _initStartStream getWork = do
maxConcurrency <- fromMaybe 100 . maxConcurrentStreams . _serverSettings <$> readSettings dispatchControl
roomNeeded <- atomicModifyIORef' conccurentStreams
(\n -> if n < maxConcurrency then (n + 1, 0) else (n, 1 + n maxConcurrency))
if roomNeeded > 0
then
return $ Left $ TooMuchConcurrency roomNeeded
else Right <$> do
cont <- withClientStreamId $ \sid -> do
streamHeaders <- newDispatchHPACKReadHeadersChanIO dispatchHPACK
streamFrames <- newDispatchReadChanIO dispatch
streamPP <- newDispatchHPACKReadPushPromisesChanIO dispatchHPACK
initializeStream conn
dispatchControl
streamFrames
streamHeaders
(Just streamPP)
sid
getWork
v <- cont
atomicModifyIORef' conccurentStreams (\n -> (n 1, ()))
pure v
let _initPing dat = do
pingFrames <- newDispatchReadChanIO dispatch
sendPingFrame controlStream id dat
return $ waitFrame (isPingReply dat) pingFrames
let _initSettings settslist = do
settingsFrames <- newDispatchReadChanIO dispatch
sendSettingsFrame controlStream id settslist
return $ do
ret <- waitFrame isSettingsReply settingsFrames
modifySettings dispatchControl
(\(ConnectionSettings cli srv) ->
(ConnectionSettings (HTTP2.updateSettings cli settslist) srv, ()))
return ret
let _initGoaway err errStr = do
sId <- readMaxReceivedStreamIdIO dispatch
sendGTFOFrame controlStream sId err errStr
let _initPaylodSplitter = settingsPayloadSplitter <$> readSettings dispatchControl
let _initClose = closeConnection conn
return (incomingLoop, InitHttp2Client{..})
initializeStream
:: Exception e
=> Http2FrameConnection
-> DispatchControl
-> FramesChan e
-> HeadersChan
-> Maybe (PushPromisesChan e)
-> StreamId
-> (Http2Stream -> StreamDefinition a)
-> IO (IO a)
initializeStream conn control frames headersFrames mPushPromises sid getWork = do
let frameStream = makeFrameClientStream conn sid
credits <- dupChan frames
incomingStreamFlowControl <- newIncomingFlowControl control frameStream
outgoingStreamFlowControl <- newOutgoingFlowControl control sid credits
let _headers headersList flags = do
splitter <- settingsPayloadSplitter <$> readSettings control
sendHeaders frameStream (_dispatchControlHpackEncoder control) headersList splitter flags
let _waitHeaders = waitHeadersWithStreamId sid headersFrames
let _waitData = do
(fh, fp) <- waitFrameWithTypeIdForStreamId sid [FrameRSTStream, FrameData] frames
case fp of
DataFrame dat -> do
_ <- _consumeCredit incomingStreamFlowControl (HTTP2.payloadLength fh)
_addCredit incomingStreamFlowControl (HTTP2.payloadLength fh)
return (fh, Right dat)
RSTStreamFrame err -> do
return (fh, Left $ HTTP2.fromErrorCodeId err)
_ -> error "waitFrameWithTypeIdForStreamId returned an unknown frame"
let _sendDataChunk = sendDataFrame frameStream
let _rst = sendResetFrame frameStream
let _prio = sendPriorityFrame frameStream
let makeWaitPushPromise pushPromises ppHandler = do
(_,ppFrames,ppHeaders,ppSid,ppReadHeaders) <- waitPushPromiseWithParentStreamId sid pushPromises
let mkStreamActions stream = StreamDefinition (return CST) (ppHandler sid stream ppReadHeaders)
ppCont <- initializeStream conn
control
ppFrames
ppHeaders
Nothing
ppSid
mkStreamActions
ppCont
let _waitPushPromise = fmap makeWaitPushPromise mPushPromises
let streamActions = getWork $ Http2Stream{..}
_ <- _initStream streamActions
return $ _handleStream streamActions incomingStreamFlowControl outgoingStreamFlowControl
dispatchLoop
:: Http2FrameConnection
-> Dispatch
-> DispatchControl
-> IncomingFlowControl
-> DispatchHPACK
-> IO ()
dispatchLoop conn d dc inFlowControl dh = do
let getNextFrame = next conn
delayException . forever $ do
frame <- getNextFrame
dispatchFramesStep frame d
whenFrame (hasStreamId 0) frame $ \got ->
dispatchControlFramesStep got dc
whenFrame (hasTypeId [FrameData]) frame $ \got ->
creditDataFramesStep inFlowControl got
whenFrame (hasTypeId [FrameRSTStream, FramePushPromise, FrameHeaders]) frame $ \got -> do
let hpackLoop (FinishedWithHeaders act) =
act
hpackLoop (FinishedWithPushPromise act) = do
newDispatchReadChanIO d >>= act
hpackLoop (WaitContinuation act) =
getNextFrame >>= act >>= hpackLoop
hpackLoop (dispatchHPACKFramesStep got dh)
dispatchFramesStep
:: (FrameHeader, Either HTTP2Error FramePayload)
-> Dispatch
-> IO ()
dispatchFramesStep frame@(fh,_) (Dispatch{..}) = do
atomicModifyIORef' _dispatchMaxStreamId (\n -> (max n (streamId fh), ()))
writeChan _dispatchWriteChan frame
dispatchControlFramesStep
:: (FrameHeader, FramePayload)
-> DispatchControl
-> IO ()
dispatchControlFramesStep controlFrame@(fh, payload) (DispatchControl{..}) = do
case payload of
(SettingsFrame settsList)
| not . testAck . flags $ fh -> do
atomicModifyIORef' _dispatchControlConnectionSettings
(\(ConnectionSettings cli srv) ->
(ConnectionSettings cli (HTTP2.updateSettings srv settsList), ()))
maybe (return ())
(_applySettings _dispatchControlHpackEncoder)
(lookup SettingsHeaderTableSize settsList)
_dispatchControlAckSettings
| otherwise -> do
ignore "TODO: settings ack should be taken into account only after reception, we should return a waitSettingsAck in the _settings function"
(PingFrame pingMsg)
| not . testAck . flags $ fh ->
_dispatchControlAckPing pingMsg
| otherwise -> do
ignore "PingFrame replies waited for in the requestor thread"
(WindowUpdateFrame _ ) ->
ignore "connection-wide WindowUpdateFrame waited for in OutgoingFlowControl threads"
(GoAwayFrame lastSid errCode reason) ->
_dispatchControlOnGoAway $ RemoteSentGoAwayFrame lastSid errCode reason
_ ->
_dispatchControlOnFallback controlFrame
where
ignore :: String -> IO ()
ignore _ = return ()
creditDataFramesStep
:: IncomingFlowControl
-> (FrameHeader, FramePayload)
-> IO ()
creditDataFramesStep flowControl (fh,_) = do
_ <- _consumeCredit flowControl (HTTP2.payloadLength fh)
_addCredit flowControl (HTTP2.payloadLength fh)
data HPACKLoopDecision =
ForwardHeader !StreamId
| OpenPushPromise !StreamId !StreamId
data HPACKStepResult =
WaitContinuation !((FrameHeader, Either HTTP2Error FramePayload) -> IO HPACKStepResult)
| FinishedWithHeaders !(IO ())
| FinishedWithPushPromise !(DispatchChan -> IO ())
dispatchHPACKFramesStep
:: (FrameHeader, FramePayload)
-> DispatchHPACK
-> HPACKStepResult
dispatchHPACKFramesStep (fh,fp) (DispatchHPACK{..}) =
let (decision, pattern) = case fp of
PushPromiseFrame ppSid hbf -> do
(OpenPushPromise sid ppSid, Right hbf)
HeadersFrame _ hbf ->
(ForwardHeader sid, Right hbf)
RSTStreamFrame err ->
(ForwardHeader sid, Left err)
_ ->
error "wrong TypeId"
in go fh decision pattern
where
sid :: StreamId
sid = HTTP2.streamId fh
go :: FrameHeader -> HPACKLoopDecision -> Either ErrorCodeId ByteString -> HPACKStepResult
go curFh decision (Right buffer) =
if not $ HTTP2.testEndHeader (HTTP2.flags curFh)
then WaitContinuation $ \frame -> do
let interrupted fh2 fp2 =
not $ hasTypeId [ FrameRSTStream , FrameContinuation ] fh2 fp2
whenFrameElse interrupted frame (\_ ->
error "invalid frame type while waiting for CONTINUATION")
(\(lastFh, lastFp) ->
case lastFp of
ContinuationFrame chbf ->
return $ go lastFh decision (Right (ByteString.append buffer chbf))
RSTStreamFrame err ->
return $ go lastFh decision (Left err)
_ ->
error "continued frame has invalid type")
else case decision of
ForwardHeader sId -> FinishedWithHeaders $ do
newHdrs <- decodeHeader _dispatchHPACKDynamicTable buffer
writeChan _dispatchHPACKWriteHeadersChan (curFh, sId, Right newHdrs)
OpenPushPromise parentSid newSid -> FinishedWithPushPromise $ \ppChan -> do
newHdrs <- decodeHeader _dispatchHPACKDynamicTable buffer
ppHeaders <- dupChan _dispatchHPACKWriteHeadersChan
writeChan _dispatchHPACKWritePushPromisesChan (parentSid, ppChan, ppHeaders, newSid, newHdrs)
go curFh _ (Left err) = FinishedWithHeaders $
writeChan _dispatchHPACKWriteHeadersChan (curFh, sid, (Left $ HTTP2.fromErrorCodeId err))
newIncomingFlowControl
:: DispatchControl
-> Http2FrameClientStream
-> IO IncomingFlowControl
newIncomingFlowControl control stream = do
let getBase = if _getStreamId stream == 0
then return HTTP2.defaultInitialWindowSize
else initialWindowSize . _clientSettings <$> readSettings control
creditAdded <- newIORef 0
creditConsumed <- newIORef 0
let _addCredit n = atomicModifyIORef' creditAdded (\c -> (c + n, ()))
let _consumeCredit n = do
conso <- atomicModifyIORef' creditConsumed (\c -> (c + n, c + n))
base <- getBase
extra <- readIORef creditAdded
return $ base + extra conso
let _updateWindow = do
base <- initialWindowSize . _clientSettings <$> readSettings control
added <- readIORef creditAdded
consumed <- readIORef creditConsumed
let transferred = min added (HTTP2.maxWindowSize base + consumed)
let shouldUpdate = transferred > 0
_addCredit (negate transferred)
_ <- _consumeCredit (negate transferred)
when shouldUpdate (sendWindowUpdateFrame stream transferred)
return shouldUpdate
return $ IncomingFlowControl _addCredit _consumeCredit _updateWindow
newOutgoingFlowControl ::
Exception e
=> DispatchControl
-> StreamId
-> Chan (FrameHeader, Either e FramePayload)
-> IO OutgoingFlowControl
newOutgoingFlowControl control sid frames = do
credit <- newIORef 0
let getBase = if sid == 0
then return HTTP2.defaultInitialWindowSize
else initialWindowSize . _serverSettings <$> readSettings control
let receive n = atomicModifyIORef' credit (\c -> (c + n, ()))
let withdraw 0 = return 0
withdraw n = do
base <- getBase
got <- atomicModifyIORef' credit (\c ->
if base + c >= n
then (c n, n)
else (0 base, base + c))
if got > 0
then return got
else do
amount <- race (waitSettingsChange base) waitSomeCredit
receive (either (const 0) id amount)
withdraw n
return $ OutgoingFlowControl receive withdraw
where
waitSettingsChange prev = do
new <- initialWindowSize . _serverSettings <$> readSettings control
if new == prev then threadDelay 1000000 >> waitSettingsChange prev else return ()
waitSomeCredit = do
(_, fp) <- waitFrameWithTypeIdForStreamId sid [FrameWindowUpdate] frames
case fp of
WindowUpdateFrame amt -> return amt
_ -> error "waitFrameWithTypeIdForStreamId returned an unknown frame"
sendHeaders
:: Http2FrameClientStream
-> HpackEncoderContext
-> HeaderList
-> PayloadSplitter
-> (FrameFlags -> FrameFlags)
-> IO StreamThread
sendHeaders s enc hdrs blockSplitter flagmod = do
headerBlockFragments <- blockSplitter <$> _encodeHeaders enc hdrs
let framers = (HeadersFrame Nothing) : repeat ContinuationFrame
let frames = zipWith ($) framers headerBlockFragments
let modifiersReversed = (HTTP2.setEndHeader . flagmod) : repeat id
let arrangedFrames = reverse $ zip modifiersReversed (reverse frames)
sendBackToBack s arrangedFrames
return CST
type PayloadSplitter = ByteString -> [ByteString]
settingsPayloadSplitter :: ConnectionSettings -> PayloadSplitter
settingsPayloadSplitter (ConnectionSettings _ srv) =
fixedSizeChunks (maxFrameSize srv)
fixedSizeChunks :: Int -> ByteString -> [ByteString]
fixedSizeChunks 0 _ = error "cannot chunk by zero-length blocks"
fixedSizeChunks _ "" = []
fixedSizeChunks len bstr =
let
(chunk, rest) = ByteString.splitAt len bstr
in
chunk : fixedSizeChunks len rest
sendData :: Http2Client -> Http2Stream -> FlagSetter -> ByteString -> IO ()
sendData conn stream flagmod dat = do
splitter <- _payloadSplitter conn
let chunks = splitter dat
let pairs = reverse $ zip (flagmod : repeat id) (reverse chunks)
when (null chunks) $ _sendDataChunk stream flagmod ""
forM_ pairs $ \(flags, chunk) -> _sendDataChunk stream flags chunk
sendDataFrame
:: Http2FrameClientStream
-> (FrameFlags -> FrameFlags) -> ByteString -> IO ()
sendDataFrame s flagmod dat = do
sendOne s flagmod (DataFrame dat)
sendResetFrame :: Http2FrameClientStream -> ErrorCodeId -> IO ()
sendResetFrame s err = do
sendOne s id (RSTStreamFrame err)
sendGTFOFrame
:: Http2FrameClientStream
-> StreamId -> ErrorCodeId -> ByteString -> IO ()
sendGTFOFrame s lastStreamId err errStr = do
sendOne s id (GoAwayFrame lastStreamId err errStr)
rfcError :: String -> a
rfcError msg = error (msg ++ "draft-ietf-httpbis-http2-17")
sendPingFrame
:: Http2FrameClientStream
-> (FrameFlags -> FrameFlags)
-> ByteString
-> IO ()
sendPingFrame s flags dat
| _getStreamId s /= 0 =
rfcError "PING frames are not associated with any individual stream."
| ByteString.length dat /= 8 =
rfcError "PING frames MUST contain 8 octets"
| otherwise = sendOne s flags (PingFrame dat)
sendWindowUpdateFrame
:: Http2FrameClientStream -> WindowSize -> IO ()
sendWindowUpdateFrame s amount = do
let payload = WindowUpdateFrame amount
sendOne s id payload
return ()
sendSettingsFrame
:: Http2FrameClientStream
-> (FrameFlags -> FrameFlags) -> SettingsList -> IO ()
sendSettingsFrame s flags setts
| _getStreamId s /= 0 =
rfcError "The stream identifier for a SETTINGS frame MUST be zero (0x0)."
| otherwise = do
let payload = SettingsFrame setts
sendOne s flags payload
return ()
sendPriorityFrame :: Http2FrameClientStream -> Priority -> IO ()
sendPriorityFrame s p = do
let payload = PriorityFrame p
sendOne s id payload
return ()
delayException :: IO a -> IO a
delayException act = act `catch` slowdown
where
slowdown :: SomeException -> IO a
slowdown e = threadDelay 50000 >> throwIO e