streamly-core

Copyright: (c) 2017 Composewell Technologies
License: BSD3
Maintainer: streamly@composewell.com
Stability: released
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Data.StreamK

Description

Streams represented as chains of function calls using Continuation Passing Style (CPS), suitable for dynamically and recursively composing a potentially large number of streams. The K in StreamK stands for Kontinuation.

In addition to the combinators in this module, you can use operations from Streamly.Data.Stream for StreamK as well by converting StreamK to Stream (toStream), and vice-versa (fromStream). Please refer to Streamly.Internal.Data.StreamK for more functions that have not yet been released.

For documentation see the corresponding combinators in Streamly.Data.Stream. Documentation has been omitted in this module unless there is a difference worth mentioning or if the combinator does not exist in Streamly.Data.Stream.

Fused vs CPS Streams

Unlike the statically fused operations in Streamly.Data.Stream, StreamK operations are less efficient, involving a function call overhead for each element, but they exhibit linear O(n) time complexity with respect to the number of stream compositions. Therefore, they are suitable for dynamically composing streams, e.g. appending potentially infinite streams in recursive loops. While fused streams can be used efficiently to process elements as small as a single byte, CPS streams are typically used on bigger chunks of data to amortize the larger per-element overhead.

Overview

StreamK streams can be constructed like lists, except that they use nil instead of [] and cons instead of :.

>>> import Streamly.Data.StreamK (StreamK, cons, consM, nil)

cons constructs a stream from pure values:

>>> stream = 1 `cons` 2 `cons` nil :: StreamK IO Int

Operations from Streamly.Data.Stream can be used for StreamK as well by converting StreamK to Stream (toStream), and vice-versa (fromStream).

>>> Stream.fold Fold.toList $ StreamK.toStream stream -- IO [Int]
[1,2]

Streams can also be constructed from effects, not just pure values:

>>> effect n = print n >> return n
>>> stream = effect 1 `consM` effect 2 `consM` nil
>>> Stream.fold Fold.toList $ StreamK.toStream stream
1
2
[1,2]

Setup

To execute the code examples provided in this module in ghci, please run the following commands first.

>>> :m
>>> import Control.Concurrent (threadDelay)
>>> import Data.Function (fix, (&))
>>> import Data.Semigroup (cycle1)
>>> import Streamly.Data.StreamK (StreamK)
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Parser as Parser
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Data.StreamK as StreamK
>>> import qualified Streamly.FileSystem.DirIO as Dir
>>> mk = StreamK.fromStream . Stream.fromList
>>> un = Stream.toList . StreamK.toStream
>>> effect n = print n >> return n

For APIs that have not been released yet:

>>> import qualified Streamly.Internal.FileSystem.Path as Path
>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> import qualified Streamly.Internal.FileSystem.DirIO as Dir

Type

data StreamK (m :: Type -> Type) a Source #

Instances

(Foldable m, Monad m) => Foldable (StreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fold :: Monoid m0 => StreamK m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> StreamK m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> StreamK m a -> m0 #

foldr :: (a -> b -> b) -> b -> StreamK m a -> b #

foldr' :: (a -> b -> b) -> b -> StreamK m a -> b #

foldl :: (b -> a -> b) -> b -> StreamK m a -> b #

foldl' :: (b -> a -> b) -> b -> StreamK m a -> b #

foldr1 :: (a -> a -> a) -> StreamK m a -> a #

foldl1 :: (a -> a -> a) -> StreamK m a -> a #

toList :: StreamK m a -> [a] #

null :: StreamK m a -> Bool #

length :: StreamK m a -> Int #

elem :: Eq a => a -> StreamK m a -> Bool #

maximum :: Ord a => StreamK m a -> a #

minimum :: Ord a => StreamK m a -> a #

sum :: Num a => StreamK m a -> a #

product :: Num a => StreamK m a -> a #

Traversable (StreamK Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

traverse :: Applicative f => (a -> f b) -> StreamK Identity a -> f (StreamK Identity b) #

sequenceA :: Applicative f => StreamK Identity (f a) -> f (StreamK Identity a) #

mapM :: Monad m => (a -> m b) -> StreamK Identity a -> m (StreamK Identity b) #

sequence :: Monad m => StreamK Identity (m a) -> m (StreamK Identity a) #

Monad m => Functor (StreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fmap :: (a -> b) -> StreamK m a -> StreamK m b #

(<$) :: a -> StreamK m b -> StreamK m a #

a ~ Char => IsString (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Monoid (StreamK m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

mempty :: StreamK m a #

mappend :: StreamK m a -> StreamK m a -> StreamK m a #

mconcat :: [StreamK m a] -> StreamK m a #

Semigroup (StreamK m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

(<>) :: StreamK m a -> StreamK m a -> StreamK m a #

sconcat :: NonEmpty (StreamK m a) -> StreamK m a #

stimes :: Integral b => b -> StreamK m a -> StreamK m a #

IsList (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Associated Types

type Item (StreamK Identity a) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (StreamK Identity a) = a

Read a => Read (StreamK Identity a) Source #
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Show a => Show (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type


Construction

Primitives

Primitives to construct a stream from pure values or monadic actions. All other stream construction and generation combinators described later can be expressed in terms of these primitives. However, the special versions provided in this module can be much more efficient in some cases. Users can create custom combinators using these primitives.
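
For instance, a replicate-like combinator can be sketched from cons and nil alone (illustrative; replicateK is a hypothetical helper, and un comes from the Setup section):

>>> replicateK n x = if n <= 0 then StreamK.nil else x `StreamK.cons` replicateK (n - 1) x
>>> un (replicateK 3 'a' :: StreamK IO Char)
"aaa"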

nil :: forall (m :: Type -> Type) a. StreamK m a Source #

A stream that terminates without producing any output or side effect.

>>> Stream.fold Fold.toList (StreamK.toStream StreamK.nil)
[]

nilM :: Applicative m => m b -> StreamK m a Source #

A stream that terminates without producing any output, but produces a side effect.

>>> Stream.fold Fold.toList (StreamK.toStream (StreamK.nilM (print "nil")))
"nil"
[]

Pre-release

cons :: forall a (m :: Type -> Type). a -> StreamK m a -> StreamK m a infixr 5 Source #

A right associative prepend operation to add a pure value at the head of an existing stream:

>>> s = 1 `StreamK.cons` 2 `StreamK.cons` 3 `StreamK.cons` StreamK.nil
>>> Stream.fold Fold.toList (StreamK.toStream s)
[1,2,3]

Unlike the Streamly.Data.Stream cons, StreamK cons can be used recursively:

>>> repeat x = let xs = StreamK.cons x xs in xs
>>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil

cons is the same as the following but more efficient:

>>> cons x xs = return x `StreamK.consM` xs

consM :: Monad m => m a -> StreamK m a -> StreamK m a infixr 5 Source #

A right associative prepend operation to add an effectful value at the head of an existing stream:

>>> s = putStrLn "hello" `StreamK.consM` putStrLn "world" `StreamK.consM` StreamK.nil
>>> Stream.fold Fold.drain (StreamK.toStream s)
hello
world

It can be used efficiently with foldr:

>>> fromFoldableM = Prelude.foldr StreamK.consM StreamK.nil

Same as the following but more efficient:

>>> consM x xs = StreamK.fromEffect x `StreamK.append` xs

From Values

fromPure :: forall a (m :: Type -> Type). a -> StreamK m a Source #

fromEffect :: Monad m => m a -> StreamK m a Source #
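
Neither combinator has an example above; for illustration (expected outputs, using the un and effect helpers from the Setup section):

>>> un (StreamK.fromPure 1 :: StreamK IO Int)
[1]
>>> un $ StreamK.fromEffect (effect 2)
2
[2]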

From Stream

Please note that the Stream type does not observe exceptions from the consumer of the stream, whereas StreamK does.

fromStream :: forall (m :: Type -> Type) a. Monad m => Stream m a -> StreamK m a Source #

Convert a fused Stream to StreamK.

For example:

>>> s1 = StreamK.fromStream $ Stream.fromList [1,2]
>>> s2 = StreamK.fromStream $ Stream.fromList [3,4]
>>> Stream.fold Fold.toList $ StreamK.toStream $ s1 `StreamK.append` s2
[1,2,3,4]

toStream :: forall (m :: Type -> Type) a. Applicative m => StreamK m a -> Stream m a Source #

Convert a StreamK to a fused Stream.

From Containers

fromFoldable :: forall f a (m :: Type -> Type). Foldable f => f a -> StreamK m a Source #

>>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil

Construct a stream from a Foldable containing pure values.
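
For example (illustrative; uses the un helper from the Setup section):

>>> un $ StreamK.fromFoldable [1,2,3]
[1,2,3]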

To Containers

toList :: Monad m => StreamK m a -> m [a] Source #
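
For example (illustrative; mk is the helper from the Setup section):

>>> StreamK.toList (mk [1,2,3])
[1,2,3]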

Elimination

Primitives

uncons :: Applicative m => StreamK m a -> m (Maybe (a, StreamK m a)) Source #

drain :: Monad m => StreamK m a -> m () Source #
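
For illustration (expected outputs; headK is a hypothetical helper defined inline, mk and effect come from the Setup section):

>>> headK s = fmap fst <$> StreamK.uncons s
>>> headK (mk [1,2,3])
Just 1
>>> StreamK.drain (effect 1 `StreamK.consM` effect 2 `StreamK.consM` StreamK.nil)
1
2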

Parsing

toParserK :: forall (m :: Type -> Type) a b. Monad m => Parser a m b -> ParserK a m b Source #

Convert a Parser to ParserK.

Pre-release

parse :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b) Source #

Run a ParserK over a StreamK. Please use parseChunks where possible, for better performance.
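
For example (illustrative; the parser is built with toParserK from this module, expected output shown):

>>> p = StreamK.toParserK (Parser.takeWhile (< 3) Fold.toList)
>>> StreamK.parse p (mk [1,2,3,4])
Right [1,2]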

parseBreak :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b, StreamK m a) Source #

Like parse but also returns the unconsumed remainder of the stream along with the parse result.

parsePos :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseErrorPos b) Source #

Like parse but includes stream position information in the error messages.

parseBreakPos :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseErrorPos b, StreamK m a) Source #

Like parseBreak but includes stream position information in the error messages.

Transformation

mapM :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b Source #

dropWhile :: forall a (m :: Type -> Type). (a -> Bool) -> StreamK m a -> StreamK m a Source #

take :: forall (m :: Type -> Type) a. Int -> StreamK m a -> StreamK m a Source #

filter :: forall a (m :: Type -> Type). (a -> Bool) -> StreamK m a -> StreamK m a Source #
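
These combinators are undocumented above; for illustration (expected outputs, using the mk, un and effect helpers from the Setup section):

>>> un $ StreamK.mapM effect (mk [1,2])
1
2
[1,2]
>>> un $ StreamK.filter even $ StreamK.take 4 $ StreamK.dropWhile (< 3) (mk [1..10])
[4,6]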

Combining Two Streams

Unlike the operations in Streamly.Data.Stream, these operations can be used to dynamically compose a large number of streams, e.g. using the concatMapWith and mergeMapWith operations. They have linear O(n) time complexity with respect to the number of streams being composed.

Appending

append :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #

Unlike the fused Streamly.Data.Stream append, StreamK append can be used at scale, recursively, with linear performance:

>>> cycle xs = let ys = xs `StreamK.append` ys in ys

concatMapWith append (same as concatMap) flattens a stream of streams in a depth-first manner, i.e. it yields each stream fully, then the next, and so on. Given a stream of three streams:

1. [1,2,3]
2. [4,5,6]
3. [7,8,9]

The resulting stream will be [1,2,3,4,5,6,7,8,9].

Best used in a right associative manner.
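
For example (illustrative; expected output shown):

>>> un $ StreamK.concatMapWith StreamK.append mk (mk [[1,2,3],[4,5,6],[7,8,9]])
[1,2,3,4,5,6,7,8,9]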

Interleaving

interleave :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #

Interleave two streams fairly, yielding one item from each in a round-robin fashion:

>>> un $ StreamK.interleave (mk [1,3,5]) (mk [2,4,6])
[1,2,3,4,5,6]
>>> un $ StreamK.interleave (mk [1,3]) (mk [2,4,6])
[1,2,3,4,6]
>>> un $ StreamK.interleave (mk []) (mk [2,4,6])
[2,4,6]

interleave is right associative when used as an infix operator.

>>> un $ mk [1,2,3] `StreamK.interleave` mk [4,5,6] `StreamK.interleave` mk [7,8,9]
[1,4,2,7,3,5,8,6,9]

Because of right association, the first stream yields as many items as the next two streams combined.

Be careful when refactoring code involving a chain of three or more interleave operations, as the operation is not associative, i.e. right-associated code may not produce the same result as left-associated code. This is a direct consequence of the scheduling imbalance in the previous example. If left associated, the above example would produce:

>>> un $ (mk [1,2,3] `StreamK.interleave` mk [4,5,6]) `StreamK.interleave` mk [7,8,9]
[1,7,4,8,2,9,5,3,6]

Note: Use concatMap based interleaving instead of the binary operator to interleave more than two streams to avoid associativity issues.

concatMapWith interleave flattens a stream of streams using interleave in a right associative manner. Given a stream of three streams:

1. [1,2,3]
2. [4,5,6]
3. [7,8,9]

The resulting sequence is [1,4,2,7,3,5,8,6,9].

For this reason, right-associated flattening with interleave can work with an infinite number of streams without opening too many streams at the same time. Each stream is consumed twice as much as the next one; if we are combining an infinite number of streams of size n, then at most log n streams will be opened at any given time, because the first stream finishes by the time the (log n)-th stream is opened.

Compare with bfsConcatMap and mergeMapWith interleave.

For interleaving many streams, the best way is to use bfsConcatMap.

See also the fused version of interleave.

Merging

mergeBy :: forall a (m :: Type -> Type). (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a Source #

Merging of n streams can be performed by combining the streams pairwise using mergeMapWith to get O(n * log n) time complexity. If used with concatMapWith it will have O(n^2) performance.
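
For example, merging two sorted streams (illustrative; expected output shown):

>>> un $ StreamK.mergeBy compare (mk [1,3,5]) (mk [2,4,6])
[1,2,3,4,5,6]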

mergeByM :: Monad m => (a -> a -> m Ordering) -> StreamK m a -> StreamK m a -> StreamK m a Source #

Zipping

zipWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #

Zipping of n streams can be performed by combining the streams pairwise using mergeMapWith with O(n * log n) time complexity. If used with concatMapWith it will have O(n^2) performance.
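
For example (illustrative; expected output shown):

>>> un $ StreamK.zipWith (+) (mk [1,2,3]) (mk [10,20,30])
[11,22,33]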

zipWithM :: Monad m => (a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c Source #

Cross Product

crossWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #

Definition:

>>> crossWith f m1 m2 = fmap f m1 `StreamK.crossApply` m2

Note that the second stream is evaluated multiple times.
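
For example (illustrative; expected output shown):

>>> un $ StreamK.crossWith (,) (mk [1,2]) (mk [3,4])
[(1,3),(1,4),(2,3),(2,4)]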

Stream of streams

Some useful idioms:

>>> concatFoldableWith f = Prelude.foldr f StreamK.nil
>>> concatMapFoldableWith f g = Prelude.foldr (f . g) StreamK.nil
>>> concatForFoldableWith f xs g = Prelude.foldr (f . g) StreamK.nil xs

concatEffect :: Monad m => m (StreamK m a) -> StreamK m a Source #
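
For illustration (expected output, using the mk, un and effect helpers from the Setup section):

>>> un $ StreamK.concatEffect (effect 0 >> return (mk [1,2,3]))
0
[1,2,3]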

concatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

If the total number of iterations is kept the same, each additional level of nesting increases the cost by roughly 1.5x.
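
For example (illustrative; expected output shown):

>>> un $ StreamK.concatMap (\x -> mk [x, x * 10]) (mk [1,2,3])
[1,10,2,20,3,30]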

bfsConcatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

See bfsConcatFor for detailed documentation.

>>> bfsConcatMap = flip StreamK.bfsConcatFor

fairConcatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

See fairConcatFor for detailed documentation.

>>> fairConcatMap = flip StreamK.fairConcatFor

concatMapWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

Perform a concatMap using a specified concat strategy. The first argument specifies a merge or concat function that is used to merge the streams generated by the map function.

For example, interleaving n streams in a left biased manner:

>>> lists = mk [[1,5],[2,6],[3,7],[4,8]]
>>> un $ StreamK.concatMapWith StreamK.interleave mk lists
[1,2,5,3,6,4,7,8]

For a fair interleaving example see bfsConcatMap and mergeMapWith.

concatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #

Map a stream generating function on each element of a stream and concatenate the results. This is the same as the bind function of the monad instance. It is just a flipped concatMap, but more convenient for nested use cases; it reads like an imperative for loop.

>>> concatFor = flip StreamK.concatMap

A concatenating for loop:

>>> :{
un $
    StreamK.concatFor (mk [1,2,3]) $ \x ->
      StreamK.fromPure x
:}
[1,2,3]

Nested concatenating for loops:

>>> :{
un $
    StreamK.concatFor (mk [1,2,3]) $ \x ->
     StreamK.concatFor (mk [4,5,6]) $ \y ->
      StreamK.fromPure (x, y)
:}
[(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]

bfsConcatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #

While concatFor flattens a stream of streams in a depth-first manner, bfsConcatFor flattens it in a breadth-first manner. It yields one item from the first stream, then one item from the next stream, and so on. In nested loops it has the effect of prioritizing a new outer loop iteration over the inner loops, thus inverting the looping. Given a stream of three streams:

1. [1,2,3]
2. [4,5,6]
3. [7,8,9]

The resulting stream is (1,4,7),(2,5,8),(3,6,9). The parentheses are added to emphasize the iterations.

For example:

>>> stream = mk [[1,2,3],[4,5,6],[7,8,9]]
>>> :{
 un $
     StreamK.bfsConcatFor stream $ \x ->
         StreamK.fromStream $ Stream.fromList x
:}
[1,4,7,2,5,8,3,6,9]

Compare with concatForWith interleave, which explores the depth exponentially more than the breadth, such that each stream yields twice as many items as the next stream.

See also the equivalent fused version unfoldEachInterleave.

fairConcatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #

fairConcatFor is like concatFor but traverses the depth and breadth of nesting equally. Therefore, the outer and the inner loops in a nested loop get equal priority. It can be used to nest infinite streams without starving outer streams due to inner ones.

Given a stream of three streams:

1. [1,2,3]
2. [4,5,6]
3. [7,8,9]

Here, the outer loop is the stream of streams and the inner loops are the individual streams. The traversal sweeps the diagonals of the above grid to give equal chance to the outer and inner loops. The resulting stream is (1),(2,4),(3,5,7),(6,8),(9); the diagonals are parenthesized for emphasis.

Looping

A single stream case is equivalent to concatFor:

>>> un $ StreamK.fairConcatFor (mk [1,2]) $ \x -> StreamK.fromPure x
[1,2]

Fair Nested Looping

Multiple streams nest like for loops. The result is a cross product of the streams. However, the ordering of the results of the cross product is such that each stream gets consumed equally. In other words, inner iterations of a nested loop get the same priority as the outer iterations. Inner iterations do not finish completely before the outer iterations start.

>>> :{
un $ do
    StreamK.fairConcatFor (mk [1,2,3]) $ \x ->
     StreamK.fairConcatFor (mk [4,5,6]) $ \y ->
      StreamK.fromPure (x, y)
:}
[(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]

Nesting Infinite Streams

Example with infinite streams. Print all pairs in the cross product with sum less than a specified number.

>>> :{
Stream.toList
 $ Stream.takeWhile (\(x,y) -> x + y < 6)
 $ StreamK.toStream
 $ StreamK.fairConcatFor (mk [1..]) $ \x ->
    StreamK.fairConcatFor (mk [1..]) $ \y ->
     StreamK.fromPure (x, y)
:}
[(1,1),(1,2),(2,1),(1,3),(2,2),(3,1),(1,4),(2,3),(3,2),(4,1)]

How does the nesting work?

If we look at the cross product of [1,2,3] and [4,5,6], the streams being combined using fairConcatFor are the following sequential loop iterations:

(1,4) (1,5) (1,6) -- first iteration of the outer loop
(2,4) (2,5) (2,6) -- second iteration of the outer loop
(3,4) (3,5) (3,6) -- third iteration of the outer loop

The result is a triangular or diagonal traversal of these iterations:

[(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]

Non-Termination Cases

If one of the two interleaved streams does not produce an output at all and continues forever then the other stream will never get scheduled. This is because a stream is unscheduled only after it produces an output. This can lead to non-terminating programs, an example is provided below.

>>> :{
oddsIf x = mk (if x then [1,3..] else [2,4..])
filterEven x = if even x then StreamK.fromPure x else StreamK.nil
:}
>>> :{
evens =
    StreamK.fairConcatFor (mk [True,False]) $ \r ->
     StreamK.concatFor (oddsIf r) filterEven
:}

The evens stream does not terminate because, when r is True, the nested concatFor is a non-productive infinite loop; therefore, the outer loop never gets a chance to generate the False value.

But the following refactoring of the above code works as expected:

>>> :{
mixed =
     StreamK.fairConcatFor (mk [True,False]) $ \r ->
         StreamK.concatFor (oddsIf r) StreamK.fromPure
:}
>>> evens = StreamK.fairConcatFor mixed filterEven
>>> Stream.toList $ Stream.take 3 $ StreamK.toStream evens
[2,4,6]

This works because in mixed both the streams being interleaved are productive.

Take care in how you write your program, keeping the scheduling implications in mind. To avoid such scheduling problems in serial interleaving, you can use concurrent interleaving instead, i.e. parFairConcatFor. With concurrent threads, the other branch makes progress even if one branch is an infinite loop that produces nothing.

Logic Programming

Streamly provides all the operations required for logic programming. It provides functionality equivalent to the LogicT type from the logict package. The MonadLogic operations can be implemented using the available stream operations. For example, uncons corresponds to msplit, interleave corresponds to the interleave operation of MonadLogic, and fairConcatFor is the fair bind (>>-) operation.

Related Operations

concatForWith interleave is another way to interleave two serial streams. In this case, the inner loop iterations get exponentially more priority than the outer iterations of the nested loop. This is biased towards the inner loops; this is exactly how the logict and list-t implementations of fair bind work.

concatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b Source #

Like concatFor but maps an effectful function. It allows conveniently mixing monadic effects with streams.

>>> import Control.Monad.IO.Class (liftIO)
>>> :{
un $
    StreamK.concatForM (mk [1,2,3]) $ \x -> do
      liftIO $ putStrLn (show x)
      pure $ StreamK.fromPure x
:}
1
2
3
[1,2,3]

Nested concatenating for loops:

>>> :{
un $
    StreamK.concatForM (mk [1,2,3]) $ \x -> do
      liftIO $ putStrLn (show x)
      pure $ StreamK.concatFor (mk [4,5,6]) $ \y ->
        StreamK.fromPure (x, y)
:}
1
2
3
[(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]

bfsConcatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b Source #

Like bfsConcatFor but maps a monadic function.

fairConcatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b Source #

Like fairConcatFor but maps a monadic function.

mergeMapWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

Combine streams in pairs using a binary combinator; the resulting streams are then combined again in pairs, recursively, until we get a single combined stream. The composition thus forms a binary tree.

For example, 'mergeMapWith interleave' gives the following result:

>>> lists = mk [[1,2,3],[4,5,6],[7,8,9],[10,11,12]]
>>> un $ StreamK.mergeMapWith StreamK.interleave mk lists
[1,7,4,10,2,8,5,11,3,9,6,12]

The above example is equivalent to the following pairings:

>>> pair1 = mk [1,2,3] `StreamK.interleave` mk [4,5,6]
>>> pair2 = mk [7,8,9] `StreamK.interleave` mk [10,11,12]
>>> un $ pair1 `StreamK.interleave` pair2
[1,7,4,10,2,8,5,11,3,9,6,12]

If the number of streams being combined is not a power of 2, the binary tree composed by mergeMapWith is not balanced; therefore, the output may not look fairly interleaved and will be biased towards the unpaired streams:

>>> lists = mk [[1,2,3],[4,5,6],[7,8,9]]
>>> un $ StreamK.mergeMapWith StreamK.interleave mk lists
[1,7,4,8,2,9,5,3,6]

An efficient merge sort can be implemented by using mergeBy as the combining function:

>>> combine = StreamK.mergeBy compare
>>> un $ StreamK.mergeMapWith combine StreamK.fromPure (mk [5,1,7,9,2])
[1,2,5,7,9]

Caution: the stream of streams must be finite.

Pre-release

Buffered Operations

reverse :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a Source #
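
For example (illustrative; expected output shown):

>>> un $ StreamK.reverse (mk [1,2,3])
[3,2,1]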

sortBy :: forall (m :: Type -> Type) a. Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a Source #

Sort the input stream using a supplied comparison function.

Sorting can be achieved simply by:

>>> sortBy cmp = StreamK.mergeMapWith (StreamK.mergeBy cmp) StreamK.fromPure

However, this combinator uses a parser to first split the input stream into descending and ascending sorted segments and then merges them, to optimize sorting when pre-sorted sequences exist in the input stream.

O(n) space

Exceptions

Please note that the Stream type does not observe exceptions from the consumer of the stream, whereas StreamK does.

handle :: (MonadCatch m, Exception e) => (e -> m (StreamK m a)) -> StreamK m a -> StreamK m a Source #

Like Streamly.Data.Stream.handle but with one significant difference: this function observes exceptions from the consumer of the stream as well.

You can also convert StreamK to Stream and use exception handling from Stream module:

>>> handle f s = StreamK.fromStream $ Stream.handle (\e -> StreamK.toStream (f e)) (StreamK.toStream s)

Resource Management

Please note that the Stream type does not observe exceptions from the consumer of the stream, whereas StreamK does.

bracketIO :: forall (m :: Type -> Type) b c a. (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> StreamK m a) -> StreamK m a Source #

Like Streamly.Data.Stream.bracketIO but with one significant difference: this function observes exceptions from the consumer of the stream as well. Therefore, it cleans up the resource promptly when the consumer encounters an exception.

You can also convert StreamK to Stream and use resource handling from Stream module:

>>> bracketIO bef aft bet = StreamK.fromStream $ Stream.bracketIO bef aft (StreamK.toStream . bet)

Deprecated

parseBreakChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a)) Source #

Deprecated: Use Streamly.Data.Array.parseBreak instead

Run a ParserK over a chunked StreamK and return the parse result and the remaining Stream.

parseChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b) Source #

Deprecated: Use Streamly.Data.Array.parse instead