streamly-core
Copyright: (c) 2017 Composewell Technologies
License: BSD3
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Data.StreamK

Setup

To execute the code examples provided in this module in ghci, please run the following commands first.

>>> :m
>>> import Control.Concurrent (threadDelay)
>>> import Data.Function (fix, (&))
>>> import Data.Semigroup (cycle1)
>>> import Streamly.Data.StreamK (StreamK)
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Parser as Parser
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Data.StreamK as StreamK
>>> import qualified Streamly.FileSystem.DirIO as Dir
>>> mk = StreamK.fromStream . Stream.fromList
>>> un = Stream.toList . StreamK.toStream
>>> effect n = print n >> return n

For APIs that have not been released yet, also import the internal modules:

>>> import qualified Streamly.Internal.FileSystem.Path as Path
>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> import qualified Streamly.Internal.FileSystem.DirIO as Dir

foldr :: Monad m => (a -> b -> b) -> b -> StreamK m a -> m b Source #

Lazy right associative fold.

repeat :: forall a (m :: Type -> Type). a -> StreamK m a Source #

Generate an infinite stream by repeating a pure value.

>>> repeat x = let xs = StreamK.cons x xs in xs

Pre-release
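
Since the stream is infinite, it has to be truncated before it is collected. A minimal sketch, assuming the internal module is imported qualified as StreamK as in the Setup section:

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> StreamK.toList $ StreamK.take 3 $ StreamK.repeat 'x'
"xxx"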

foldl' :: Monad m => (b -> a -> b) -> b -> StreamK m a -> m b Source #

Strict left associative fold.
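
As an illustration of the two associativities (a sketch, assuming the internal module is imported qualified as StreamK as in the Setup section), building a list with foldr preserves the order of the elements, whereas consing onto the accumulator with foldl' reverses it:

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> StreamK.foldr (:) [] (StreamK.fromList [1,2,3])
[1,2,3]
>>> StreamK.foldl' (flip (:)) [] (StreamK.fromList [1,2,3])
[3,2,1]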

mfix :: Monad m => (m a -> StreamK m a) -> StreamK m a Source #

We can define cyclic structures using let:

>>> :set -fno-warn-unrecognised-warning-flags
>>> :set -fno-warn-x-partial
>>> let (a, b) = ([1, b], head a) in (a, b)
([1,1],1)

The function fix defined as:

>>> fix f = let x = f x in x

ensures that the argument of a function and its output refer to the same lazy value x i.e. the same location in memory. Thus x can be defined in terms of itself, creating structures with cyclic references.

>>> f ~(a, b) = ([1, b], head a)
>>> fix f
([1,1],1)

mfix is essentially the same as fix but for monadic values.

Using mfix for streams we can construct a stream in which each element of the stream is defined in a cyclic fashion. The argument of the function being fixed represents the current element of the stream which is being returned by the stream monad. Thus, we can use the argument to construct itself.

In the following example, the argument action of the function f represents the tuple (x,y) returned by it in a given iteration. We define the first element of the tuple in terms of the second.

>>> import System.IO.Unsafe (unsafeInterleaveIO)
>>> :{
main = Stream.fold (Fold.drainMapM print) $ StreamK.toStream $ StreamK.mfix f
    where
    f action = StreamK.unNested $ do
        let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act
        x <- StreamK.Nested $ StreamK.fromStream $ Stream.sequence $ Stream.fromList [incr 1 action, incr 2 action]
        y <- StreamK.Nested $ StreamK.fromStream $ Stream.fromList [4,5]
        return (x, y)
:}

Note: you cannot achieve this by just changing the order of the monad statements because that would change the order in which the stream elements are generated.

Note that the function f must be lazy in its argument. Because the IO monad is strict, we wrap action in unsafeInterleaveIO to defer its evaluation.

Pre-release

unfoldr :: forall b a (m :: Type -> Type). (b -> Maybe (a, b)) -> b -> StreamK m a Source #

>>> :{
unfoldr step s =
    case step s of
        Nothing -> StreamK.nil
        Just (a, b) -> a `StreamK.cons` unfoldr step b
:}

Build a stream by unfolding a pure step function step starting from a seed s. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

>>> :{
let f b =
        if b > 2
        then Nothing
        else Just (b, b + 1)
in StreamK.toList $ StreamK.unfoldr f 0
:}
[0,1,2]

build :: forall (m :: Type -> Type) a. (forall b. (a -> b -> b) -> b -> b) -> StreamK m a Source #

map :: forall a b (m :: Type -> Type). (a -> b) -> StreamK m a -> StreamK m b Source #

fromList :: forall a (m :: Type -> Type). [a] -> StreamK m a Source #

uncons :: Applicative m => StreamK m a -> m (Maybe (a, StreamK m a)) Source #

tail :: Applicative m => StreamK m a -> m (Maybe (StreamK m a)) Source #

Same as:

>>> tail = fmap (fmap snd) . StreamK.uncons
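
A short sketch of how uncons and tail might be used, assuming the Setup imports. The names r and t are only illustrative bindings:

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> r <- StreamK.uncons (StreamK.fromList [1,2,3])
>>> fmap fst r
Just 1
>>> maybe (return []) (StreamK.toList . snd) r
[2,3]
>>> t <- StreamK.tail (StreamK.fromList [1,2,3])
>>> maybe (return []) StreamK.toList t
[2,3]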

init :: Applicative m => StreamK m a -> m (Maybe (StreamK m a)) Source #

Extract all but the last element of the stream, if any. This will end up evaluating the last element as well to find out that it is last.

Pre-release
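
For example (a sketch assuming the Setup imports; i is an illustrative binding):

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> i <- StreamK.init (StreamK.fromList [1,2,3])
>>> maybe (return []) StreamK.toList i
[1,2]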

null :: Monad m => StreamK m a -> m Bool Source #

reverse :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a Source #

concatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

If total iterations are kept the same, each increase in the nesting level increases the cost by roughly 1.5 times.
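
A minimal usage sketch of concatMap, assuming the Setup imports. Each element is mapped to a two-element stream and the resulting streams are concatenated in order:

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> StreamK.toList $ StreamK.concatMap (\x -> StreamK.fromList [x, x * 10]) (StreamK.fromList [1,2,3])
[1,10,2,20,3,30]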

foldrM :: (a -> m b -> m b) -> m b -> StreamK m a -> m b Source #

Lazy right fold with a monadic step function.
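
Because the fold is lazy, the step function can stop early by not running the action that represents the rest of the fold. The following sketch (step is an illustrative helper, not part of the library) terminates on an infinite stream:

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> :{
step x rest
    | x > 2 = return []            -- stop here; rest is never run
    | otherwise = fmap (x :) rest  -- keep x and continue folding
:}
>>> StreamK.foldrM step (return []) (StreamK.fromList [1..])
[1,2]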

type Stream = StreamK Source #

Deprecated: Please use StreamK instead.

Continuation Passing Style (CPS) version of Streamly.Data.Stream.Stream. Unlike Streamly.Data.Stream.Stream, StreamK can be composed recursively without affecting performance.

Semigroup instance appends two streams:

>>> (<>) = StreamK.append
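
For instance, a sketch assuming the Setup imports, using the Semigroup and Monoid instances listed for StreamK:

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> StreamK.toList (StreamK.fromList [1,2] <> StreamK.fromList [3,4])
[1,2,3,4]
>>> StreamK.toList (mconcat [StreamK.fromList [1], StreamK.fromList [2,3], StreamK.fromList [4]])
[1,2,3,4]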

cons :: forall a (m :: Type -> Type). a -> StreamK m a -> StreamK m a infixr 5 Source #

A right associative prepend operation to add a pure value at the head of an existing stream:

>>> s = 1 `StreamK.cons` 2 `StreamK.cons` 3 `StreamK.cons` StreamK.nil
>>> Stream.fold Fold.toList (StreamK.toStream s)
[1,2,3]

Unlike the cons of Streamly.Data.Stream, StreamK cons can be used recursively:

>>> repeat x = let xs = StreamK.cons x xs in xs
>>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil

cons is the same as the following but more efficient:

>>> cons x xs = return x `StreamK.consM` xs

append :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #

Unlike the fused Streamly.Data.Stream append, StreamK append can be used at scale, recursively, with linear performance:

>>> cycle xs = let ys = xs `StreamK.append` ys in ys

concatMapWith append (same as concatMap) flattens a stream of streams in a depth-first manner i.e. it yields each stream fully and then the next and so on. Given a stream of three streams:

1. [1,2,3]
2. [4,5,6]
3. [7,8,9]

The resulting stream will be [1,2,3,4,5,6,7,8,9].

Best used in a right associative manner.
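
A small sketch of both uses, assuming the Setup imports. cycleK is a hypothetical name for the recursive definition shown above, chosen to avoid clashing with Prelude.cycle:

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> StreamK.toList $ StreamK.fromList [1,2] `StreamK.append` StreamK.fromList [3,4]
[1,2,3,4]
>>> cycleK xs = let ys = xs `StreamK.append` ys in ys
>>> StreamK.toList $ StreamK.take 5 $ cycleK (StreamK.fromList [1,2])
[1,2,1,2,1]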

fromPure :: forall a (m :: Type -> Type). a -> StreamK m a Source #

nil :: forall (m :: Type -> Type) a. StreamK m a Source #

A stream that terminates without producing any output or side effect.

>>> Stream.fold Fold.toList (StreamK.toStream StreamK.nil)
[]

newtype StreamK (m :: Type -> Type) a Source #

Constructors

MkStream (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) 

Instances

Instances details
(Foldable m, Monad m) => Foldable (StreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fold :: Monoid m0 => StreamK m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> StreamK m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> StreamK m a -> m0 #

foldr :: (a -> b -> b) -> b -> StreamK m a -> b #

foldr' :: (a -> b -> b) -> b -> StreamK m a -> b #

foldl :: (b -> a -> b) -> b -> StreamK m a -> b #

foldl' :: (b -> a -> b) -> b -> StreamK m a -> b #

foldr1 :: (a -> a -> a) -> StreamK m a -> a #

foldl1 :: (a -> a -> a) -> StreamK m a -> a #

toList :: StreamK m a -> [a] #

null :: StreamK m a -> Bool #

length :: StreamK m a -> Int #

elem :: Eq a => a -> StreamK m a -> Bool #

maximum :: Ord a => StreamK m a -> a #

minimum :: Ord a => StreamK m a -> a #

sum :: Num a => StreamK m a -> a #

product :: Num a => StreamK m a -> a #

Traversable (StreamK Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

traverse :: Applicative f => (a -> f b) -> StreamK Identity a -> f (StreamK Identity b) #

sequenceA :: Applicative f => StreamK Identity (f a) -> f (StreamK Identity a) #

mapM :: Monad m => (a -> m b) -> StreamK Identity a -> m (StreamK Identity b) #

sequence :: Monad m => StreamK Identity (m a) -> m (StreamK Identity a) #

Monad m => Functor (StreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fmap :: (a -> b) -> StreamK m a -> StreamK m b #

(<$) :: a -> StreamK m b -> StreamK m a #

a ~ Char => IsString (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Monoid (StreamK m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

mempty :: StreamK m a #

mappend :: StreamK m a -> StreamK m a -> StreamK m a #

mconcat :: [StreamK m a] -> StreamK m a #

Semigroup (StreamK m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

(<>) :: StreamK m a -> StreamK m a -> StreamK m a #

sconcat :: NonEmpty (StreamK m a) -> StreamK m a #

stimes :: Integral b => b -> StreamK m a -> StreamK m a #

IsList (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Associated Types

type Item (StreamK Identity a) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (StreamK Identity a) = a
Read a => Read (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Show a => Show (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (StreamK Identity a) = a

nilM :: Applicative m => m b -> StreamK m a Source #

A stream that terminates without producing any output, but produces a side effect.

>>> Stream.fold Fold.toList (StreamK.toStream (StreamK.nilM (print "nil")))
"nil"
[]

Pre-release

consM :: Monad m => m a -> StreamK m a -> StreamK m a infixr 5 Source #

A right associative prepend operation to add an effectful value at the head of an existing stream:

>>> s = putStrLn "hello" `StreamK.consM` putStrLn "world" `StreamK.consM` StreamK.nil
>>> Stream.fold Fold.drain (StreamK.toStream s)
hello
world

It can be used efficiently with foldr:

>>> fromFoldableM = Prelude.foldr StreamK.consM StreamK.nil

Same as the following but more efficient:

>>> consM x xs = StreamK.fromEffect x `StreamK.append` xs

before :: Monad m => m b -> StreamK m a -> StreamK m a Source #

Run an action before evaluating the stream.
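
For example (a sketch assuming the Setup imports), the action runs once, before the first element of the stream is produced:

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> StreamK.toList $ StreamK.before (putStrLn "start") (StreamK.fromList [1,2])
start
[1,2]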

mkStream :: (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a Source #

drain :: Monad m => StreamK m a -> m () Source #

foldlM' :: Monad m => (b -> a -> m b) -> m b -> StreamK m a -> m b Source #

Like foldl' but with a monadic step function.
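
A sketch of a running sum that prints each element as it is consumed, assuming the Setup imports (step is an illustrative helper):

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> step acc x = print x >> return (acc + x)
>>> StreamK.foldlM' step (return 0) (StreamK.fromList [1,2,3])
1
2
3
6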

fromEffect :: Monad m => m a -> StreamK m a Source #

unfoldrM :: Monad m => (b -> m (Maybe (a, b))) -> b -> StreamK m a Source #

Build a stream by unfolding a monadic step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

>>> :{
let f b =
        if b > 2
        then return Nothing
        else return (Just (b, b + 1))
in StreamK.toList $ StreamK.unfoldrM f 0
:}
[0,1,2]

interleave :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #

Interleave two streams fairly, yielding one item from each in a round-robin fashion:

>>> un $ StreamK.interleave (mk [1,3,5]) (mk [2,4,6])
[1,2,3,4,5,6]
>>> un $ StreamK.interleave (mk [1,3]) (mk [2,4,6])
[1,2,3,4,6]
>>> un $ StreamK.interleave (mk []) (mk [2,4,6])
[2,4,6]

interleave is right associative when used as an infix operator.

>>> un $ mk [1,2,3] `StreamK.interleave` mk [4,5,6] `StreamK.interleave` mk [7,8,9]
[1,4,2,7,3,5,8,6,9]

Because of right association, the first stream yields as many items as the next two streams combined.

Be careful when refactoring code involving a chain of three or more interleave operations: interleave is not associative, i.e. right associated code may not produce the same result as left associated code. This is a direct consequence of the scheduling imbalance in the previous example. If left associated, the above example would produce:

>>> un $ (mk [1,2,3] `StreamK.interleave` mk [4,5,6]) `StreamK.interleave` mk [7,8,9]
[1,7,4,8,2,9,5,3,6]

Note: Use concatMap based interleaving instead of the binary operator to interleave more than two streams to avoid associativity issues.

concatMapWith interleave flattens a stream of streams using interleave in a right associative manner. Given a stream of three streams:

1. [1,2,3]
2. [4,5,6]
3. [7,8,9]

The resulting sequence is [1,4,2,7,3,5,8,6,9].

For this reason, the right associated flattening with interleave can work with an infinite number of streams without opening too many streams at the same time. Each stream is consumed twice as much as the next one; if we are combining an infinite number of streams of size n then at most log n streams will be open at any given time, because the first stream will finish by the time the stream after the (log n)th stream is opened.

Compare with bfsConcatMap and mergeMapWith interleave.

For interleaving many streams, the best way is to use bfsConcatMap.

See also the fused Stream version of interleave.

crossWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #

Definition:

>>> crossWith f m1 m2 = fmap f m1 `StreamK.crossApply` m2

Note that the second stream is evaluated multiple times.
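
For example (a sketch assuming the Setup imports), every element of the first stream is combined with every element of the second:

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> StreamK.toList $ StreamK.crossWith (+) (StreamK.fromList [10,20]) (StreamK.fromList [1,2])
[11,12,21,22]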

cross :: forall (m :: Type -> Type) a b. Monad m => StreamK m a -> StreamK m b -> StreamK m (a, b) Source #

Given a StreamK m a and StreamK m b generate a stream with all possible combinations of the tuple (a, b).

Definition:

>>> cross = StreamK.crossWith (,)

The second stream is evaluated multiple times. If that is not desired it can be cached in an Array and then generated from the array before calling this function. Caching may also improve performance if the stream is expensive to evaluate.

See the cross operation on the fused Stream type for a much faster alternative.

Time: O(m x n)

Pre-release
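
A minimal usage sketch, assuming the Setup imports:

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> StreamK.toList $ StreamK.cross (StreamK.fromList [1,2]) (StreamK.fromList "ab")
[(1,'a'),(1,'b'),(2,'a'),(2,'b')]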

concatEffect :: Monad m => m (StreamK m a) -> StreamK m a Source #

fairConcatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

See fairConcatFor for detailed documentation.

>>> fairConcatMap = flip StreamK.fairConcatFor

concatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #

Map a stream generating function on each element of a stream and concatenate the results. This is the same as the bind function of the monad instance. It is just a flipped concatMap, but it is more convenient for nested use because it reads like an imperative for loop.

>>> concatFor = flip StreamK.concatMap

A concatenating for loop:

>>> :{
un $
    StreamK.concatFor (mk [1,2,3]) $ \x ->
      StreamK.fromPure x
:}
[1,2,3]

Nested concatenating for loops:

>>> :{
un $
    StreamK.concatFor (mk [1,2,3]) $ \x ->
     StreamK.concatFor (mk [4,5,6]) $ \y ->
      StreamK.fromPure (x, y)
:}
[(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]

fairConcatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #

fairConcatFor is like concatFor but traverses the depth and breadth of nesting equally. Therefore, the outer and the inner loops in a nested loop get equal priority. It can be used to nest infinite streams without starving outer streams due to inner ones.

Given a stream of three streams:

1. [1,2,3]
2. [4,5,6]
3. [7,8,9]

Here, the outer loop is the stream of streams and the inner loops are the individual streams. Laying the streams out as the rows of a grid, the traversal sweeps the diagonals of the grid to give an equal chance to the outer and inner loops. The resulting stream is (1),(2,4),(3,5,7),(6,8),(9); the diagonals are parenthesized for emphasis.

Looping

A single stream case is equivalent to concatFor:

>>> un $ StreamK.fairConcatFor (mk [1,2]) $ \x -> StreamK.fromPure x
[1,2]

Fair Nested Looping

Multiple streams nest like for loops. The result is a cross product of the streams. However, the ordering of the results of the cross product is such that each stream gets consumed equally. In other words, inner iterations of a nested loop get the same priority as the outer iterations. Inner iterations do not finish completely before the outer iterations start.

>>> :{
un $ do
    StreamK.fairConcatFor (mk [1,2,3]) $ \x ->
     StreamK.fairConcatFor (mk [4,5,6]) $ \y ->
      StreamK.fromPure (x, y)
:}
[(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]

Nesting Infinite Streams

Example with infinite streams. Print all pairs in the cross product with sum less than a specified number.

>>> :{
Stream.toList
 $ Stream.takeWhile (\(x,y) -> x + y < 6)
 $ StreamK.toStream
 $ StreamK.fairConcatFor (mk [1..]) $ \x ->
    StreamK.fairConcatFor (mk [1..]) $ \y ->
     StreamK.fromPure (x, y)
:}
[(1,1),(1,2),(2,1),(1,3),(2,2),(3,1),(1,4),(2,3),(3,2),(4,1)]

How does the nesting work?

If we look at the cross product of [1,2,3], [4,5,6], the streams being combined using fairConcatFor are the following sequential loop iterations:

(1,4) (1,5) (1,6) -- first iteration of the outer loop
(2,4) (2,5) (2,6) -- second iteration of the outer loop
(3,4) (3,5) (3,6) -- third iteration of the outer loop

The result is a triangular or diagonal traversal of these iterations:

[(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]

Non-Termination Cases

If one of the two interleaved streams does not produce an output at all and continues forever then the other stream will never get scheduled. This is because a stream is unscheduled only after it produces an output. This can lead to non-terminating programs; an example is provided below.

>>> :{
oddsIf x = mk (if x then [1,3..] else [2,4..])
filterEven x = if even x then StreamK.fromPure x else StreamK.nil
:}
>>> :{
evens =
    StreamK.fairConcatFor (mk [True,False]) $ \r ->
     StreamK.concatFor (oddsIf r) filterEven
:}

The evens function does not terminate because, when r is True, the nested concatFor is a non-productive infinite loop. Therefore, the outer loop never gets a chance to generate the False value.

But the following refactoring of the above code works as expected:

>>> :{
mixed =
     StreamK.fairConcatFor (mk [True,False]) $ \r ->
         StreamK.concatFor (oddsIf r) StreamK.fromPure
:}
>>> evens = StreamK.fairConcatFor mixed filterEven
>>> Stream.toList $ Stream.take 3 $ StreamK.toStream evens
[2,4,6]

This works because in mixed both the streams being interleaved are productive.

Keep the scheduling implications in mind when writing your program. To avoid such scheduling problems in serial interleaving, you can use concurrent interleaving instead, i.e. parFairConcatFor. With concurrent threads, the other branch makes progress even if one branch is an infinite loop producing nothing.

Logic Programming

Streamly provides all operations for logic programming. It provides functionality equivalent to LogicT type from the logict package. The MonadLogic operations can be implemented using the available stream operations. For example, uncons is msplit, interleave corresponds to the interleave operation of MonadLogic, fairConcatFor is the fair bind (>>-) operation.

Related Operations

concatForWith interleave is another way to interleave two serial streams. In this case, the inner loop iterations get exponentially more priority over the outer iterations of the nested loop. This is biased towards the inner loops, which is exactly how the logict and list-t implementations of fair bind work.

concatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b Source #

Like concatFor but maps an effectful function. It allows conveniently mixing monadic effects with streams.

>>> import Control.Monad.IO.Class (liftIO)
>>> :{
un $
    StreamK.concatForM (mk [1,2,3]) $ \x -> do
      liftIO $ putStrLn (show x)
      pure $ StreamK.fromPure x
:}
1
2
3
[1,2,3]

Nested concatenating for loops:

>>> :{
un $
    StreamK.concatForM (mk [1,2,3]) $ \x -> do
      liftIO $ putStrLn (show x)
      pure $ StreamK.concatFor (mk [4,5,6]) $ \y ->
        StreamK.fromPure (x, y)
:}
1
2
3
[(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]

fairConcatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b Source #

Like fairConcatFor but maps a monadic function.

fromFoldable :: forall f a (m :: Type -> Type). Foldable f => f a -> StreamK m a Source #

>>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil

Construct a stream from a Foldable containing pure values.
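
For example (a sketch assuming the Setup imports), any Foldable container works, such as Maybe or a list:

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> StreamK.toList $ StreamK.fromFoldable (Just 'a')
"a"
>>> StreamK.toList $ StreamK.fromFoldable [1,2,3]
[1,2,3]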

bfsConcatMap :: forall a (m :: Type -> Type) b. (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

See bfsConcatFor for detailed documentation.

>>> bfsConcatMap = flip StreamK.bfsConcatFor

concatMapWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

Perform a concatMap using a specified concat strategy. The first argument specifies a merge or concat function that is used to merge the streams generated by the map function.

For example, interleaving n streams in a left biased manner:

>>> lists = mk [[1,5],[2,6],[3,7],[4,8]]
>>> un $ StreamK.concatMapWith StreamK.interleave mk lists
[1,2,5,3,6,4,7,8]

For a fair interleaving example see bfsConcatMap and mergeMapWith.

bfsConcatFor :: forall (m :: Type -> Type) a b. StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #

While concatFor flattens a stream of streams in a depth first manner, bfsConcatFor flattens it in a breadth-first manner. It yields one item from the first stream, then one item from the next stream and so on. In nested loops it has the effect of prioritizing the new outer loop iteration over the inner loops, thus inverting the looping. Given a stream of three streams:

1. [1,2,3]
2. [4,5,6]
3. [7,8,9]

The resulting stream is (1,4,7),(2,5,8),(3,6,9). The parentheses are added to emphasize the iterations.

For example:

>>> stream = mk [[1,2,3],[4,5,6],[7,8,9]]
>>> :{
 un $
     StreamK.bfsConcatFor stream $ \x ->
         StreamK.fromStream $ Stream.fromList x
:}
[1,4,7,2,5,8,3,6,9]

Compare with concatForWith interleave which explores the depth exponentially more compared to the breadth, such that each stream yields twice as many items compared to the next stream.

See also the equivalent fused version unfoldEachInterleave.

bfsConcatForM :: Monad m => StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b Source #

Like bfsConcatFor but maps a monadic function.

mergeMapWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

Combine streams in pairs using a binary combinator, the resulting streams are then combined again in pairs recursively until we get to a single combined stream. The composition would thus form a binary tree.

For example, 'mergeMapWith interleave' gives the following result:

>>> lists = mk [[1,2,3],[4,5,6],[7,8,9],[10,11,12]]
>>> un $ StreamK.mergeMapWith StreamK.interleave mk lists
[1,7,4,10,2,8,5,11,3,9,6,12]

The above example is equivalent to the following pairings:

>>> pair1 = mk [1,2,3] `StreamK.interleave` mk [4,5,6]
>>> pair2 = mk [7,8,9] `StreamK.interleave` mk [10,11,12]
>>> un $ pair1 `StreamK.interleave` pair2
[1,7,4,10,2,8,5,11,3,9,6,12]

If the number of streams being combined is not a power of 2, the binary tree composed by mergeMapWith is not balanced. Therefore, the output may not look fairly interleaved; it will be biased towards the unpaired streams:

>>> lists = mk [[1,2,3],[4,5,6],[7,8,9]]
>>> un $ StreamK.mergeMapWith StreamK.interleave mk lists
[1,7,4,8,2,9,5,3,6]

An efficient merge sort can be implemented by using mergeBy as the combining function:

>>> combine = StreamK.mergeBy compare
>>> un $ StreamK.mergeMapWith combine StreamK.fromPure (mk [5,1,7,9,2])
[1,2,5,7,9]

Caution: the stream of streams must be finite.

Pre-release

newtype Nested (m :: Type -> Type) a Source #

Nested is a list-transformer monad, it serves the same purpose as the ListT type from the list-t package. It is similar to the standard Haskell lists' monad instance. Nested monad behaves like nested for loops implementing a computation based on a cross product over multiple streams.

>>> mk = StreamK.Nested . StreamK.fromStream . Stream.fromList
>>> un = Stream.toList . StreamK.toStream . StreamK.unNested

Looping

In the following code, the variable x assumes the values of the elements of the stream one at a time, and the code that follows runs using that value. It is equivalent to a for loop:

>>> :{
un $ do
    x <- mk [1,2,3] -- for each element in the stream
    return x
:}
[1,2,3]

Nested Looping

Multiple streams can be nested like nested for loops. The result is a cross product of the streams.

>>> :{
un $ do
    x <- mk [1,2,3] -- outer loop, for each element in the stream
    y <- mk [4,5,6] -- inner loop, for each element in the stream
    return (x, y)
:}
[(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]

Note that an infinite stream in an inner loop will block the outer streams from moving to the next iteration.

How does it work?

The bind operation of the monad is flipped concatMapWith append. The concatMap operation maps the lines involving y as a function of x over the stream [1,2,3]. The streams so generated are combined using the append operation. If we desugar the above monad code using bind explicitly, it becomes clear how it works:

>>> import Streamly.Internal.Data.StreamK (Nested(..))
>>> (Nested m) >>= f = Nested $ StreamK.concatMapWith StreamK.append (unNested . f) m
>>> un (mk [1,2,3] >>= (\x -> (mk [4,5,6] >>= \y -> return (x,y))))
[(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]

You can achieve looping and nested looping by using concatMap directly, but the monad and the "do notation" give you better ergonomics.

Interleaving of loop iterations

If we look at the cross product of [1,2,3], [4,5,6], the streams being combined using append are the for loop iterations as follows:

(1,4) (1,5) (1,6) -- first iteration of the outer loop
(2,4) (2,5) (2,6) -- second iteration of the outer loop
(3,4) (3,5) (3,6) -- third iteration of the outer loop

The result is equivalent to sequentially appending all the iterations of the nested for loop:

[(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]

Logic Programming

Nested also serves the purpose of LogicT type from the logict package. The MonadLogic operations can be implemented using the available stream operations. For example, uncons is msplit, interleave corresponds to the interleave operation of MonadLogic, fairConcatFor is the fair bind (>>-) operation. The FairNested type provides a monad with fair bind.
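
As a sketch of this style, the Alternative instance listed for Nested lets guard prune loop iterations. The helpers nested and run are hypothetical names introduced here for illustration; the example assumes that an empty Nested computation simply ends the current iteration:

>>> import Control.Monad (guard)
>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> nested = StreamK.Nested . StreamK.fromList
>>> run = StreamK.toList . StreamK.unNested
>>> :{
run $ do
    x <- nested [1..4]    -- outer loop
    y <- nested [x..4]    -- inner loop, starting at x
    guard (x + y == 5)    -- keep only pairs that sum to 5
    return (x, y)
:}
[(1,4),(2,3)]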

Related Functionality

A custom type can be created using bfsConcatMap as the monad bind operation. The nested loops would then get inverted: the innermost loop becomes the outermost and vice versa.

See FairNested if you want all the streams to get equal chance to execute even if they are infinite.

Constructors

Nested 

Fields

Instances

Instances details
MonadTrans Nested Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

lift :: Monad m => m a -> Nested m a #

Monad m => MonadFail (Nested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fail :: String -> Nested m a #

MonadIO m => MonadIO (Nested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

liftIO :: IO a -> Nested m a #

(Foldable m, Monad m) => Foldable (Nested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fold :: Monoid m0 => Nested m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> Nested m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> Nested m a -> m0 #

foldr :: (a -> b -> b) -> b -> Nested m a -> b #

foldr' :: (a -> b -> b) -> b -> Nested m a -> b #

foldl :: (b -> a -> b) -> b -> Nested m a -> b #

foldl' :: (b -> a -> b) -> b -> Nested m a -> b #

foldr1 :: (a -> a -> a) -> Nested m a -> a #

foldl1 :: (a -> a -> a) -> Nested m a -> a #

toList :: Nested m a -> [a] #

null :: Nested m a -> Bool #

length :: Nested m a -> Int #

elem :: Eq a => a -> Nested m a -> Bool #

maximum :: Ord a => Nested m a -> a #

minimum :: Ord a => Nested m a -> a #

sum :: Num a => Nested m a -> a #

product :: Num a => Nested m a -> a #

Traversable (Nested Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

traverse :: Applicative f => (a -> f b) -> Nested Identity a -> f (Nested Identity b) #

sequenceA :: Applicative f => Nested Identity (f a) -> f (Nested Identity a) #

mapM :: Monad m => (a -> m b) -> Nested Identity a -> m (Nested Identity b) #

sequence :: Monad m => Nested Identity (m a) -> m (Nested Identity a) #

(Monad m, Functor m) => Alternative (Nested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

empty :: Nested m a #

(<|>) :: Nested m a -> Nested m a -> Nested m a #

some :: Nested m a -> Nested m [a] #

many :: Nested m a -> Nested m [a] #

Monad m => Applicative (Nested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

pure :: a -> Nested m a #

(<*>) :: Nested m (a -> b) -> Nested m a -> Nested m b #

liftA2 :: (a -> b -> c) -> Nested m a -> Nested m b -> Nested m c #

(*>) :: Nested m a -> Nested m b -> Nested m b #

(<*) :: Nested m a -> Nested m b -> Nested m a #

Monad m => Functor (Nested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fmap :: (a -> b) -> Nested m a -> Nested m b #

(<$) :: a -> Nested m b -> Nested m a #

Monad m => Monad (Nested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

(>>=) :: Nested m a -> (a -> Nested m b) -> Nested m b #

(>>) :: Nested m a -> Nested m b -> Nested m b #

return :: a -> Nested m a #

Monad m => MonadPlus (Nested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

mzero :: Nested m a #

mplus :: Nested m a -> Nested m a -> Nested m a #

MonadThrow m => MonadThrow (Nested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

throwM :: (HasCallStack, Exception e) => e -> Nested m a #

a ~ Char => IsString (Nested Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Monoid (Nested m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

mempty :: Nested m a #

mappend :: Nested m a -> Nested m a -> Nested m a #

mconcat :: [Nested m a] -> Nested m a #

Semigroup (Nested m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

(<>) :: Nested m a -> Nested m a -> Nested m a #

sconcat :: NonEmpty (Nested m a) -> Nested m a #

stimes :: Integral b => b -> Nested m a -> Nested m a #

IsList (Nested Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Associated Types

type Item (Nested Identity a) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Read a => Read (Nested Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Show a => Show (Nested Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (Nested Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

fromFoldableM :: (Foldable f, Monad m) => f (m a) -> StreamK m a Source #
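
A usage sketch, assuming the Setup imports: fromFoldableM turns a container of actions into a stream, and the actions run in order as the stream is evaluated.

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> StreamK.toList $ StreamK.fromFoldableM [putStrLn "one" >> return 1, putStrLn "two" >> return 2]
one
two
[1,2]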

interleaveEndBy' :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a Source #

Examples:

>>> fromList = StreamK.fromStream . Stream.fromList
>>> toList = Stream.toList . StreamK.toStream
>>> f x y = toList $ StreamK.interleaveEndBy' (fromList x) (fromList y)
>>> f "..." "abc"
"a.b.c."
>>> f "..." "ab"
"a.b."

Currently broken, generates an additional element at the end:

> f ".." "abc"
"a.b."

interleaveSepBy :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a Source #

interleaveMin :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #

Deprecated: Please use flip interleaveEndBy' instead.

interleaveFst :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #

Deprecated: Please use flip interleaveSepBy instead.

foldrS :: forall a (m :: Type -> Type) b. (a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #

Right fold to a streaming monad.

foldrS StreamK.cons StreamK.nil === id

foldrS can be used to perform stateless stream to stream transformations like map and filter in general. It can be coupled with a scan to perform stateful transformations. However, note that the custom map and filter routines can be much more efficient than this due to better stream fusion.

>>> input = StreamK.fromStream $ Stream.fromList [1..5]
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS StreamK.cons StreamK.nil input
[1,2,3,4,5]

Find if any element in the stream is odd:

>>> step x xs = if odd x then StreamK.fromPure True else xs
>>> input = StreamK.fromStream (Stream.fromList (2:4:5:undefined)) :: StreamK IO Int
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS step (StreamK.fromPure False) input
[True]

Map (+2) on odd elements and filter out the even elements:

>>> step x xs = if odd x then (x + 2) `StreamK.cons` xs else xs
>>> input = StreamK.fromStream (Stream.fromList [1..5]) :: StreamK IO Int
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS step StreamK.nil input
[3,5,7]

Pre-release

foldlS :: forall (m :: Type -> Type) b a. (StreamK m b -> a -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #

Lazy left fold to a stream.

initNonEmpty :: forall (m :: Type -> Type) a. Stream m a -> Stream m a Source #

init for non-empty streams; fails if the stream is empty.

See also init for a non-partial version of this function.

tailNonEmpty :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a Source #

tail for non-empty streams; fails if the stream is empty.

See also tail for a non-partial version of this function.

Note: this is the same as "drop 1" but with an error on an empty stream.

foldlx' :: forall m a b x. Monad m => (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> m b Source #

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Note that the accumulator is always evaluated including the initial value.
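The shape of a left fold with an extraction function can be sketched on plain lists. This is only an illustrative model using base, not the StreamK implementation; the names foldlxList, SumCount and meanList are hypothetical.

```haskell
import Data.List (foldl')

-- Hypothetical list analogue of foldlx' (not part of streamly):
-- fold strictly with `step` starting from `begin`, then apply `done`
-- once to the final accumulator.
foldlxList :: (x -> a -> x) -> x -> (x -> b) -> [a] -> b
foldlxList step begin done = done . foldl' step begin

-- Accumulator with strict fields so that neither the running sum nor the
-- count builds up thunks.
data SumCount = SumCount !Double !Int

-- Arithmetic mean: accumulate sum and count, divide only at the end.
-- Returns 0 for an empty list (an arbitrary choice for this sketch).
meanList :: [Double] -> Double
meanList = foldlxList step (SumCount 0 0) done
  where
    step (SumCount s n) x = SumCount (s + x) (n + 1)
    done (SumCount s n) = if n == 0 then 0 else s / fromIntegral n
```

With these definitions, meanList [1, 2, 3, 4] evaluates to 2.5. Keeping the extraction step separate lets the accumulator type differ from the result type.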

foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> StreamK m a -> m b Source #

Like foldlx', but with a monadic step function.

crossApply :: forall (m :: Type -> Type) a b. StreamK m (a -> b) -> StreamK m a -> StreamK m b Source #

Apply a stream of functions to a stream of values and flatten the results.

Note that the second stream is evaluated multiple times.

Definition:

>>> crossApply = StreamK.crossApplyWith StreamK.append
>>> crossApply = Stream.crossWith id

crossApplyFst :: forall (m :: Type -> Type) a b. StreamK m a -> StreamK m b -> StreamK m a Source #

crossApplySnd :: forall (m :: Type -> Type) a b. StreamK m a -> StreamK m b -> StreamK m b Source #

mkCross :: forall (m :: Type -> Type) a. StreamK m a -> Nested m a Source #

Deprecated: Use Nested instead.

Wrap the StreamK type in a Nested newtype to enable cross product style applicative and monad instances.

This is a type level operation with no runtime overhead.

unCross :: forall (m :: Type -> Type) a. CrossStreamK m a -> StreamK m a Source #

Unwrap the StreamK type from CrossStreamK newtype.

This is a type level operation with no runtime overhead.

newtype FairNested (m :: Type -> Type) a Source #

FairNested is like the Nested type but explores the depth and breadth of the cross product grid equally, so that each of the streams being crossed is consumed equally. It can be used to nest infinite streams without one starving the other.

>>> mk = StreamK.FairNested . StreamK.fromStream . Stream.fromList
>>> un = Stream.toList . StreamK.toStream . StreamK.unFairNested

Looping

The single-stream case is equivalent to Nested; it is a simple for loop over the stream:

>>> :{
un $ do
    x <- mk [1,2] -- for each element in the stream
    return x
:}
[1,2]

Fair Nested Looping

Multiple streams nest like for loops. The result is a cross product of the streams. However, the ordering of the results of the cross product is such that each stream gets consumed equally. In other words, inner iterations of a nested loop get the same priority as the outer iterations. Inner iterations do not finish completely before the outer iterations start.

>>> :{
un $ do
    x <- mk [1,2,3] -- outer, for each element in the stream
    y <- mk [4,5,6] -- inner, for each element in the stream
    -- Perform the following actions for each x, for each y
    return (x, y)
:}
[(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]

Nesting Infinite Streams

Example with infinite streams. Print all pairs in the cross product with sum less than a specified number.

>>> :{
Stream.toList
 $ Stream.takeWhile (\(x,y) -> x + y < 6)
 $ StreamK.toStream $ StreamK.unFairNested
 $ do
    x <- mk [1..] -- infinite stream
    y <- mk [1..] -- infinite stream
    return (x, y)
:}
[(1,1),(1,2),(2,1),(1,3),(2,2),(3,1),(1,4),(2,3),(3,2),(4,1)]

How it works

FairNested uses fairConcatFor as the monad bind operation. If we look at the cross product of [1,2,3] and [4,5,6], the streams being combined by fairConcatFor are the sequential loop iterations:

(1,4) (1,5) (1,6) -- first iteration of the outer loop
(2,4) (2,5) (2,6) -- second iteration of the outer loop
(3,4) (3,5) (3,6) -- third iteration of the outer loop

The result is a triangular or diagonal traversal of these iterations:

[(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]
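The diagonal traversal can be modelled on plain lists. The sketch below uses only base and is not the streamly implementation; diagonal, fairBindList and pairs are hypothetical names, and a list of lists stands in for the loop iterations.

```haskell
-- Hypothetical list model of the diagonal traversal (not streamly code).
-- `stripe` groups the elements of the rows by diagonal and `diagonal`
-- flattens the groups in order. Laziness lets it handle infinitely many
-- rows and infinitely long rows.
diagonal :: [[a]] -> [a]
diagonal = concat . stripe
  where
    stripe [] = []
    stripe ([] : rows) = stripe rows
    stripe ((x : xs) : rows) = [x] : zipCons xs (stripe rows)

    -- Prepend each remaining element of the current row to the diagonal
    -- it belongs to.
    zipCons [] ds = ds
    zipCons xs [] = map (: []) xs
    zipCons (x : xs) (d : ds) = (x : d) : zipCons xs ds

-- List analogue of a fair bind: one row per outer iteration.
fairBindList :: [a] -> (a -> [b]) -> [b]
fairBindList xs f = diagonal (map f xs)

-- Fair cross product of two lists.
pairs :: [Int] -> [Int] -> [(Int, Int)]
pairs xs ys = xs `fairBindList` \x -> ys `fairBindList` \y -> [(x, y)]
```

In this model, pairs [1,2,3] [4,5,6] evaluates to [(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)], the same order as above. It also stays productive for pairs [1..] [1..].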

Associativity Issues

WARNING! The FairNested monad intentionally breaks the associativity law in order to be more useful; it is associative only up to permutation equivalence. In this monad the association order of statements can change the ordering of the results, because it changes the way in which the streams are scheduled. The same issue arises when you use the interleave operation directly, where association order also matters. Here, however, it can be more subtle because the programmer may not see it directly.

>>> un (mk [1,2] >>= (\x -> mk [x, x + 1] >>= (\y -> mk [y, y + 2])))
[1,3,2,2,4,4,3,5]
>>> un ((mk [1,2] >>= (\x -> mk [x, x + 1])) >>= (\y -> mk [y, y + 2]))
[1,3,2,4,2,4,3,5]

This type is designed for use cases where the ordering of results does not matter: we want to explore different streams to find specific results, but the order in which we find or present them is not important. Re-association of statements in this monad may change how different branches are scheduled, and therefore the scheduling priority of some streams over others. This may end up starving some branches. In the worst case some branches may be fully starved by infinite branches that produce nothing, resulting in a non-terminating program.

Non-Termination Cases

If an infinite stream that never produces a value is interleaved with another stream, the entire computation gets stuck forever, because the interleave operation schedules the second stream only after the first stream yields a value. This can lead to non-terminating programs; an example is provided below.

>>> :{
toS = StreamK.toStream . StreamK.unFairNested
odds x = mk (if x then [1,3..] else [2,4..])
filterEven x = if even x then pure x else StreamK.FairNested StreamK.nil
:}

When writing code with do notation, keep in mind that when we bind a variable to a monadic value, all the following code that depends on this variable is associated together and connected to it via a monad bind. Consider the following code:

>>> :{
evens = toS $ do
    r <- mk [True,False]
    -- The next two statements depending on the variable r are associated
    -- together and bound to the previous line using a monad bind.
    x <- odds r
    filterEven x
:}

This code does not terminate because, when r is True, odds and filterEven together constitute an infinite inner loop that works continuously but never yields a value. This stream is interleaved with the outer loop, so the outer loop never gets a chance to move to the next iteration.

But the following code works as expected:

>>> :{
evens = toS $ do
    x <- mk [True,False] >>= odds
    filterEven x
:}
>>> Stream.toList $ Stream.take 3 $ evens
[2,4,6]

This works because both the lists being interleaved continue to produce values in the outer loop and the inner loop keeps filtering them.

Take care how you write your program and keep the scheduling implications in mind. To avoid such scheduling problems with the serial FairNested type, use the concurrent version, i.e. FairParallel, described in the MkType module. Due to concurrent evaluation each branch makes progress even if one of them is an infinite loop producing nothing.

Related Operations

We can create a custom type with concatMapWith interleave as the monad bind operation; the inner loop iterations then get exponentially more priority over the outer iterations of the nested loop. This is not fully fair, it is biased. This is exactly how the logic-t and list-t implementations of fair bind work.
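The bias of an interleave-based bind can be seen in a small list model. This sketch uses only base and assumes that concatMapWith flattens with a right fold; interleaveList, interleaveBindList and pairsInterleaved are hypothetical names, not streamly functions.

```haskell
-- Hypothetical list model (not streamly code): alternate between two lists,
-- continuing with whichever list remains once the other is exhausted.
interleaveList :: [a] -> [a] -> [a]
interleaveList [] ys = ys
interleaveList (x : xs) ys = x : interleaveList ys xs

-- Bind that flattens the per-iteration lists with a right fold over
-- interleaveList, in the manner assumed for concatMapWith interleave.
interleaveBindList :: [a] -> (a -> [b]) -> [b]
interleaveBindList xs f = foldr (interleaveList . f) [] xs

-- Cross product of [1,2,3] and [4,5,6] under the interleaving bind.
pairsInterleaved :: [(Int, Int)]
pairsInterleaved =
    [1, 2, 3] `interleaveBindList` \x ->
        [4, 5, 6] `interleaveBindList` \y -> [(x, y)]
```

In this model pairsInterleaved evaluates to [(1,4),(2,4),(1,5),(3,4),(1,6),(2,5),(3,5),(2,6),(3,6)]. While it lasts, the iteration for the first outer element fills every second slot, the next one every fourth slot, and so on, which is the exponential bias described above. Compare this with the diagonal order produced by FairNested.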

Constructors

FairNested 

Fields

Instances

Instances details
MonadTrans FairNested Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

lift :: Monad m => m a -> FairNested m a #

MonadIO m => MonadIO (FairNested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

liftIO :: IO a -> FairNested m a #

(Foldable m, Monad m) => Foldable (FairNested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fold :: Monoid m0 => FairNested m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> FairNested m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> FairNested m a -> m0 #

foldr :: (a -> b -> b) -> b -> FairNested m a -> b #

foldr' :: (a -> b -> b) -> b -> FairNested m a -> b #

foldl :: (b -> a -> b) -> b -> FairNested m a -> b #

foldl' :: (b -> a -> b) -> b -> FairNested m a -> b #

foldr1 :: (a -> a -> a) -> FairNested m a -> a #

foldl1 :: (a -> a -> a) -> FairNested m a -> a #

toList :: FairNested m a -> [a] #

null :: FairNested m a -> Bool #

length :: FairNested m a -> Int #

elem :: Eq a => a -> FairNested m a -> Bool #

maximum :: Ord a => FairNested m a -> a #

minimum :: Ord a => FairNested m a -> a #

sum :: Num a => FairNested m a -> a #

product :: Num a => FairNested m a -> a #

Traversable (FairNested Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

traverse :: Applicative f => (a -> f b) -> FairNested Identity a -> f (FairNested Identity b) #

sequenceA :: Applicative f => FairNested Identity (f a) -> f (FairNested Identity a) #

mapM :: Monad m => (a -> m b) -> FairNested Identity a -> m (FairNested Identity b) #

sequence :: Monad m => FairNested Identity (m a) -> m (FairNested Identity a) #

Monad m => Applicative (FairNested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

pure :: a -> FairNested m a #

(<*>) :: FairNested m (a -> b) -> FairNested m a -> FairNested m b #

liftA2 :: (a -> b -> c) -> FairNested m a -> FairNested m b -> FairNested m c #

(*>) :: FairNested m a -> FairNested m b -> FairNested m b #

(<*) :: FairNested m a -> FairNested m b -> FairNested m a #

Monad m => Functor (FairNested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fmap :: (a -> b) -> FairNested m a -> FairNested m b #

(<$) :: a -> FairNested m b -> FairNested m a #

Monad m => Monad (FairNested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

(>>=) :: FairNested m a -> (a -> FairNested m b) -> FairNested m b #

(>>) :: FairNested m a -> FairNested m b -> FairNested m b #

return :: a -> FairNested m a #

MonadThrow m => MonadThrow (FairNested m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

throwM :: (HasCallStack, Exception e) => e -> FairNested m a #

a ~ Char => IsString (FairNested Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

IsList (FairNested Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Associated Types

type Item (FairNested Identity a) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Read a => Read (FairNested Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Show a => Show (FairNested Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (FairNested Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

foldStream :: State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r Source #

Fold a stream by providing a State, a yield continuation, a singleton continuation and a stop continuation. The stream will not use the SVar passed via State.

foldStreamShared :: State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r Source #

Fold a stream by providing a State carrying an SVar, a yield continuation, a singleton continuation and a stop continuation. The stream shares the current SVar passed via the State.

foldrSShared :: forall a (m :: Type -> Type) b. (a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #

Fold sharing the SVar state within the reconstructed stream.

foldrSM :: Monad m => (m a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #

buildS :: forall a (m :: Type -> Type). ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a Source #

buildM :: Monad m => (forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a Source #

buildSM :: Monad m => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a Source #

augmentS :: forall a (m :: Type -> Type). ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a Source #

augmentSM :: Monad m => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a Source #

unShare :: forall (m :: Type -> Type) a. StreamK m a -> StreamK m a Source #

Detach a stream from an SVar.

fromStopK :: forall (m :: Type -> Type) a. StopK m -> StreamK m a Source #

Make an empty stream from a stop function.

fromYieldK :: forall (m :: Type -> Type) a. YieldK m a -> StreamK m a Source #

Make a singleton stream from a callback function. The callback function calls the one-shot yield continuation to yield an element.

consK :: forall (m :: Type -> Type) a. YieldK m a -> StreamK m a -> StreamK m a Source #

Add a yield function at the head of the stream.

(.:) :: forall a (m :: Type -> Type). a -> StreamK m a -> StreamK m a infixr 5 Source #

Operator equivalent of cons.

> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]

consMBy :: Monad m => (StreamK m a -> StreamK m a -> StreamK m a) -> m a -> StreamK m a -> StreamK m a Source #

unfoldrMWith :: Monad m => (m a -> StreamK m a -> StreamK m a) -> (b -> m (Maybe (a, b))) -> b -> StreamK m a Source #

repeatMWith :: (m a -> t m a -> t m a) -> m a -> t m a Source #

Like repeatM but takes a stream cons operation to combine the actions in a stream specific manner. A serial cons would repeat the values serially while an async cons would repeat concurrently.

Pre-release

replicateMWith :: (m a -> StreamK m a -> StreamK m a) -> Int -> m a -> StreamK m a Source #

fromIndicesMWith :: (m a -> StreamK m a -> StreamK m a) -> (Int -> m a) -> StreamK m a Source #

iterateMWith :: Monad m => (m a -> StreamK m a -> StreamK m a) -> (a -> m a) -> m a -> StreamK m a Source #

headNonEmpty :: Monad m => StreamK m a -> m a Source #

head for non-empty streams, fails for empty stream case.

mapMWith :: (m b -> StreamK m b -> StreamK m b) -> (a -> m b) -> StreamK m a -> StreamK m b Source #

mapMSerial :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b Source #

mapMAccum :: (s -> a -> m (s, b)) -> m s -> StreamK m a -> StreamK m b Source #

A stateful map aka scan but with a slight difference.

This is similar to a scan except that instead of emitting the state it emits a separate result. This is also similar to mapAccumL but does not return the final value of the state.

Separation of state from the output makes it easier to think in terms of a shared state, and also makes it easier to keep the state fully strict and the output lazy.

Unimplemented
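Since the function is unimplemented, the intended behavior can only be sketched. The following list model uses only base and follows the description above: it threads a state through an effectful step and emits a separate result per element, discarding the final state. The names mapMAccumList and numbered are hypothetical.

```haskell
-- Hypothetical list model of mapMAccum (not streamly code): thread a state
-- through an effectful step function and collect only the per-element
-- results; the final state is dropped.
mapMAccumList :: Monad m => (s -> a -> m (s, b)) -> m s -> [a] -> m [b]
mapMAccumList step initial xs = initial >>= go xs
  where
    go [] _ = return []
    go (y : ys) s = do
        (s', b) <- step s y
        bs <- go ys s'
        return (b : bs)

-- Number the elements: the state is the next index, the output is a pair
-- of the current index and the element.
numbered :: [a] -> IO [(Int, a)]
numbered = mapMAccumList (\n x -> return (n + 1, (n, x))) (return 0)
```

In this model, numbered "abc" returns [(0,'a'),(1,'b'),(2,'c')]. Unlike a stream, this list version produces no output until the whole input has been processed.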

conjoin :: forall (m :: Type -> Type) a. Monad m => StreamK m a -> StreamK m a -> StreamK m a Source #

crossApplyWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m (a -> b) -> StreamK m a -> StreamK m b Source #

concatMapEffect :: Monad m => (b -> StreamK m a) -> m b -> StreamK m a Source #

concatMapMAccum :: (StreamK m b -> StreamK m b -> StreamK m b) -> (s -> a -> m (s, StreamK m b)) -> m s -> StreamK m a -> StreamK m b Source #

Like concatMapWith but carries a state which can be used to share information across multiple steps of concat.

concatMapMAccum combine f initial = concatMapWith combine id . mapMAccum f initial

Unimplemented

concatForWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #

concatForWithM :: Monad m => (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m a -> (a -> m (StreamK m b)) -> StreamK m b Source #

Like concatForWith but maps an effectful function.

concatIterateWith :: forall (m :: Type -> Type) a. (StreamK m a -> StreamK m a -> StreamK m a) -> (a -> StreamK m a) -> StreamK m a -> StreamK m a Source #

Yield an input element in the output stream, map a stream generator on it and repeat the process on the resulting stream. Resulting streams are flattened using the concatMapWith combinator. This can be used for a depth first style (DFS) traversal of a tree like structure.

Example, list a directory tree using DFS:

>>> f = StreamK.fromStream . either (Dir.readEitherPaths id) (const Stream.nil)
>>> input = StreamK.fromEffect (Left <$> Path.fromString ".")
>>> ls = StreamK.concatIterateWith StreamK.append f input
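The traversal order can be illustrated without touching the file system. This is a pure list model using only base, assuming an append-style combine; concatIterateList and children are hypothetical names, not streamly functions.

```haskell
-- Hypothetical list model of concatIterateWith with an append-style combine
-- (not streamly code): emit an element, then recurse into the elements
-- generated from it before moving on to its siblings.
concatIterateList :: (a -> [a]) -> [a] -> [a]
concatIterateList f = concatMap generate
  where
    generate x = x : concatIterateList f (f x)

-- A small implicit binary tree: node n has children 2n and 2n+1, and
-- nodes numbered 4 or above are leaves.
children :: Int -> [Int]
children n = if n < 4 then [2 * n, 2 * n + 1] else []
```

In this model, concatIterateList children [1] evaluates to [1,2,4,5,3,6,7], a depth-first pre-order walk of the tree.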

Note that iterateM is a special case of concatIterateWith:

>>> iterateM f = StreamK.concatIterateWith StreamK.append (StreamK.fromEffect . f) . StreamK.fromEffect

Pre-release

concatIterateLeftsWith :: forall b a c (m :: Type -> Type). b ~ Either a c => (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m b -> StreamK m b Source #

In an Either stream iterate on Lefts. This is a special case of concatIterateWith:

>>> concatIterateLeftsWith combine f = StreamK.concatIterateWith combine (either f (const StreamK.nil))

To traverse a directory tree:

>>> input = StreamK.fromEffect (Left <$> Path.fromString ".")
>>> ls = StreamK.concatIterateLeftsWith StreamK.append (StreamK.fromStream . Dir.readEither id) input

Pre-release

concatIterateScanWith :: Monad m => (StreamK m a -> StreamK m a -> StreamK m a) -> (b -> a -> m (b, StreamK m a)) -> m b -> StreamK m a -> StreamK m a Source #

Like concatIterateWith but carries a state in the stream generation function. This can be used to traverse graph-like structures: we can remember the visited nodes in the state to avoid cycles.

Note that a combination of concatIterateWith and usingState can also be used to traverse graphs. However, this function provides a more localized state instead of using a global state.

See also: mfix

Pre-release

mergeIterateWith :: forall (m :: Type -> Type) a. (StreamK m a -> StreamK m a -> StreamK m a) -> (a -> StreamK m a) -> StreamK m a -> StreamK m a Source #

Like concatIterateWith but uses the pairwise flattening combinator mergeMapWith for flattening the resulting streams. This can be used for a balanced traversal of a tree like structure.

Example, list a directory tree using balanced traversal:

>>> f = StreamK.fromStream . either (Dir.readEitherPaths id) (const Stream.nil)
>>> input = StreamK.fromEffect (Left <$> Path.fromString ".")
>>> ls = StreamK.mergeIterateWith StreamK.interleave f input

Pre-release

type CrossStreamK = Nested Source #

Deprecated: Use Nested instead.

bindWith :: forall (m :: Type -> Type) b a. (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #

Deprecated: Please use concatForWith instead.

Transformer

evalStateT :: Monad m => m s -> StreamK (StateT s m) a -> StreamK m a Source #

liftInner :: forall (m :: Type -> Type) (t :: (Type -> Type) -> Type -> Type) a. (Monad m, MonadTrans t, Monad (t m)) => StreamK m a -> StreamK (t m) a Source #

localReaderT :: forall r (m :: Type -> Type) a. (r -> r) -> StreamK (ReaderT r m) a -> StreamK (ReaderT r m) a Source #

Modify the environment of the underlying ReaderT monad.

foldlT :: forall (m :: Type -> Type) s b a. (Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> StreamK m a -> s m b Source #

Lazy left fold to an arbitrary transformer monad.

foldrT :: forall (m :: Type -> Type) s a b. (Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> StreamK m a -> s m b Source #

Right associative fold to an arbitrary transformer monad.

From containers

fromStream :: forall (m :: Type -> Type) a. Monad m => Stream m a -> StreamK m a Source #

Convert a fused Stream to StreamK.

For example:

>>> s1 = StreamK.fromStream $ Stream.fromList [1,2]
>>> s2 = StreamK.fromStream $ Stream.fromList [3,4]
>>> Stream.fold Fold.toList $ StreamK.toStream $ s1 `StreamK.append` s2
[1,2,3,4]

Specialized Generation

repeatM :: Monad m => m a -> StreamK m a Source #

>>> repeatM = StreamK.sequence . StreamK.repeat
>>> repeatM = fix . StreamK.consM
>>> repeatM = cycle1 . StreamK.fromEffect

Generate a stream by repeatedly executing a monadic action forever.

>>> :{
repeatAction =
       StreamK.repeatM (threadDelay 1000000 >> print 1)
     & StreamK.take 10
     & StreamK.fold Fold.drain
:}

replicate :: forall a (m :: Type -> Type). Int -> a -> StreamK m a Source #

replicateM :: Monad m => Int -> m a -> StreamK m a Source #

fromIndices :: forall a (m :: Type -> Type). (Int -> a) -> StreamK m a Source #

fromIndicesM :: Monad m => (Int -> m a) -> StreamK m a Source #

iterate :: forall a (m :: Type -> Type). (a -> a) -> a -> StreamK m a Source #

>>> iterate f x = x `StreamK.cons` iterate f (f x)

Generate an infinite stream with x as the first element and each successive element derived by applying the function f on the previous element.

>>> StreamK.toList $ StreamK.take 5 $ StreamK.iterate (+1) 1
[1,2,3,4,5]

iterateM :: Monad m => (a -> m a) -> m a -> StreamK m a Source #

>>> iterateM f m = m >>= \a -> return a `StreamK.consM` iterateM f (f a)

Generate an infinite stream with the first element generated by the action m and each successive element derived by applying the monadic function f on the previous element.

>>> :{
StreamK.iterateM (\x -> print x >> return (x + 1)) (return 0)
    & StreamK.take 3
    & StreamK.toList
:}
0
1
[0,1,2]

Elimination

General Folds

foldr1 :: Monad m => (a -> a -> a) -> StreamK m a -> m (Maybe a) Source #

fold :: Monad m => Fold m a b -> StreamK m a -> m b Source #

Fold a stream using the supplied left Fold and reducing the resulting expression strictly at each step. The behavior is similar to foldl'. A Fold can terminate early without consuming the full stream. See the documentation of individual Folds for termination behavior.

Definitions:

>>> fold f = fmap fst . StreamK.foldBreak f
>>> fold f = StreamK.parseD (Parser.fromFold f)

Example:

>>> StreamK.fold Fold.sum $ StreamK.fromStream $ Stream.enumerateFromTo 1 100
5050

foldBreak :: Monad m => Fold m a b -> StreamK m a -> m (b, StreamK m a) Source #

Like fold but also returns the remaining stream. The resulting stream would be nil if the stream finished before the fold.

foldEither :: Monad m => Fold m a b -> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a)) Source #

Fold resulting in either breaking the stream or continuation of the fold. Instead of supplying the input stream in one go we can run the fold multiple times, each time supplying the next segment of the input stream. If the fold has not yet finished it returns a fold that can be run again otherwise it returns the fold result and the residual stream.

Internal

foldConcat :: Monad m => Producer m a b -> Fold m b c -> StreamK m a -> m (c, StreamK m a) Source #

Generate streams from individual elements of a stream and fold the concatenation of those streams using the supplied fold. Return the result of the fold and residual stream.

For example, this can be used to efficiently fold an Array Word8 stream using Word8 folds.

Internal

toParserK :: forall (m :: Type -> Type) a b. Monad m => Parser a m b -> ParserK a m b Source #

Convert a Parser to ParserK.

Pre-release

parseDBreak :: Monad m => Parser a m b -> StreamK m a -> m (Either ParseErrorPos b, StreamK m a) Source #

Run a Parser over a stream and return the parse result along with the rest of the stream.

parseD :: Monad m => Parser a m b -> StreamK m a -> m (Either ParseErrorPos b) Source #

parseBreak :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b, StreamK m a) Source #

Similar to parseBreakChunks but works on singular elements.

parseBreakPos :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseErrorPos b, StreamK m a) Source #

Like parseBreak but includes stream position information in the error messages.

parse :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b) Source #

Run a ParserK over a StreamK. Please use parseChunks where possible, for better performance.

parsePos :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseErrorPos b) Source #

Like parse but includes stream position information in the error messages.

Specialized Folds

head :: Monad m => StreamK m a -> m (Maybe a) Source #

elem :: (Monad m, Eq a) => a -> StreamK m a -> m Bool Source #

notElem :: (Monad m, Eq a) => a -> StreamK m a -> m Bool Source #

all :: Monad m => (a -> Bool) -> StreamK m a -> m Bool Source #

any :: Monad m => (a -> Bool) -> StreamK m a -> m Bool Source #

last :: Monad m => StreamK m a -> m (Maybe a) Source #

Extract the last element of the stream, if any.

minimum :: (Monad m, Ord a) => StreamK m a -> m (Maybe a) Source #

minimumBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> m (Maybe a) Source #

maximum :: (Monad m, Ord a) => StreamK m a -> m (Maybe a) Source #

maximumBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> m (Maybe a) Source #

findIndices :: forall a (m :: Type -> Type). (a -> Bool) -> StreamK m a -> StreamK m Int Source #

lookup :: (Monad m, Eq a) => a -> StreamK m (a, b) -> m (Maybe b) Source #

findM :: Monad m => (a -> m Bool) -> StreamK m a -> m (Maybe a) Source #

find :: Monad m => (a -> Bool) -> StreamK m a -> m (Maybe a) Source #

(!!) :: Monad m => StreamK m a -> Int -> m (Maybe a) Source #

To Containers

toList :: Monad m => StreamK m a -> m [a] Source #

toStream :: forall (m :: Type -> Type) a. Applicative m => StreamK m a -> Stream m a Source #

Convert a StreamK to a fused Stream.

Map and Fold

mapM_ :: Monad m => (a -> m b) -> StreamK m a -> m () Source #

Apply a monadic action to each element of the stream and discard the output of the action.

Transformation

By folding (scans)

scanl' :: forall b a (m :: Type -> Type). (b -> a -> b) -> b -> StreamK m a -> StreamK m b Source #

scanlx' :: forall x a b (m :: Type -> Type). (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> StreamK m b Source #

Filtering

filter :: forall a (m :: Type -> Type). (a -> Bool) -> StreamK m a -> StreamK m a Source #

take :: forall (m :: Type -> Type) a. Int -> StreamK m a -> StreamK m a Source #

takeWhile :: forall a (m :: Type -> Type). (a -> Bool) -> StreamK m a -> StreamK m a Source #

drop :: forall (m :: Type -> Type) a. Int -> StreamK m a -> StreamK m a Source #

dropWhile :: forall a (m :: Type -> Type). (a -> Bool) -> StreamK m a -> StreamK m a Source #

Mapping

mapM :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b Source #

sequence :: Monad m => StreamK m (m a) -> StreamK m a Source #

Inserting

intersperseM :: Monad m => m a -> StreamK m a -> StreamK m a Source #

intersperse :: forall (m :: Type -> Type) a. Monad m => a -> StreamK m a -> StreamK m a Source #

insertBy :: forall a (m :: Type -> Type). (a -> a -> Ordering) -> a -> StreamK m a -> StreamK m a Source #

Deleting

deleteBy :: forall a (m :: Type -> Type). (a -> a -> Bool) -> a -> StreamK m a -> StreamK m a Source #

Reordering

sortBy :: forall (m :: Type -> Type) a. Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a Source #

Sort the input stream using a supplied comparison function.

Sorting can be achieved by simply:

>>> sortBy cmp = StreamK.mergeMapWith (StreamK.mergeBy cmp) StreamK.fromPure

However, this combinator uses a parser to first split the input stream into down and up sorted segments and then merges them to optimize sorting when pre-sorted sequences exist in the input stream.

O(n) space
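The simple merge-based definition above can be modelled on plain lists. The sketch uses only base and is not the streamly implementation; mergeByList, mergeMapWithList and sortByList are hypothetical list analogues, and the pairwise combining strategy is an assumption about how mergeMapWith flattens.

```haskell
-- Hypothetical list models (not streamly code).

-- Merge two lists that are already sorted with respect to cmp.
mergeByList :: (a -> a -> Ordering) -> [a] -> [a] -> [a]
mergeByList _ [] ys = ys
mergeByList _ xs [] = xs
mergeByList cmp (x : xs) (y : ys) =
    case cmp x y of
        GT -> y : mergeByList cmp (x : xs) ys
        _  -> x : mergeByList cmp xs (y : ys)

-- Map each element to a list, then combine the lists pairwise, level by
-- level, like a balanced binary tree.
mergeMapWithList :: ([b] -> [b] -> [b]) -> (a -> [b]) -> [a] -> [b]
mergeMapWithList combine f = go . map f
  where
    go [] = []
    go [xs] = xs
    go xss = go (pairUp xss)

    pairUp (a : b : rest) = combine a b : pairUp rest
    pairUp rest = rest

-- Sort by merging singleton lists pairwise.
sortByList :: (a -> a -> Ordering) -> [a] -> [a]
sortByList cmp = mergeMapWithList (mergeByList cmp) (: [])
```

In this model, sortByList compare [3,1,2,5,4] evaluates to [1,2,3,4,5]. Combining pairwise keeps the merge tree balanced, so each element takes part in about log n merges.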

sortOn :: forall (m :: Type -> Type) b a. (Monad m, Ord b) => (a -> b) -> StreamK m a -> StreamK m a Source #

Map and Filter

mapMaybe :: forall a b (m :: Type -> Type). (a -> Maybe b) -> StreamK m a -> StreamK m b Source #

Zipping

zipWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #

Zipping of n streams can be performed by combining the streams pairwise using mergeMapWith with O(n * log n) time complexity. If used with concatMapWith it will have O(n^2) performance.

zipWithM :: Monad m => (a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c Source #

Merging

mergeBy :: forall a (m :: Type -> Type). (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a Source #

Merging of n streams can be performed by combining the streams pairwise using mergeMapWith to give O(n * log n) time complexity. If used with concatMapWith it will have O(n^2) performance.

mergeByM :: Monad m => (a -> a -> m Ordering) -> StreamK m a -> StreamK m a -> StreamK m a Source #

Transformation comprehensions

the :: (Eq a, Monad m) => StreamK m a -> m (Maybe a) Source #

Transforming Inner Monad

morphInner :: (Monad m, Monad n) => (forall x. m x -> n x) -> StreamK m a -> StreamK n a Source #

Exceptions

handle :: (MonadCatch m, Exception e) => (e -> m (StreamK m a)) -> StreamK m a -> StreamK m a Source #

Like Streamly.Data.Stream.handle but with one significant difference: this function observes exceptions from the consumer of the stream as well.

You can also convert StreamK to Stream and use exception handling from Stream module:

>>> handle f s = StreamK.fromStream $ Stream.handle (\e -> StreamK.toStream (f e)) (StreamK.toStream s)

Resource Management

bracketIO :: forall (m :: Type -> Type) b c a. (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> StreamK m a) -> StreamK m a Source #

Like Streamly.Data.Stream.bracketIO but with one significant difference: this function observes exceptions from the consumer of the stream as well. Therefore, it cleans up the resource promptly when the consumer encounters an exception.

You can also convert StreamK to Stream and use resource handling from Stream module:

>>> bracketIO bef aft bet = StreamK.fromStream $ Stream.bracketIO bef aft (StreamK.toStream . bet)

Deprecated

hoist :: (Monad m, Monad n) => (forall x. m x -> n x) -> StreamK m a -> StreamK n a Source #

Deprecated: Please use morphInner instead.

parseBreakChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a)) Source #

Deprecated: Use Streamly.Data.Array.parseBreak instead

Run a ParserK over a chunked StreamK and return the parse result and the remaining Stream.

parseChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b) Source #

Deprecated: Use Streamly.Data.Array.parse instead

parseBreakChunksGeneric :: Monad m => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a)) Source #

Deprecated: Use Streamly.Data.Array.Generic.parseBreak

Similar to parseBreak but works on generic arrays.

parseChunksGeneric :: Monad m => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b) Source #

Deprecated: Use Streamly.Data.Array.Generic.parse