| Copyright | (c) Dong Han 2017-2020 |
|---|---|
| License | BSD |
| Maintainer | winterland1989@gmail.com |
| Stability | experimental |
| Portability | non-portable |
| Safe Haskell | None |
| Language | Haskell2010 |
Z.IO.BIO
Description
This module provides the BIO (block IO) type to facilitate writing streaming programs. A BIO node usually:
- Processes input in units of blocks (or items).
- Runs in constant space, which means memory usage won't accumulate.
- Keeps some state in IO, which is sealed in the BIO closure.
Some examples of such nodes are:
- Compressor / decompressor, e.g. zlib, etc.
- Codec, e.g. utf8 codec, base64 codec.
- Ciphers.
- Packet parsers.
We use the BIO inp out type to represent all the objects above, BIO Void out to represent an IO source,
and BIO inp Void to represent an IO sink. All of them can be connected with >|> to build a larger BIO node.
import Z.Data.CBytes (CBytes)
import Z.IO
import Z.IO.BIO
import Z.IO.BIO.Zlib

base64AndCompressFile :: HasCallStack => CBytes -> CBytes -> IO ()
base64AndCompressFile origin target = do
    base64Enc <- newBase64Encoder
    -- compressWindowBits = 31 (15 + 16) makes zlib write gzip format
    (_, zlibCompressor) <- newCompress defaultCompressConfig{compressWindowBits = 31}
    withResource (initSourceFromFile origin) $ \src ->
        withResource (initSinkToFile target) $ \sink ->
            runBIO $ src >|> base64Enc >|> zlibCompressor >|> sink

> :set -XOverloadedStrings
> base64AndCompressFile "test" "test.gz"
-- running 'zcat "test.gz" | base64 -d' gives back the original file
Synopsis
- data BIO inp out = BIO {}
- type Source out = BIO Void out
- type Sink inp = BIO inp Void
- (>|>) :: BIO a b -> BIO b c -> BIO a c
- (>~>) :: BIO a b -> (b -> c) -> BIO a c
- (>!>) :: BIO a b -> (HasCallStack => b -> IO c) -> BIO a c
- appendSource :: Source a -> Source a -> IO (Source a)
- concatSource :: [Source a] -> IO (Source a)
- zipSource :: Source a -> Source b -> IO (Source (a, b))
- zipBIO :: BIO a b -> BIO a c -> IO (BIO a (b, c))
- joinSink :: Sink out -> Sink out -> Sink out
- fuseSink :: [Sink out] -> Sink out
- runBIO :: HasCallStack => BIO Void Void -> IO ()
- runSource :: HasCallStack => Source x -> IO [x]
- runSource_ :: HasCallStack => Source x -> IO ()
- runBlock :: HasCallStack => BIO inp out -> inp -> IO [out]
- runBlock_ :: HasCallStack => BIO inp out -> inp -> IO ()
- unsafeRunBlock :: HasCallStack => IO (BIO inp out) -> inp -> [out]
- runBlocks :: HasCallStack => BIO inp out -> [inp] -> IO [out]
- runBlocks_ :: HasCallStack => BIO inp out -> [inp] -> IO ()
- unsafeRunBlocks :: HasCallStack => IO (BIO inp out) -> [inp] -> [out]
- pureBIO :: (a -> b) -> BIO a b
- ioBIO :: (HasCallStack => a -> IO b) -> BIO a b
- sourceFromList :: [a] -> IO (Source a)
- initSourceFromFile :: HasCallStack => CBytes -> Resource (Source Bytes)
- sourceFromBuffered :: HasCallStack => BufferedInput -> Source Bytes
- sourceFromInput :: (HasCallStack, Input i) => i -> IO (Source Bytes)
- sourceTextFromBuffered :: HasCallStack => BufferedInput -> Source Text
- sourceTextFromInput :: (HasCallStack, Input i) => i -> IO (Source Text)
- sourceJSONFromBuffered :: forall a. (JSON a, HasCallStack) => BufferedInput -> Source a
- sourceJSONFromInput :: (HasCallStack, Input i, JSON a) => i -> IO (Source a)
- sourceParsedBufferInput :: HasCallStack => Parser a -> BufferedInput -> Source a
- sourceParsedInput :: (Input i, HasCallStack) => Parser a -> i -> IO (Source a)
- sinkToList :: IO (IORef [a], Sink a)
- sinkToBuffered :: HasCallStack => BufferedOutput -> Sink Bytes
- sinkBuilderToBuffered :: HasCallStack => BufferedOutput -> Sink (Builder a)
- sinkToOutput :: HasCallStack => Output o => o -> IO (Sink Bytes)
- initSinkToFile :: HasCallStack => CBytes -> Resource (Sink Bytes)
- sinkBuilderToOutput :: (Output o, HasCallStack) => o -> IO (Sink (Builder ()))
- sinkToIO :: HasCallStack => (a -> IO ()) -> Sink a
- newParserNode :: HasCallStack => Parser a -> IO (BIO Bytes a)
- newReChunk :: Int -> IO (BIO Bytes Bytes)
- newUTF8Decoder :: HasCallStack => IO (BIO Bytes Text)
- newMagicSplitter :: Word8 -> IO (BIO Bytes Bytes)
- newLineSplitter :: IO (BIO Bytes Bytes)
- newBase64Encoder :: IO (BIO Bytes Bytes)
- newBase64Decoder :: IO (BIO Bytes Bytes)
- hexEncoder :: Bool -> BIO Bytes Bytes
- newHexDecoder :: IO (BIO Bytes Bytes)
- newCounterNode :: IO (Counter, BIO a a)
- newSeqNumNode :: IO (Counter, BIO a (Int, a))
- newGroupingNode :: Int -> IO (BIO a (SmallArray a))
The BIO type
A BIO (block IO) node.
A BIO node consists of two functions: push and pull. It can be used to describe different kinds of IO
devices:
- BIO inp out describes an IO state machine (e.g. z_stream in zlib), which takes input in blocks and produces output.
- type Source out = BIO Void out describes an IO source, which never takes input but gives output until EOF when pulled.
- type Sink inp = BIO inp Void describes an IO sink, which takes input and performs some IO effects, such as writing to a terminal or to files.
You can connect these BIO nodes with >|>, which connects the left node's output to the right node's input
and returns a new BIO node with the left node's input type and the right node's output type.
You can run a BIO node in different ways:
- runBIO continuously pulls values from the source and pushes them to the sink until the source reaches EOF.
- runSource continuously pulls values from the source, performing effects along the way.
- runBlock supplies a single block of input as the whole input, and returns output if there is any.
- runBlocks supplies a list of blocks as the whole input, and returns a list of output blocks.
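To make the push/pull description and the >|> wiring concrete, here is a minimal model that depends only on base. It is a sketch, not Z-IO's implementation. The record, its field types (push returns at most one output block, pull returns Nothing at EOF), and the helpers pureNode, newSummer and runBlocksModel are assumptions made for this illustration.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)
import Data.Maybe (catMaybes)

-- Hypothetical model of a BIO node (assumed field types, not Z-IO's definition).
data BIO inp out = BIO
  { push :: inp -> IO (Maybe out) -- feed one block; Nothing means "no output yet"
  , pull :: IO (Maybe out)        -- drain trailing output; Nothing means EOF
  }

infixl 3 >|>

-- Feed the left node's output into the right node's input.
(>|>) :: BIO a b -> BIO b c -> BIO a c
BIO pushA pullA >|> BIO pushB pullB = BIO push' pull'
  where
    push' x = do
      r <- pushA x
      case r of
        Just y -> pushB y
        Nothing -> pure Nothing
    -- Drain the left node first, pushing each trailing block into the right
    -- node; once the left node is at EOF, drain the right node.
    pull' = do
      r <- pullA
      case r of
        Just y -> do
          z <- pushB y
          case z of
            Just _ -> pure z
            Nothing -> pull'
        Nothing -> pullB

-- Stateless node built from a pure function (in the spirit of pureBIO).
pureNode :: (a -> b) -> BIO a b
pureNode f = BIO (pure . Just . f) (pure Nothing)

-- Stateful node: sums its input and only emits the total when drained.
newSummer :: IO (BIO Int Int)
newSummer = do
  ref <- newIORef (Just 0)
  pure $ BIO
    { push = \x -> modifyIORef' ref (fmap (+ x)) >> pure Nothing
    , pull = do
        acc <- readIORef ref
        writeIORef ref Nothing -- after emitting the total, report EOF
        pure acc
    }

-- Push every block, then pull until EOF (in the spirit of runBlocks).
runBlocksModel :: BIO inp out -> [inp] -> IO [out]
runBlocksModel node xs = do
  pushed <- mapM (push node) xs
  rest <- drain
  pure (catMaybes pushed ++ rest)
  where
    drain = do
      r <- pull node
      case r of
        Just y -> (y :) <$> drain
        Nothing -> pure []
```

The summer shows why draining matters. It produces nothing while it is being pushed, and its total only appears once the driver keeps pulling after the input ends. This holds whether the summer sits on the left or the right of >|>.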
Note that a BIO node usually contains some IO state; you can think of it as an opaque IORef:
- You shouldn't use a BIO node across multiple BIO chains unless the state can be reset.
- You shouldn't use a BIO node across multiple threads unless its documentation states otherwise.
BIO is simply a convenient way to construct single-threaded streaming computations. To use BIO
in multiple threads, check the Z.IO.BIO.Concurrent module.
Constructors
BIO
Fields: push, pull
Basic combinators
(>|>) :: BIO a b -> BIO b c -> BIO a c infixl 3
Connect two BIO nodes, feeding the left one's output to the right one's input.
(>!>) :: BIO a b -> (HasCallStack => b -> IO c) -> BIO a c
Connect a BIO node to an effectful function.
appendSource :: Source a -> Source a -> IO (Source a)
Connect two BIO sources: after the first reaches EOF, draw elements from the second.
concatSource :: [Source a] -> IO (Source a)
Connect a list of BIO sources: after one reaches EOF, draw elements from the next.
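Because a source never takes input, it can be pictured as a bare pull action. The sketch below uses that simplification to show the concatenation rule: keep pulling from the current source and move to the next one only when it reports EOF. The Src alias and the srcFromList, concatSrc, appendSrc and drainSrc helpers are hypothetical names for this illustration, not Z-IO's API.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef)

-- Hypothetical simplification: a source is a pull action; Nothing means EOF.
type Src a = IO (Maybe a)

-- Hand out list elements one at a time (compare sourceFromList).
srcFromList :: [a] -> IO (Src a)
srcFromList xs = do
  ref <- newIORef xs
  pure $ do
    ys <- readIORef ref
    case ys of
      [] -> pure Nothing
      (y : rest) -> writeIORef ref rest >> pure (Just y)

-- Draw from each source in turn, moving on when one reaches EOF
-- (compare concatSource).
concatSrc :: [Src a] -> IO (Src a)
concatSrc srcs = do
  ref <- newIORef srcs
  let next = do
        pending <- readIORef ref
        case pending of
          [] -> pure Nothing
          (s : rest) -> do
            r <- s
            case r of
              Just _ -> pure r
              Nothing -> writeIORef ref rest >> next
  pure next

-- The two-source special case (compare appendSource).
appendSrc :: Src a -> Src a -> IO (Src a)
appendSrc a b = concatSrc [a, b]

-- Pull until EOF, collecting every item.
drainSrc :: Src a -> IO [a]
drainSrc s = do
  r <- s
  case r of
    Just x -> (x :) <$> drainSrc s
    Nothing -> pure []
```

An empty source in the middle of the list is simply skipped, because its first pull already reports EOF.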
zipSource :: Source a -> Source b -> IO (Source (a, b))
Zip two BIO sources into one, which reaches EOF when either one reaches EOF.
zipBIO :: BIO a b -> BIO a c -> IO (BIO a (b, c))
Zip two BIO nodes into one, which reaches EOF when either one reaches EOF.
The numbers of output items should match; unmatched output will be discarded.
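The zipping rule can be sketched by treating each source as a pull action that returns Nothing at EOF. Zipping pulls one item from each side and stops at the first EOF. An item already pulled from the first source is dropped when the second one is exhausted, which mirrors the note about unmatched output being discarded. All names here (Src, srcFromList, zipSrc, drainSrc) are hypothetical and are not Z-IO's API.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef)

-- Hypothetical simplification: a source is a pull action; Nothing means EOF.
type Src a = IO (Maybe a)

srcFromList :: [a] -> IO (Src a)
srcFromList xs = do
  ref <- newIORef xs
  pure $ do
    ys <- readIORef ref
    case ys of
      [] -> pure Nothing
      (y : rest) -> writeIORef ref rest >> pure (Just y)

-- Pair up items from two sources; EOF as soon as either side hits EOF
-- (compare zipSource). Once finished, it keeps reporting EOF.
zipSrc :: Src a -> Src b -> IO (Src (a, b))
zipSrc sa sb = do
  done <- newIORef False
  pure $ do
    finished <- readIORef done
    if finished
      then pure Nothing
      else do
        ra <- sa
        -- only pull from the second source if the first one produced an item
        rb <- maybe (pure Nothing) (const sb) ra
        case (ra, rb) of
          (Just a, Just b) -> pure (Just (a, b))
          _ -> writeIORef done True >> pure Nothing

drainSrc :: Src a -> IO [a]
drainSrc s = do
  r <- s
  case r of
    Just x -> (x :) <$> drainSrc s
    Nothing -> pure []
```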
Run BIO chain
runSource_ :: HasCallStack => Source x -> IO ()
Drain a source without collecting the result.
runBlock :: HasCallStack => BIO inp out -> inp -> IO [out]
Supply a single block of input, then run the BIO node until EOF.
Note that many BIO nodes will be closed or unable to take new input after being drained.
runBlock_ :: HasCallStack => BIO inp out -> inp -> IO ()
Supply a single block of input, then run the BIO node until EOF without collecting the result.
Note that many BIO nodes will be closed or unable to take new input after being drained.
unsafeRunBlock :: HasCallStack => IO (BIO inp out) -> inp -> [out]
Wrap a stream computation into a pure interface.
You can wrap a stateful BIO computation (including the creation of the BIO node)
when you can guarantee the computation is pure, e.g. compressing, decoding, etc.
runBlocks :: HasCallStack => BIO inp out -> [inp] -> IO [out]
Supply blocks of input, then run the BIO node until EOF.
Note that many BIO nodes will be closed or unable to take new input after being drained.
runBlocks_ :: HasCallStack => BIO inp out -> [inp] -> IO ()
Supply blocks of input, then run the BIO node until EOF without collecting the result.
Note that many BIO nodes will be closed or unable to take new input after being drained.
unsafeRunBlocks :: HasCallStack => IO (BIO inp out) -> [inp] -> [out]
Wrap a stream computation into a pure interface.
Similar to unsafeRunBlock, but with a list of input blocks.
Make new BIO
pureBIO :: (a -> b) -> BIO a b
BIO node from a pure function.
BIO nodes made with this function are stateless, and thus can be reused across chains.
ioBIO :: (HasCallStack => a -> IO b) -> BIO a b
BIO node from an IO function.
BIO nodes made with this function may not be stateless; it depends on whether the IO function uses IO state.
Source
sourceFromList :: [a] -> IO (Source a)
Source a list from memory.
initSourceFromFile :: HasCallStack => CBytes -> Resource (Source Bytes)
Turn a file into a Bytes source.
sourceFromBuffered :: HasCallStack => BufferedInput -> Source Bytes
Turn a BufferedInput into a BIO source, mapping EOF to Nothing.
sourceFromInput :: (HasCallStack, Input i) => i -> IO (Source Bytes)
Turn an input device into a Bytes source.
sourceTextFromBuffered :: HasCallStack => BufferedInput -> Source Text
Turn a UTF8 encoded BufferedInput into a BIO source, mapping EOF to Nothing.
sourceTextFromInput :: (HasCallStack, Input i) => i -> IO (Source Text)
Turn an input device into a Text source.
sourceJSONFromBuffered :: forall a. (JSON a, HasCallStack) => BufferedInput -> Source a
Turn a JSON encoded BufferedInput into a BIO source, ignoring any
whitespace between JSON objects. If EOF is reached, return Nothing.
Throw OtherError with name EJSON if a JSON value cannot be parsed or converted.
sourceJSONFromInput :: (HasCallStack, Input i, JSON a) => i -> IO (Source a)
Turn an input device into a JSON source.
Throw OtherError with name EJSON if a JSON value cannot be parsed or converted.
sourceParsedBufferInput :: HasCallStack => Parser a -> BufferedInput -> Source a
Turn a buffered input device into a packet source; throw OtherError with name EPARSE if parsing fails.
sourceParsedInput :: (Input i, HasCallStack) => Parser a -> i -> IO (Source a)
Turn an input device into a packet source.
Sink
sinkToList :: IO (IORef [a], Sink a)
Sink to a list in memory.
The list's IORef is not thread safe here, and list items are in reversed order during sinking
(they will be reversed when flushed, i.e. pulled).
Please don't use it in multiple threads.
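The reversed-order note follows from the usual trick of prepending to a list for O(1) appends and reversing once at the end. Below is a minimal model of that behaviour. It assumes a sink is just a push action plus a flush step. The ListSink record and newListSink are hypothetical names, not Z-IO's API.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical sink: accepts items, and has a flush step run at the end.
data ListSink a = ListSink
  { sinkPush :: a -> IO ()
  , sinkFlush :: IO ()
  }

-- modifyIORef' is not atomic, so like sinkToList this is single-threaded only.
newListSink :: IO (IORef [a], ListSink a)
newListSink = do
  ref <- newIORef []
  let sink = ListSink
        { sinkPush = \x -> modifyIORef' ref (x :) -- O(1) prepend: newest item first
        , sinkFlush = modifyIORef' ref reverse   -- restore arrival order once
        }
  pure (ref, sink)
```

Reading the IORef before the flush therefore shows the items newest first.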
sinkToBuffered :: HasCallStack => BufferedOutput -> Sink Bytes
Turn a BufferedOutput into a Bytes sink.
sinkBuilderToBuffered :: HasCallStack => BufferedOutput -> Sink (Builder a)
Turn a BufferedOutput into a Builder sink.
sinkToOutput :: HasCallStack => Output o => o -> IO (Sink Bytes)
initSinkToFile :: HasCallStack => CBytes -> Resource (Sink Bytes)
sinkBuilderToOutput :: (Output o, HasCallStack) => o -> IO (Sink (Builder ()))
Bytes specific
newParserNode :: HasCallStack => Parser a -> IO (BIO Bytes a)
Read buffer and parse with Parser.
This function will continuously draw data from input until parsing finishes. Unconsumed bytes will be returned to the buffer.
Return Nothing if EOF is reached before parsing; throw OtherError with name EPARSE if parsing fails.
newReChunk :: Int -> IO (BIO Bytes Bytes)
Make a chunk size divider.
The divider cuts each chunk's size down to the nearest multiple of the granularity; the last trailing chunk is returned directly.
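The rechunking rule can be sketched as a pure function over a list of chunks. This is a hedged reading of the description above, not Z-IO's code. String stands in for Bytes, and the model assumes that leftover bytes are carried into the next chunk.

```haskell
-- Hypothetical model of a chunk size divider over String chunks.
-- Every emitted chunk except the last has a length that is a multiple of n.
reChunk :: Int -> [String] -> [String]
reChunk n
  | n <= 0 = error "reChunk: granularity must be positive"
  | otherwise = go ""
  where
    -- acc holds leftover bytes that did not fill a whole multiple of n
    go acc [] = [acc | not (null acc)] -- the trailing chunk is returned as-is
    go acc (c : cs)
      | len >= n = out : go rest cs
      | otherwise = go chunk cs -- not enough bytes yet: keep accumulating
      where
        chunk = acc ++ c
        len = length chunk
        (out, rest) = splitAt (len - len `rem` n) chunk
```

No bytes are lost or reordered. Only the block boundaries move.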
newUTF8Decoder :: HasCallStack => IO (BIO Bytes Text)
Make a new UTF8 decoder, which decodes byte streams into text streams.
If there are invalid UTF8 bytes, an OtherError with name EINVALIDUTF8 will be thrown.
Note this node is supposed to be used with preprocessing nodes such as compressors, decoders, etc., where byte
boundaries cannot be controlled; the UTF8 decoder will concatenate trailing bytes from the last block onto the next one.
Using this node directly with sourceFromBuffered / sourceFromInput will not be as efficient as directly using
sourceTextFromBuffered / sourceTextFromInput, because BufferedInput provides push-back capability:
trailing bytes can be pushed back to the reading buffer and returned together with the next block of input.
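The following small model shows why trailing bytes must be carried between blocks. It works over lists of Word8, and the names seqLen, splitTrailing and carryUtf8 are hypothetical. The model only re-splits the byte stream at code point boundaries. It does not validate or decode anything. A block that ends in the middle of a multi-byte sequence keeps the incomplete tail, which is then prepended to the next block.

```haskell
import Data.Bits ((.&.))
import Data.Word (Word8)

-- Length of the UTF-8 sequence a byte starts; 0 for continuation bytes
-- (10xxxxxx) and for bytes that can never start a sequence.
seqLen :: Word8 -> Int
seqLen b
  | b < 0x80 = 1
  | b .&. 0xE0 == 0xC0 = 2
  | b .&. 0xF0 == 0xE0 = 3
  | b .&. 0xF8 == 0xF0 = 4
  | otherwise = 0

-- Split a block into a part that ends on a code point boundary and an
-- incomplete trailing sequence (at most 3 bytes) to carry forward.
splitTrailing :: [Word8] -> ([Word8], [Word8])
splitTrailing bs = go 1
  where
    n = length bs
    go k
      | k > min 4 n = (bs, []) -- no lead byte near the end: nothing to carry
      | len == 0 = go (k + 1) -- continuation byte: keep looking backwards
      | len > k = splitAt (n - k) bs -- lead byte whose sequence is cut off
      | otherwise = (bs, []) -- last sequence is complete
      where
        len = seqLen (bs !! (n - k))

-- Re-split a stream of blocks so that no emitted block ends mid-code-point.
carryUtf8 :: [[Word8]] -> [[Word8]]
carryUtf8 = go []
  where
    -- a real decoder would report an error for bytes still pending at the end
    go pending [] = [pending | not (null pending)]
    go pending (blk : blks) =
      let (ready, rest) = splitTrailing (pending ++ blk)
       in [ready | not (null ready)] ++ go rest blks
```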
newMagicSplitter :: Word8 -> IO (BIO Bytes Bytes)
Make a new stream splitter based on a magic byte.
newLineSplitter :: IO (BIO Bytes Bytes)
Make a new stream splitter based on linefeed (\r\n or \n).
The resulting bytes don't contain the linefeed.
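Line splitting has the same block-boundary problem, because a line may be cut across two input blocks. Below is a hedged, list-based sketch of the behaviour described above. String stands in for Bytes, and splitLines is a hypothetical name. The model also assumes that a final line lacking a trailing linefeed is still emitted at the end of the stream.

```haskell
-- Hypothetical model: split a stream of String chunks into lines,
-- carrying a partial line over to the next chunk.
splitLines :: [String] -> [String]
splitLines = go ""
  where
    go acc [] = [acc | not (null acc)] -- last line without a trailing linefeed
    go acc (c : cs) = emit (acc ++ c)
      where
        emit s = case break (== '\n') s of
          (line, '\n' : rest) -> stripCR line : emit rest
          (partial, _) -> go partial cs -- no linefeed yet: wait for more input

    -- "\r\n" line endings: drop the '\r' left before the '\n'
    stripCR l
      | not (null l) && last l == '\r' = init l
      | otherwise = l
```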
hexEncoder :: Bool -> BIO Bytes Bytes
Make a hex encoder node.
The hex encoder is stateless; it can be reused across chains.
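Hex encoding maps every byte to two digits independently of its neighbours. That is why the node needs no state and can be shared between chains. The small model below shows this property. hexChunk is a hypothetical name, and reading the Bool argument as an upper-case switch is an assumption that the text above does not state.

```haskell
import Data.Char (intToDigit, toUpper)
import Data.Word (Word8)

-- Hypothetical model: hex-encode one block. The Bool is assumed to select
-- upper-case digits.
hexChunk :: Bool -> [Word8] -> String
hexChunk upper = concatMap byteToHex
  where
    byteToHex w =
      let v = fromIntegral w :: Int
       in map digit [v `div` 16, v `mod` 16]
    digit d = (if upper then toUpper else id) (intToDigit d)
```

Encoding block by block gives the same result as encoding the whole stream at once, so no bytes need to be carried between blocks.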
Generic BIO
newCounterNode :: IO (Counter, BIO a a)
Make a new BIO node which counts items flowing through it.
The returned Counter is increased atomically; it's safe to read / reset the counter from other threads.
newSeqNumNode :: IO (Counter, BIO a (Int, a))
Make a new BIO node which counts items and labels each item with a sequence number.
The returned Counter is increased atomically; it's safe to read / reset the counter from other threads.
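A counter that other threads may read while items flow past needs an atomic increment. The sketch below models that with atomicModifyIORef' from base rather than Z-IO's Counter type. newSeqNumModel is a hypothetical name. The text above does not say where the sequence starts, so numbering from 0 is an assumption.

```haskell
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- Hypothetical model of a sequence-numbering node: returns the shared counter
-- and a function that labels each item passing through.
newSeqNumModel :: IO (IORef Int, a -> IO (Int, a))
newSeqNumModel = do
  counter <- newIORef 0
  let label x = do
        -- atomically bump the counter and take its previous value
        n <- atomicModifyIORef' counter (\c -> (c + 1, c))
        pure (n, x)
  pure (counter, label)
```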
newGroupingNode :: Int -> IO (BIO a (SmallArray a))
Make a BIO node grouping items into fixed size arrays.
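A grouping node has to hold items until a group is full, so it carries IO state in the sense described for BIO nodes earlier. The list-based sketch below illustrates this. newGrouperModel is hypothetical and uses lists instead of SmallArray. Flushing a final, shorter group when the node is drained is an assumption, since the text above does not document it.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)

-- Hypothetical model of a grouping node: a push action that emits a group
-- once n items have arrived, and a drain action for whatever is left.
newGrouperModel :: Int -> IO (a -> IO (Maybe [a]), IO (Maybe [a]))
newGrouperModel n = do
  buf <- newIORef [] -- items collected so far, newest first
  let pushItem x = do
        modifyIORef' buf (x :)
        xs <- readIORef buf
        if length xs >= n
          then writeIORef buf [] >> pure (Just (reverse xs))
          else pure Nothing
      drainRest = do
        xs <- readIORef buf
        writeIORef buf []
        pure (if null xs then Nothing else Just (reverse xs))
  pure (pushItem, drainRest)
```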