Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | released |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Streamly.Data.StreamK
Description
Streams represented as chains of function calls using Continuation Passing
Style (CPS), suitable for dynamically and recursively composing a potentially
large number of streams. The K in StreamK stands for Kontinuation.
In addition to the combinators in this module, you can use operations from
Streamly.Data.Stream for StreamK as well by converting StreamK to Stream
(toStream), and vice versa (fromStream). Please refer to
Streamly.Internal.Data.StreamK for more functions that have not yet been
released.
For documentation see the corresponding combinators in Streamly.Data.Stream. Documentation has been omitted in this module unless there is a difference worth mentioning or if the combinator does not exist in Streamly.Data.Stream.
Fused vs CPS Streams
Unlike the statically fused operations in Streamly.Data.Stream, StreamK operations are less efficient, involving a function call overhead for each element, but they exhibit linear O(n) time complexity with respect to the number of stream compositions. Therefore, they are suitable for dynamically composing streams, e.g. appending potentially infinite streams in recursive loops. While fused streams can be used efficiently to process elements as small as a single byte, CPS streams are typically used on bigger chunks of data to avoid the larger overhead per element.
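As a rough illustration of dynamic composition, the sketch below appends one singleton stream per recursive call, so the number of appends is decided only at run time. The names `countTo` and `example` are made up for this illustration; only `nil`, `fromPure`, `append` and `toList` come from this module.

```haskell
import Streamly.Data.StreamK (StreamK)
import qualified Streamly.Data.StreamK as StreamK

-- Hypothetical helper: build 1..n by appending singleton streams in a
-- recursive loop. The appends are right associated.
countTo :: Int -> StreamK IO Int
countTo n = go 1
  where
    go i
        | i > n = StreamK.nil
        | otherwise = StreamK.fromPure i `StreamK.append` go (i + 1)

-- Collect the composed stream into a list.
example :: IO [Int]
example = StreamK.toList (countTo 5)
```

In ghci, `example` should evaluate to `[1,2,3,4,5]`.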
Overview
A StreamK can be constructed like a list, except that it uses nil instead of
[] and cons instead of (:).
>>>
import Streamly.Data.StreamK (StreamK, cons, consM, nil)
cons constructs a stream from pure values:
>>>
stream = 1 `cons` 2 `cons` nil :: StreamK IO Int
Operations from Streamly.Data.Stream can be used for StreamK as well by
converting StreamK to Stream (toStream), and vice versa (fromStream).
>>>
Stream.fold Fold.toList $ StreamK.toStream stream -- IO [Int]
[1,2]
Streams can also be constructed from effects, not just pure values:
>>>
effect n = print n >> return n
>>>
stream = effect 1 `consM` effect 2 `consM` nil
>>>
Stream.fold Fold.toList $ StreamK.toStream stream
1
2
[1,2]
Synopsis
- data StreamK (m :: Type -> Type) a
- nil :: forall (m :: Type -> Type) a. StreamK m a
- nilM :: Applicative m => m b -> StreamK m a
- cons :: forall a (m :: Type -> Type). a -> StreamK m a -> StreamK m a
- consM :: Monad m => m a -> StreamK m a -> StreamK m a
- fromPure :: forall a (m :: Type -> Type). a -> StreamK m a
- fromEffect :: Monad m => m a -> StreamK m a
- fromStream :: forall (m :: Type -> Type) a. Monad m => Stream m a -> StreamK m a
- toStream :: forall (m :: Type -> Type) a. Applicative m => StreamK m a -> Stream m a
- fromFoldable :: forall f a (m :: Type -> Type). Foldable f => f a -> StreamK m a
- toList :: Monad m => StreamK m a -> m [a]
- uncons :: Applicative m => StreamK m a -> m (Maybe (a, StreamK m a))
- drain :: Monad m => StreamK m a -> m ()
- toParserK :: forall (m :: Type -> Type) a b. Monad m => Parser a m b -> ParserK a m b
- parse :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b)
- parseBreak :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b, StreamK m a)
- parsePos :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseErrorPos b)
- parseBreakPos :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseErrorPos b, StreamK m a)
- mapM :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b
- dropWhile :: forall a (m :: Type -> Type). (a -> Bool) -> StreamK m a -> StreamK m a
- take :: forall (m :: Type -> Type) a. Int -> StreamK m a -> StreamK m a
- filter :: forall a (m :: Type -> Type). (a -> Bool) -> StreamK m a -> StreamK m a
- append :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a
- interleave :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a
- mergeBy :: forall a (m :: Type -> Type). (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
- mergeByM :: Monad m => (a -> a -> m Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
- zipWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
- zipWithM :: Monad m => (a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c
- crossWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
- concatEffect :: Monad m => m (StreamK m a) -> StreamK m a
- concatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b
- bfsConcatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b
- fairConcatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b
- concatMapWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
- concatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b
- bfsConcatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b
- fairConcatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b
- concatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b
- bfsConcatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b
- fairConcatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b
- mergeMapWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
- reverse :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a
- sortBy :: forall (m :: Type -> Type) a. Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a
- handle :: (MonadCatch m, Exception e) => (e -> m (StreamK m a)) -> StreamK m a -> StreamK m a
- bracketIO :: forall (m :: Type -> Type) b c a. (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> StreamK m a) -> StreamK m a
- parseBreakChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a))
- parseChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b)
Setup
To execute the code examples provided in this module in ghci, please run the following commands first.
>>>
:m
>>>
import Control.Concurrent (threadDelay)
>>>
import Data.Function (fix, (&))
>>>
import Data.Semigroup (cycle1)
>>>
import Streamly.Data.StreamK (StreamK)
>>>
import qualified Streamly.Data.Fold as Fold
>>>
import qualified Streamly.Data.Parser as Parser
>>>
import qualified Streamly.Data.Stream as Stream
>>>
import qualified Streamly.Data.StreamK as StreamK
>>>
import qualified Streamly.FileSystem.DirIO as Dir
>>>
mk = StreamK.fromStream . Stream.fromList
>>>
un = Stream.toList . StreamK.toStream
>>>
effect n = print n >> return n
For APIs that have not been released yet.
>>>
import qualified Streamly.Internal.FileSystem.Path as Path
>>>
import qualified Streamly.Internal.Data.StreamK as StreamK
>>>
import qualified Streamly.Internal.FileSystem.DirIO as Dir
Type
data StreamK (m :: Type -> Type) a Source #
Instances
Construction
Primitives
Primitives to construct a stream from pure values or monadic actions. All other stream construction and generation combinators described later can be expressed in terms of these primitives. However, the special versions provided in this module can be much more efficient in some cases. Users can create custom combinators using these primitives.
nil :: forall (m :: Type -> Type) a. StreamK m a Source #
A stream that terminates without producing any output or side effect.
>>>
Stream.fold Fold.toList (StreamK.toStream StreamK.nil)
[]
nilM :: Applicative m => m b -> StreamK m a Source #
A stream that terminates without producing any output, but produces a side effect.
>>>
Stream.fold Fold.toList (StreamK.toStream (StreamK.nilM (print "nil")))
"nil"
[]
Pre-release
cons :: forall a (m :: Type -> Type). a -> StreamK m a -> StreamK m a infixr 5 Source #
A right associative prepend operation to add a pure value at the head of an existing stream:
>>>
s = 1 `StreamK.cons` 2 `StreamK.cons` 3 `StreamK.cons` StreamK.nil
>>>
Stream.fold Fold.toList (StreamK.toStream s)
[1,2,3]
Unlike the Streamly.Data.Stream cons, the StreamK cons can be used recursively:
>>>
repeat x = let xs = StreamK.cons x xs in xs
>>>
fromFoldable = Prelude.foldr StreamK.cons StreamK.nil
cons is the same as the following but more efficient:
>>>
cons x xs = return x `StreamK.consM` xs
consM :: Monad m => m a -> StreamK m a -> StreamK m a infixr 5 Source #
A right associative prepend operation to add an effectful value at the head of an existing stream:
>>>
s = putStrLn "hello" `StreamK.consM` putStrLn "world" `StreamK.consM` StreamK.nil
>>>
Stream.fold Fold.drain (StreamK.toStream s)
hello
world
It can be used efficiently with foldr:
>>>
fromFoldableM = Prelude.foldr StreamK.consM StreamK.nil
Same as the following but more efficient:
>>>
consM x xs = StreamK.fromEffect x `StreamK.append` xs
From Values
fromEffect :: Monad m => m a -> StreamK m a Source #
From Stream
Please note that the Stream type does not observe any exceptions from
the consumer of the stream whereas StreamK does.
From Containers
fromFoldable :: forall f a (m :: Type -> Type). Foldable f => f a -> StreamK m a Source #
>>>
fromFoldable = Prelude.foldr StreamK.cons StreamK.nil
Construct a stream from a Foldable containing pure values.
To Containers
Elimination
Primitives
Parsing
toParserK :: forall (m :: Type -> Type) a b. Monad m => Parser a m b -> ParserK a m b Source #
Convert a Parser to ParserK.
Pre-release
parse :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b) Source #
Run a ParserK over a StreamK. Please use parseChunks where possible,
for better performance.
parseBreak :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b, StreamK m a) Source #
Similar to parseBreakChunks but works on singular elements.
parsePos :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseErrorPos b) Source #
Like parse but includes stream position information in the error
messages.
parseBreakPos :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseErrorPos b, StreamK m a) Source #
Like parseBreak but includes stream position information in the error
messages.
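A minimal sketch of running a parser over a StreamK of single elements. The helper `leadingDigits` is hypothetical, and `Parser.takeWhile` from Streamly.Data.Parser is assumed here to have the type `(a -> Bool) -> Fold m a b -> Parser a m b`; check it against the version of the library you are using.

```haskell
import Data.Char (isDigit)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Parser as Parser
import qualified Streamly.Data.StreamK as StreamK

-- Hypothetical helper: collect the leading digits of a string.
leadingDigits :: String -> IO (Maybe String)
leadingDigits input = do
    -- Convert the direct style Parser to a ParserK before running it.
    -- Parser.takeWhile is an assumption, see the note above.
    let parser = StreamK.toParserK (Parser.takeWhile isDigit Fold.toList)
    result <- StreamK.parse parser (StreamK.fromFoldable input)
    -- Discard the error details, keeping only success or failure.
    pure (either (const Nothing) Just result)
```

Under these assumptions, `leadingDigits "123abc"` should return `Just "123"`.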
Transformation
Combining Two Streams
Unlike the operations in Streamly.Data.Stream, these operations can
be used to dynamically compose a large number of streams, e.g. using the
concatMapWith and mergeMapWith operations. They have a linear O(n)
time complexity with respect to the number of streams being composed.
Appending
append :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #
Unlike the fused Streamly.Data.Stream append, StreamK append can be used at scale, recursively, with linear performance:
>>>
cycle xs = let ys = xs `StreamK.append` ys in ys
concatMapWith append (same as concatMap) flattens a stream of streams in a
depth-first manner, i.e. it yields each stream fully and then the next and so
on. Given a stream of three streams:
1. [1,2,3]
2. [4,5,6]
3. [7,8,9]
The resulting stream will be [1,2,3,4,5,6,7,8,9].
Best used in a right associative manner.
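The depth-first flattening described above can be sketched as follows. The name `flattened` is made up for this illustration; the stream of streams is built from plain lists with `fromFoldable`.

```haskell
import qualified Streamly.Data.StreamK as StreamK

-- Flatten a stream of three lists depth first: each inner stream is
-- yielded fully before the next one starts.
flattened :: IO [Int]
flattened =
    StreamK.toList
        $ StreamK.concatMapWith StreamK.append StreamK.fromFoldable
        $ StreamK.fromFoldable [[1,2,3],[4,5,6],[7,8,9]]
```

In ghci, `flattened` should evaluate to `[1,2,3,4,5,6,7,8,9]`.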
Interleaving
interleave :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #
Interleave two streams fairly, yielding one item from each in a round-robin fashion:
>>>
un $ StreamK.interleave (mk [1,3,5]) (mk [2,4,6])
[1,2,3,4,5,6]
>>>
un $ StreamK.interleave (mk [1,3]) (mk [2,4,6])
[1,2,3,4,6]
>>>
un $ StreamK.interleave (mk []) (mk [2,4,6])
[2,4,6]
interleave is right associative when used as an infix operator.
>>>
un $ mk [1,2,3] `StreamK.interleave` mk [4,5,6] `StreamK.interleave` mk [7,8,9]
[1,4,2,7,3,5,8,6,9]
Because of right association, the first stream yields as many items as the next two streams combined.
Be careful when refactoring code involving a chain of three or more
interleave operations as it is not associative, i.e. right associated code
may not produce the same result as left associated code. This is a direct
consequence of the scheduling imbalance in the previous example. If left
associated, the above example would produce:
>>>
un $ (mk [1,2,3] `StreamK.interleave` mk [4,5,6]) `StreamK.interleave` mk [7,8,9]
[1,7,4,8,2,9,5,3,6]
Note: Use concatMap based interleaving instead of the binary operator to interleave more than two streams to avoid associativity issues.
concatMapWith interleave flattens a stream of streams using interleave
in a right associative manner. Given a stream of three streams:
1. [1,2,3]
2. [4,5,6]
3. [7,8,9]
The resulting sequence is [1,4,2,7,3,5,8,6,9].
For this reason, the right associated flattening with interleave can work
with an infinite number of streams without opening too many streams at the
same time. Each stream is consumed twice as fast as the next one; if we are
combining an infinite number of streams of size n then at most log n
streams will be open at any given time, because the first stream will
finish by the time the stream after the (log n)-th stream is opened.
Compare with bfsConcatMap and mergeMapWith interleave.
For interleaving many streams, the best way is to use bfsConcatMap.
See also the fused version interleave.
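To compare the two strategies side by side, the following sketch flattens the same three streams with right associated interleaving and with bfsConcatMap. The names `streams`, `rightBiased` and `roundRobin` are made up for this illustration.

```haskell
import Streamly.Data.StreamK (StreamK)
import qualified Streamly.Data.StreamK as StreamK

streams :: StreamK IO [Int]
streams = StreamK.fromFoldable [[1,2,3],[4,5,6],[7,8,9]]

-- Right associated interleaving: earlier streams are consumed faster.
rightBiased :: IO [Int]
rightBiased =
    StreamK.toList
        $ StreamK.concatMapWith StreamK.interleave StreamK.fromFoldable streams

-- Breadth-first flattening: one item from each stream per round.
roundRobin :: IO [Int]
roundRobin =
    StreamK.toList $ StreamK.bfsConcatMap StreamK.fromFoldable streams
```

`rightBiased` should give `[1,4,2,7,3,5,8,6,9]` as described above, whereas `roundRobin` should give `[1,4,7,2,5,8,3,6,9]`.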
Merging
mergeBy :: forall a (m :: Type -> Type). (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a Source #
Merging of n streams can be performed by combining the streams pairwise
using mergeMapWith to give O(n * log n) time complexity. If used
with concatMapWith it will have O(n^2) performance.
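A small sketch of both uses: merging two sorted streams directly, and merging several sorted streams pairwise with mergeMapWith. The names `mergedPair`, `sortedInputs` and `mergedAll` are made up for this illustration.

```haskell
import Streamly.Data.StreamK (StreamK)
import qualified Streamly.Data.StreamK as StreamK

-- Merge two sorted streams into one sorted stream.
mergedPair :: IO [Int]
mergedPair =
    StreamK.toList
        $ StreamK.mergeBy compare
            (StreamK.fromFoldable [1,3,5])
            (StreamK.fromFoldable [2,4,6])

-- Three streams, each already sorted.
sortedInputs :: StreamK IO [Int]
sortedInputs = StreamK.fromFoldable [[1,4,7],[2,5,8],[3,6,9]]

-- Merge all of them pairwise, forming a binary tree of merges.
mergedAll :: IO [Int]
mergedAll =
    StreamK.toList
        $ StreamK.mergeMapWith
            (StreamK.mergeBy compare) StreamK.fromFoldable sortedInputs
```

`mergedPair` should evaluate to `[1,2,3,4,5,6]` and `mergedAll` to `[1,2,3,4,5,6,7,8,9]`.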
Zipping
zipWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #
Zipping of n streams can be performed by combining the streams pairwise
using mergeMapWith with O(n * log n) time complexity. If used
with concatMapWith it will have O(n^2) performance.
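For reference, the binary building block looks like this. It is a minimal sketch that zips just two streams; the name `pairwiseSums` is made up for this illustration.

```haskell
import qualified Streamly.Data.StreamK as StreamK

-- Add the elements of two streams position by position.
pairwiseSums :: IO [Int]
pairwiseSums =
    StreamK.toList
        $ StreamK.zipWith (+)
            (StreamK.fromFoldable [1,2,3])
            (StreamK.fromFoldable [10,20,30])
```

In ghci, `pairwiseSums` should evaluate to `[11,22,33]`.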
Cross Product
crossWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #
Definition:
>>>
crossWith f m1 m2 = fmap f m1 `StreamK.crossApply` m2
Note that the second stream is evaluated multiple times.
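The repeated evaluation can be observed by counting how often an effect in the second stream runs. This is a sketch only; the names `example`, `first`, `second` and `counter` are made up for this illustration.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import qualified Streamly.Data.StreamK as StreamK

-- Returns the cross product and the number of times the effect in the
-- second stream was executed.
example :: IO ([(Int, Char)], Int)
example = do
    counter <- newIORef (0 :: Int)
    let first = StreamK.fromFoldable [1, 2, 3 :: Int]
        -- A one element stream whose element is produced by an effect.
        second =
            (modifyIORef' counter (+ 1) >> pure 'a')
                `StreamK.consM` StreamK.nil
    pairs <- StreamK.toList (StreamK.crossWith (,) first second)
    runs <- readIORef counter
    pure (pairs, runs)
```

Since the second stream is run once for every element of the first stream, `example` should return `([(1,'a'),(2,'a'),(3,'a')],3)`.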
Stream of streams
Some useful idioms:
>>>
concatFoldableWith f = Prelude.foldr f StreamK.nil
>>>
concatMapFoldableWith f g = Prelude.foldr (f . g) StreamK.nil
>>>
concatForFoldableWith f xs g = Prelude.foldr (f . g) StreamK.nil xs
concatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #
If total iterations are kept the same, each increase in the nesting level increases the cost by roughly 1.5 times.
bfsConcatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #
See bfsConcatFor for detailed documentation.
>>>
bfsConcatMap = flip StreamK.bfsConcatFor
fairConcatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #
See fairConcatFor for detailed documentation.
>>>
fairConcatMap = flip StreamK.fairConcatFor
concatMapWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #
Perform a concatMap using a specified concat strategy. The first
argument specifies a merge or concat function that is used to merge the
streams generated by the map function.
For example, interleaving n streams in a left biased manner:
>>>
lists = mk [[1,5],[2,6],[3,7],[4,8]]
>>>
un $ StreamK.concatMapWith StreamK.interleave mk lists
[1,2,5,3,6,4,7,8]
For a fair interleaving example see bfsConcatMap and mergeMapWith.
concatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #
Map a stream generating function on each element of a stream and
concatenate the results. This is the same as the bind function of the monad
instance. It is just a flipped concatMap but is more convenient for nested
use cases, where it reads like an imperative for loop.
>>>
concatFor = flip StreamK.concatMap
A concatenating for loop:
>>>
:{
un $
    StreamK.concatFor (mk [1,2,3]) $ \x ->
        StreamK.fromPure x
:}
[1,2,3]
Nested concatenating for loops:
>>>
:{
un $
    StreamK.concatFor (mk [1,2,3]) $ \x ->
        StreamK.concatFor (mk [4,5,6]) $ \y ->
            StreamK.fromPure (x, y)
:}
[(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]
bfsConcatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #
While concatFor flattens a stream of streams in a depth-first manner,
bfsConcatFor flattens it in a breadth-first manner. It yields one item
from the first stream, then one item from the next stream and so on. In
nested loops it has the effect of prioritizing the new outer loop iteration
over the inner loops, thus inverting the looping.
Given a stream of three streams:
1. [1,2,3]
2. [4,5,6]
3. [7,8,9]
The resulting stream is (1,4,7),(2,5,8),(3,6,9). The parentheses are added
to emphasize the iterations.
For example:
>>>
stream = mk [[1,2,3],[4,5,6],[7,8,9]]
>>>
:{
un $
    StreamK.bfsConcatFor stream $ \x ->
        StreamK.fromStream $ Stream.fromList x
:}
[1,4,7,2,5,8,3,6,9]
Compare with concatForWith interleave which explores the depth
exponentially more compared to the breadth, such that each stream yields
twice as many items compared to the next stream.
See also the equivalent fused version unfoldEachInterleave.
fairConcatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #
fairConcatFor is like concatFor but traverses the depth and breadth of
nesting equally. Therefore, the outer and the inner loops in a nested loop
get equal priority. It can be used to nest infinite streams without starving
outer streams due to inner ones.
Given a stream of three streams:
1. [1,2,3]
2. [4,5,6]
3. [7,8,9]
Here, the outer loop is the stream of streams and the inner loops are the
individual streams. The traversal sweeps the diagonals in the above grid to
give equal chance to outer and inner loops. The resulting stream is
(1),(2,4),(3,5,7),(6,8),(9); diagonals are parenthesized for emphasis.
Looping
A single stream case is equivalent to concatFor:
>>>
un $ StreamK.fairConcatFor (mk [1,2]) $ \x -> StreamK.fromPure x
[1,2]
Fair Nested Looping
Multiple streams nest like for loops. The result is a cross product of the
streams. However, the ordering of the results of the cross product is such
that each stream gets consumed equally. In other words, inner iterations of
a nested loop get the same priority as the outer iterations. Inner
iterations do not finish completely before the outer iterations start.
>>>
:{
un $ do
    StreamK.fairConcatFor (mk [1,2,3]) $ \x ->
        StreamK.fairConcatFor (mk [4,5,6]) $ \y ->
            StreamK.fromPure (x, y)
:}
[(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]
Nesting Infinite Streams
Example with infinite streams. Print all pairs in the cross product with sum less than a specified number.
>>>
:{
Stream.toList
    $ Stream.takeWhile (\(x,y) -> x + y < 6)
    $ StreamK.toStream
    $ StreamK.fairConcatFor (mk [1..]) $ \x ->
        StreamK.fairConcatFor (mk [1..]) $ \y ->
            StreamK.fromPure (x, y)
:}
[(1,1),(1,2),(2,1),(1,3),(2,2),(3,1),(1,4),(2,3),(3,2),(4,1)]
How does the nesting work?
If we look at the cross product of [1,2,3], [4,5,6], the streams being
combined using fairConcatFor are the following sequential loop iterations:
(1,4) (1,5) (1,6) -- first iteration of the outer loop
(2,4) (2,5) (2,6) -- second iteration of the outer loop
(3,4) (3,5) (3,6) -- third iteration of the outer loop
The result is a triangular or diagonal traversal of these iterations:
[(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]
Non-Termination Cases
If one of the two interleaved streams does not produce an output at all and continues forever then the other stream will never get scheduled. This is because a stream is unscheduled only after it produces an output. This can lead to non-terminating programs, an example is provided below.
>>>
:{
oddsIf x = mk (if x then [1,3..] else [2,4..])
filterEven x = if even x then StreamK.fromPure x else StreamK.nil
:}
>>>
:{
evens =
    StreamK.fairConcatFor (mk [True,False]) $ \r ->
        StreamK.concatFor (oddsIf r) filterEven
:}
The evens stream does not terminate because, when r is True, the nested
concatFor is a non-productive infinite loop; therefore, the outer loop
never gets a chance to generate the False value.
But the following refactoring of the above code works as expected:
>>>
:{
mixed =
    StreamK.fairConcatFor (mk [True,False]) $ \r ->
        StreamK.concatFor (oddsIf r) StreamK.fromPure
:}
>>>
evens = StreamK.fairConcatFor mixed filterEven
>>>
Stream.toList $ Stream.take 3 $ StreamK.toStream evens
[2,4,6]
This works because in mixed both the streams being interleaved are
productive.
Take care how you write your program and keep the scheduling implications in mind. To avoid such scheduling problems in serial interleaving, you can use concurrent interleaving instead, i.e. parFairConcatFor. Because of concurrent threads, the other branch will make progress even if one is an infinite loop producing nothing.
Logic Programming
Streamly provides all operations for logic programming. It provides
functionality equivalent to the LogicT type from the logict package.
The MonadLogic operations can be implemented using the available stream
operations. For example, uncons is msplit, interleave corresponds to
the interleave operation of MonadLogic, and fairConcatFor is the
fair bind (>>-) operation.
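As an illustration of using uncons in the role of msplit, the sketch below defines a combinator modeled on the `once` operation of MonadLogic, which keeps only the first result of a computation. The definition is hypothetical and not part of this module; it is written only in terms of uncons, concatEffect, fromPure and nil.

```haskell
import Streamly.Data.StreamK (StreamK)
import qualified Streamly.Data.StreamK as StreamK

-- Hypothetical combinator modeled on MonadLogic's 'once': split off the
-- first element with uncons and discard the rest of the stream.
once :: Monad m => StreamK m a -> StreamK m a
once s = StreamK.concatEffect $ do
    r <- StreamK.uncons s
    pure $ case r of
        Nothing -> StreamK.nil
        Just (x, _) -> StreamK.fromPure x

-- The first even number of an infinite stream.
firstEven :: IO [Int]
firstEven =
    StreamK.toList $ once $ StreamK.filter even $ StreamK.fromFoldable [1..]
```

`firstEven` should evaluate to `[2]`; the infinite source is not a problem because only the first matching element is demanded.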
Related Operations
concatForWith interleave is another way to interleave two serial
streams. In this case, the inner loop iterations get exponentially more
priority over the outer iterations of the nested loop. This is biased
towards the inner loops - this is exactly how the logict and list-t
implementations of fair bind work.
concatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b Source #
Like concatFor but maps an effectful function. It allows conveniently
mixing monadic effects with streams.
>>>
import Control.Monad.IO.Class (liftIO)
>>>
:{
un $
    StreamK.concatForM (mk [1,2,3]) $ \x -> do
        liftIO $ putStrLn (show x)
        pure $ StreamK.fromPure x
:}
1
2
3
[1,2,3]
Nested concatenating for loops:
>>>
:{
un $
    StreamK.concatForM (mk [1,2,3]) $ \x -> do
        liftIO $ putStrLn (show x)
        pure $ StreamK.concatFor (mk [4,5,6]) $ \y ->
            StreamK.fromPure (x, y)
:}
1
2
3
[(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]
bfsConcatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b Source #
Like bfsConcatFor but maps a monadic function.
fairConcatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b Source #
Like fairConcatFor but maps a monadic function.
mergeMapWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #
Combine streams in pairs using a binary combinator; the resulting streams are then combined again in pairs recursively until we get to a single combined stream. The composition thus forms a binary tree.
For example, 'mergeMapWith interleave' gives the following result:
>>>
lists = mk [[1,2,3],[4,5,6],[7,8,9],[10,11,12]]
>>>
un $ StreamK.mergeMapWith StreamK.interleave mk lists
[1,7,4,10,2,8,5,11,3,9,6,12]
The above example is equivalent to the following pairings:
>>>
pair1 = mk [1,2,3] `StreamK.interleave` mk [4,5,6]
>>>
pair2 = mk [7,8,9] `StreamK.interleave` mk [10,11,12]
>>>
un $ pair1 `StreamK.interleave` pair2
[1,7,4,10,2,8,5,11,3,9,6,12]
If the number of streams being combined is not a power of 2, the binary tree composed by mergeMapWith is not balanced. Therefore, the output may not look fairly interleaved; it will be biased towards the unpaired streams:
>>>
lists = mk [[1,2,3],[4,5,6],[7,8,9]]
>>>
un $ StreamK.mergeMapWith StreamK.interleave mk lists
[1,7,4,8,2,9,5,3,6]
An efficient merge sort can be implemented by using mergeBy as the
combining function:
>>>
combine = StreamK.mergeBy compare
>>>
un $ StreamK.mergeMapWith combine StreamK.fromPure (mk [5,1,7,9,2])
[1,2,5,7,9]
Caution: the stream of streams must be finite.
Pre-release
Buffered Operations
sortBy :: forall (m :: Type -> Type) a. Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a Source #
Sort the input stream using a supplied comparison function.
Sorting can be achieved by simply:
>>>
sortBy cmp = StreamK.mergeMapWith (StreamK.mergeBy cmp) StreamK.fromPure
However, this combinator uses a parser to first split the input stream into down and up sorted segments and then merges them to optimize sorting when pre-sorted sequences exist in the input stream.
O(n) space
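A minimal usage sketch, sorting the same input in ascending and descending order. The names `ascending` and `descending` are made up for this illustration.

```haskell
import qualified Streamly.Data.StreamK as StreamK

-- Sort in ascending order using the standard comparison.
ascending :: IO [Int]
ascending =
    StreamK.toList
        $ StreamK.sortBy compare
        $ StreamK.fromFoldable [5,1,7,9,2]

-- Sort in descending order by flipping the comparison.
descending :: IO [Int]
descending =
    StreamK.toList
        $ StreamK.sortBy (flip compare)
        $ StreamK.fromFoldable [5,1,7,9,2]
```

`ascending` should evaluate to `[1,2,5,7,9]` and `descending` to `[9,7,5,2,1]`.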
Exceptions
Please note that the Stream type does not observe any exceptions from
the consumer of the stream whereas StreamK does.
handle :: (MonadCatch m, Exception e) => (e -> m (StreamK m a)) -> StreamK m a -> StreamK m a Source #
Like Streamly.Data.Stream.handle but with one significant difference:
this function observes exceptions from the consumer of the stream as well.
You can also convert StreamK to Stream and use exception handling from
the Stream module:
>>>
handle f s = StreamK.fromStream $ Stream.handle (\e -> StreamK.toStream (f e)) (StreamK.toStream s)
Resource Management
Please note that the Stream type does not observe any exceptions from
the consumer of the stream whereas StreamK does.
bracketIO :: forall (m :: Type -> Type) b c a. (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> StreamK m a) -> StreamK m a Source #
Like Streamly.Data.Stream.bracketIO but with one significant difference:
this function observes exceptions from the consumer of the stream as well.
Therefore, it cleans up the resource promptly when the consumer encounters
an exception.
You can also convert StreamK to Stream and use resource handling from
the Stream module:
>>>
bracketIO bef aft bet = StreamK.fromStream $ Stream.bracketIO bef aft (StreamK.toStream . bet)
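A small sketch of the acquire, use and release sequence on a stream that ends normally. An in-memory log stands in for a real resource; the names `example`, `events` and `record` are made up for this illustration.

```haskell
import Data.IORef (modifyIORef, newIORef, readIORef)
import qualified Streamly.Data.StreamK as StreamK

-- Returns the stream elements together with the recorded events.
example :: IO ([Int], [String])
example = do
    events <- newIORef []
    let record msg = modifyIORef events (msg :)
        stream =
            StreamK.bracketIO
                (record "acquire" >> pure [1,2,3]) -- acquire the resource
                (\_ -> record "release")           -- release it
                StreamK.fromFoldable               -- stream from the resource
    xs <- StreamK.toList stream
    logged <- readIORef events
    -- Events were prepended, so reverse them into chronological order.
    pure (xs, reverse logged)
```

When the stream is consumed to the end, `example` should return `([1,2,3],["acquire","release"])`.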