Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | released |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Streamly.Data.Stream
Description
The Stream
type represents a producer of a sequence of values. Its dual,
Fold
, represents a consumer. While both types support
similar transformations, the key difference is that only Stream
can
compose multiple producers, and only Fold
can compose multiple consumers.
Console Echo Example
To get you started, here is an example of a program which reads lines from console and writes them back to the console.
>>>
import Data.Function ((&))
>>>
:{
echo = Stream.repeatM getLine -- Stream IO String & Stream.mapM putStrLn -- Stream IO () & Stream.fold Fold.drain -- IO () :}
This is a simple example of a declarative representation of an imperative
loop using streaming combinators.
In this example, repeatM
generates an infinite stream of String
s by
repeatedly performing the getLine
IO action. mapM
then applies
putStrLn
on each element in the stream converting it to stream of ()
.
Finally, drain
fold
s the stream to IO discarding the
() values, thus producing only effects.
This gives you an idea about how we can program declaratively by
representing loops using streams. Compare this declarative loopless approach
with an imperative approach using a while
loop for writing the same
program. In this module, you can find all Data.List-like functions and
many more powerful combinators to perform common programming tasks.
Static Stream Fusion
The Stream
type represents streams as state machines. When composed
statically, these state machines fuse together at compile time, eliminating
intermediate data structures and function calls. This results in the
generation of tight, efficient loops comparable to those written in
low-level languages like C. For instance, in the earlier example, operations
like repeatM
and mapM
are written as separate fragments but fuse into a
single, optimized loop.
The primary goal of the Stream
type is to build highly efficient streams
via compile-time fusion of modular loop fragments. However, this technique
comes with trade-offs and should be used with care. Stream construction
operations such as cons
, append
, interleave
, mergeBy
, and zipWith
work extremely well at a small scale. But at a large scale, their
performance degrades due to O(n^2) complexity, where n
is the number of
compositions.
Therefore, it's best to generate a fused stream in one go, if possible.
While using a small number of composition operations is absolutely fine,
avoid using large number of composition operations. For example, do not try
to construct a fused Stream
by using cons
rescursively. However, you can
use cons
and any other construction operations on
the CPS StreamK
type without any problem. The CPS construction operations
have linear (O(n)) performance characteristics and scale much better, though
they are not as efficient as fused streams due to function call overhead at
each step.
When used correctly, the fused Stream
type can be 10x to 100x faster
than CPS-based streams, depending on the use case.
Rule of Thumb: Use the fused Stream
type when the number of
compositions is small and they are static or compile-time. Use the CPS-based
StreamK
type when the number of compositions is large or potentially
infinite, and they are dynamic or composed at runtime. Both types are fully
interconvertible, allowing you to choose the best tool for each part of your
pipeline.
Better and Effectful Lists
This module offers operations analogous to standard Haskell lists from the
base
package. Streams can be viewed as a generalization of lists —
providing all the functionality of standard lists, plus additional
capabilities such as effectful operations and improved performance through
stream fusion. They can easily replace lists in most contexts, and go
beyond where lists fall short.
For instance, a common limitation of lists is the inability to perform IO actions (e.g., printing) at arbitrary points during processing. Streams naturally support such effectful operations.
As discussed in the fusion section above, while the Stream
type is not
consable and appendable at scale, the StreamK
type is consable and
appendable at scale.
Non-determinism and List Transformers
Streamly does not provide a ListT
like Monad instance but it provides all
the equivalent functionality and more. We do not provide a Monad instance
for streams, as there are many possible ways to define the bind operation.
Instead, we offer bind-style operations such as concatFor
, concatForM
,
and their variants (e.g. fair interleaving and breadth-first nesting). These
can be used for convenient ListT-style stream composition. Additionally, we
provide applicative-style cross product operations like cross
and its
variants which are many times faster than the monad style operations.
Logic Programming
Streamly does not provide a LogicT
-style Monad instance, but it offers all
the equivalent functionality—and more. Operations like fairCross
and
fairConcatFor
nest outer and inner streams fairly, ensuring that no stream
is starved when exploring cross products.
This enables balanced exploration across all dimensions in backtracking
problems, while also supporting infinite streams. It effectively replaces the
core functionality of LogicT
from the logict
package, with significantly
better performance. In particular, it avoids the quadratic slowdown seen with
observeMany
, and the applicative fairCross
runs many times faster,
achieving loop nesting performance comparable to C.
Synopsis
- data Stream (m :: Type -> Type) a
- nil :: forall (m :: Type -> Type) a. Applicative m => Stream m a
- nilM :: Applicative m => m b -> Stream m a
- cons :: forall (m :: Type -> Type) a. Applicative m => a -> Stream m a -> Stream m a
- consM :: Applicative m => m a -> Stream m a -> Stream m a
- unfoldr :: forall (m :: Type -> Type) s a. Monad m => (s -> Maybe (a, s)) -> s -> Stream m a
- unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a
- fromPure :: forall (m :: Type -> Type) a. Applicative m => a -> Stream m a
- fromEffect :: Applicative m => m a -> Stream m a
- iterate :: forall (m :: Type -> Type) a. Monad m => (a -> a) -> a -> Stream m a
- iterateM :: Monad m => (a -> m a) -> m a -> Stream m a
- repeat :: forall (m :: Type -> Type) a. Monad m => a -> Stream m a
- repeatM :: Monad m => m a -> Stream m a
- replicate :: forall (m :: Type -> Type) a. Monad m => Int -> a -> Stream m a
- replicateM :: Monad m => Int -> m a -> Stream m a
- class Enum a => Enumerable a where
- enumerateFrom :: forall (m :: Type -> Type). Monad m => a -> Stream m a
- enumerateFromTo :: forall (m :: Type -> Type). Monad m => a -> a -> Stream m a
- enumerateFromThen :: forall (m :: Type -> Type). Monad m => a -> a -> Stream m a
- enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => a -> a -> a -> Stream m a
- enumerate :: forall (m :: Type -> Type) a. (Monad m, Bounded a, Enumerable a) => Stream m a
- enumerateTo :: forall (m :: Type -> Type) a. (Monad m, Bounded a, Enumerable a) => a -> Stream m a
- fromList :: forall (m :: Type -> Type) a. Applicative m => [a] -> Stream m a
- unfold :: forall (m :: Type -> Type) a b. Applicative m => Unfold m a b -> a -> Stream m b
- uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a))
- fold :: Monad m => Fold m a b -> Stream m a -> m b
- foldBreak :: Monad m => Fold m a b -> Stream m a -> m (b, Stream m a)
- parse :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b)
- parseBreak :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b, Stream m a)
- parsePos :: Monad m => Parser a m b -> Stream m a -> m (Either ParseErrorPos b)
- parseBreakPos :: Monad m => Parser a m b -> Stream m a -> m (Either ParseErrorPos b, Stream m a)
- foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b
- foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b
- toList :: Monad m => Stream m a -> m [a]
- sequence :: Monad m => Stream m (m a) -> Stream m a
- mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b
- trace :: Monad m => (a -> m b) -> Stream m a -> Stream m a
- tap :: forall (m :: Type -> Type) a b. Monad m => Fold m a b -> Stream m a -> Stream m a
- delay :: forall (m :: Type -> Type) a. MonadIO m => Double -> Stream m a -> Stream m a
- scanl :: forall (m :: Type -> Type) a b. Monad m => Scanl m a b -> Stream m a -> Stream m b
- postscanl :: forall (m :: Type -> Type) a b. Monad m => Scanl m a b -> Stream m a -> Stream m b
- indexed :: forall (m :: Type -> Type) a. Monad m => Stream m a -> Stream m (Int, a)
- insertBy :: forall (m :: Type -> Type) a. Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a
- intersperseM :: Monad m => m a -> Stream m a -> Stream m a
- intersperseM_ :: Monad m => m b -> Stream m a -> Stream m a
- intersperse :: forall (m :: Type -> Type) a. Monad m => a -> Stream m a -> Stream m a
- mapMaybe :: forall (m :: Type -> Type) a b. Monad m => (a -> Maybe b) -> Stream m a -> Stream m b
- mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b
- filter :: forall (m :: Type -> Type) a. Monad m => (a -> Bool) -> Stream m a -> Stream m a
- filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- catMaybes :: forall (m :: Type -> Type) a. Monad m => Stream m (Maybe a) -> Stream m a
- catLefts :: forall (m :: Type -> Type) a b. Monad m => Stream m (Either a b) -> Stream m a
- catRights :: forall (m :: Type -> Type) a b. Monad m => Stream m (Either a b) -> Stream m b
- catEithers :: forall (m :: Type -> Type) a. Monad m => Stream m (Either a a) -> Stream m a
- take :: forall (m :: Type -> Type) a. Applicative m => Int -> Stream m a -> Stream m a
- takeWhile :: forall (m :: Type -> Type) a. Monad m => (a -> Bool) -> Stream m a -> Stream m a
- takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- drop :: forall (m :: Type -> Type) a. Monad m => Int -> Stream m a -> Stream m a
- dropWhile :: forall (m :: Type -> Type) a. Monad m => (a -> Bool) -> Stream m a -> Stream m a
- dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- append :: forall (m :: Type -> Type) a. Monad m => Stream m a -> Stream m a -> Stream m a
- interleave :: forall (m :: Type -> Type) a. Monad m => Stream m a -> Stream m a -> Stream m a
- mergeBy :: forall (m :: Type -> Type) a. Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
- mergeByM :: Monad m => (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
- zipWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
- zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
- crossWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
- cross :: forall (m :: Type -> Type) a b. Monad m => Stream m a -> Stream m b -> Stream m (a, b)
- fairCross :: forall (m :: Type -> Type) a b. Monad m => Stream m a -> Stream m b -> Stream m (a, b)
- unfoldEach :: forall (m :: Type -> Type) a b. Monad m => Unfold m a b -> Stream m a -> Stream m b
- bfsUnfoldEach :: forall (m :: Type -> Type) a b. Monad m => Unfold m a b -> Stream m a -> Stream m b
- fairUnfoldEach :: forall (m :: Type -> Type) a b. Monad m => Unfold m a b -> Stream m a -> Stream m b
- unfoldEachSepBySeq :: forall (m :: Type -> Type) b c. Monad m => b -> Unfold m b c -> Stream m b -> Stream m c
- unfoldEachEndBySeq :: forall (m :: Type -> Type) b c. Monad m => b -> Unfold m b c -> Stream m b -> Stream m c
- concatEffect :: Monad m => m (Stream m a) -> Stream m a
- concatMap :: forall (m :: Type -> Type) a b. Monad m => (a -> Stream m b) -> Stream m a -> Stream m b
- concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b
- fairConcatMap :: forall (m :: Type -> Type) a b. Monad m => (a -> Stream m b) -> Stream m a -> Stream m b
- concatFor :: forall (m :: Type -> Type) a b. Monad m => Stream m a -> (a -> Stream m b) -> Stream m b
- fairConcatFor :: forall (m :: Type -> Type) a b. Monad m => Stream m a -> (a -> Stream m b) -> Stream m b
- concatForM :: Monad m => Stream m a -> (a -> m (Stream m b)) -> Stream m b
- fairConcatForM :: Monad m => Stream m a -> (a -> m (Stream m b)) -> Stream m b
- foldMany :: forall (m :: Type -> Type) a b. Monad m => Fold m a b -> Stream m a -> Stream m b
- groupsOf :: forall (m :: Type -> Type) a b. Monad m => Int -> Fold m a b -> Stream m a -> Stream m b
- parseMany :: forall (m :: Type -> Type) a b. Monad m => Parser a m b -> Stream m a -> Stream m (Either ParseError b)
- splitSepBy_ :: forall (m :: Type -> Type) a b. Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
- splitSepBySeq_ :: forall (m :: Type -> Type) a b. (MonadIO m, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b
- splitEndBySeq :: forall (m :: Type -> Type) a b. (MonadIO m, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b
- splitEndBySeq_ :: forall (m :: Type -> Type) a b. (MonadIO m, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b
- wordsBy :: forall (m :: Type -> Type) a b. Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
- reverse :: forall (m :: Type -> Type) a. Monad m => Stream m a -> Stream m a
- unionBy :: forall (m :: Type -> Type) a. MonadIO m => (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
- eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool
- cmpBy :: Monad m => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering
- isPrefixOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool
- isInfixOf :: (MonadIO m, Eq a, Enum a, Unbox a) => Stream m a -> Stream m a -> m Bool
- isSubsequenceOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool
- stripPrefix :: (Monad m, Eq a) => Stream m a -> Stream m a -> m (Maybe (Stream m a))
- onException :: MonadCatch m => m b -> Stream m a -> Stream m a
- handle :: (MonadCatch m, Exception e) => (e -> m (Stream m a)) -> Stream m a -> Stream m a
- before :: Monad m => m b -> Stream m a -> Stream m a
- afterIO :: forall (m :: Type -> Type) b a. MonadIO m => IO b -> Stream m a -> Stream m a
- finallyIO :: forall (m :: Type -> Type) b a. (MonadIO m, MonadCatch m) => IO b -> Stream m a -> Stream m a
- finallyIO' :: forall (m :: Type -> Type) b a. MonadIO m => AcquireIO -> IO b -> Stream m a -> Stream m a
- finallyIO'' :: forall (m :: Type -> Type) b a. (MonadIO m, MonadCatch m) => AcquireIO -> IO b -> Stream m a -> Stream m a
- bracketIO :: forall (m :: Type -> Type) b c a. (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
- bracketIO' :: forall (m :: Type -> Type) b c a. MonadIO m => AcquireIO -> IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
- bracketIO'' :: forall (m :: Type -> Type) b c a. (MonadIO m, MonadCatch m) => AcquireIO -> IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
- bracketIO3 :: forall (m :: Type -> Type) b c d e a. (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> IO d) -> (b -> IO e) -> (b -> Stream m a) -> Stream m a
- morphInner :: Monad n => (forall x. m x -> n x) -> Stream m a -> Stream n a
- liftInner :: forall (m :: Type -> Type) (t :: (Type -> Type) -> Type -> Type) a. (Monad m, MonadTrans t, Monad (t m)) => Stream m a -> Stream (t m) a
- runReaderT :: Monad m => m s -> Stream (ReaderT s m) a -> Stream m a
- runStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m (s, a)
- scan :: forall (m :: Type -> Type) a b. Monad m => Fold m a b -> Stream m a -> Stream m b
- scanMaybe :: forall (m :: Type -> Type) a b. Monad m => Fold m a (Maybe b) -> Stream m a -> Stream m b
- postscan :: forall (m :: Type -> Type) a b. Monad m => Fold m a b -> Stream m a -> Stream m b
- splitOn :: forall (m :: Type -> Type) a b. Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
- unfoldMany :: forall (m :: Type -> Type) a b. Monad m => Unfold m a b -> Stream m a -> Stream m b
- intercalate :: forall (m :: Type -> Type) b c. Monad m => Unfold m b c -> b -> Stream m b -> Stream m c
- intercalateSuffix :: forall (m :: Type -> Type) b c. Monad m => Unfold m b c -> b -> Stream m b -> Stream m c
- chunksOf :: forall (m :: Type -> Type) a. (MonadIO m, Unbox a) => Int -> Stream m a -> Stream m (Array a)
Setup
To execute the code examples provided in this module in ghci, please run the following commands first.
>>>
:m
>>>
import Control.Concurrent (threadDelay)
>>>
import Control.Monad (void, when)
>>>
import Control.Monad.IO.Class (MonadIO (liftIO))
>>>
import Control.Monad.Trans.Class (lift)
>>>
import Control.Monad.Trans.Identity (runIdentityT)
>>>
import Data.Char (isSpace)
>>>
import Data.Either (fromLeft, fromRight, isLeft, isRight, either)
>>>
import Data.Maybe (fromJust, isJust)
>>>
import Data.Function (fix, (&))
>>>
import Data.Functor.Identity (runIdentity)
>>>
import Data.IORef
>>>
import Data.Semigroup (cycle1)
>>>
import Data.Word (Word8, Word16)
>>>
import GHC.Exts (Ptr (Ptr))
>>>
import System.IO (stdout, hClose, hSetBuffering, openFile, BufferMode(LineBuffering), IOMode(..))
>>>
hSetBuffering stdout LineBuffering
>>>
effect n = print n >> return n
>>>
import Streamly.Data.Stream (Stream)
>>>
import qualified Streamly.Data.Array as Array
>>>
import qualified Streamly.Data.Fold as Fold
>>>
import qualified Streamly.Data.Scanl as Scanl
>>>
import qualified Streamly.Data.Stream as Stream
>>>
import qualified Streamly.Data.StreamK as StreamK
>>>
import qualified Streamly.Data.Unfold as Unfold
>>>
import qualified Streamly.Data.Parser as Parser
>>>
import qualified Streamly.FileSystem.DirIO as Dir
For APIs that have not been released yet.
>>>
import qualified Streamly.Internal.Control.Exception as Exception
>>>
import qualified Streamly.Internal.FileSystem.Path as Path
>>>
import qualified Streamly.Internal.Data.Scanr as Scanr
>>>
import qualified Streamly.Internal.Data.Scanl as Scanl
>>>
import qualified Streamly.Internal.Data.Fold as Fold
>>>
import qualified Streamly.Internal.Data.Parser as Parser
>>>
import qualified Streamly.Internal.Data.Stream as Stream
>>>
import qualified Streamly.Internal.Data.StreamK as StreamK
>>>
import qualified Streamly.Internal.Data.Unfold as Unfold
>>>
import qualified Streamly.Internal.FileSystem.DirIO as Dir
Overview
The Stream Type
data Stream (m :: Type -> Type) a Source #
A stream consists of a step function that generates the next step given a current state, and the current state.
Instances
Construction
Functions ending in the general shape b -> Stream m a
.
Useful Idioms:
>>>
fromIndices f = fmap f $ Stream.enumerateFrom 0
>>>
fromIndicesM f = Stream.mapM f $ Stream.enumerateFrom 0
>>>
fromListM = Stream.sequence . Stream.fromList
>>>
fromFoldable = StreamK.toStream . StreamK.fromFoldable
>>>
fromFoldableM = Stream.sequence . fromFoldable
Primitives
These primitives are meant to statically fuse a small number of stream
elements. The Stream
type is never constructed at large scale using
these primitives. Use StreamK
if you need to construct a stream from
primitives.
nil :: forall (m :: Type -> Type) a. Applicative m => Stream m a Source #
A stream that terminates without producing any output or side effect.
>>>
Stream.toList Stream.nil
[]
nilM :: Applicative m => m b -> Stream m a Source #
A stream that terminates without producing any output, but produces a side effect.
>>>
nilM action = Stream.before action Stream.nil
>>>
Stream.fold Fold.toList (Stream.nilM (print "nil"))
"nil" []
Pre-release
cons :: forall (m :: Type -> Type) a. Applicative m => a -> Stream m a -> Stream m a infixr 5 Source #
WARNING! O(n^2) time complexity wrt number of elements. Use the O(n)
complexity StreamK.cons
unless you want to
statically fuse just a few elements.
Fuse a pure value at the head of an existing stream::
>>>
s = 1 `Stream.cons` Stream.fromList [2,3]
>>>
Stream.toList s
[1,2,3]
Definition:
>>>
cons x xs = return x `Stream.consM` xs
consM :: Applicative m => m a -> Stream m a -> Stream m a infixr 5 Source #
Like cons
but fuses an effect instead of a pure value.
Unfolding
unfoldrM
is the most general way of generating a stream efficiently.
All other generation operations can be expressed using it.
unfoldr :: forall (m :: Type -> Type) s a. Monad m => (s -> Maybe (a, s)) -> s -> Stream m a Source #
Build a stream by unfolding a pure step function step
starting from a
seed s
. The step function returns the next element in the stream and the
next seed value. When it is done it returns Nothing
and the stream ends.
For example,
>>>
:{
let f b = if b > 2 then Nothing else Just (b, b + 1) in Stream.toList $ Stream.unfoldr f 0 :} [0,1,2]
unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a Source #
Build a stream by unfolding a monadic step function starting from a
seed. The step function returns the next element in the stream and the next
seed value. When it is done it returns Nothing
and the stream ends. For
example,
>>>
:{
let f b = if b > 2 then return Nothing else return (Just (b, b + 1)) in Stream.toList $ Stream.unfoldrM f 0 :} [0,1,2]
Singleton
fromPure :: forall (m :: Type -> Type) a. Applicative m => a -> Stream m a Source #
Create a singleton stream from a pure value.
>>>
fromPure a = a `Stream.cons` Stream.nil
>>>
fromPure = pure
>>>
fromPure = Stream.fromEffect . pure
fromEffect :: Applicative m => m a -> Stream m a Source #
Create a singleton stream from a monadic action.
>>>
fromEffect m = m `Stream.consM` Stream.nil
>>>
fromEffect = Stream.sequence . Stream.fromPure
>>>
Stream.fold Fold.drain $ Stream.fromEffect (putStrLn "hello")
hello
Iteration
Generate a monadic stream from a seed value or values.
iterate :: forall (m :: Type -> Type) a. Monad m => (a -> a) -> a -> Stream m a Source #
Generate an infinite stream with x
as the first element and each
successive element derived by applying the function f
on the previous
element.
>>>
Stream.toList $ Stream.take 5 $ Stream.iterate (+1) 1
[1,2,3,4,5]
iterateM :: Monad m => (a -> m a) -> m a -> Stream m a Source #
Generate an infinite stream with the first element generated by the action
m
and each successive element derived by applying the monadic function f
on the previous element.
>>>
:{
Stream.iterateM (\x -> print x >> return (x + 1)) (return 0) & Stream.take 3 & Stream.toList :} 0 1 [0,1,2]
repeat :: forall (m :: Type -> Type) a. Monad m => a -> Stream m a Source #
Generate an infinite stream by repeating a pure value.
>>>
repeat = Stream.iterate id
>>>
repeat x = Stream.repeatM (pure x)
repeatM :: Monad m => m a -> Stream m a Source #
>>>
repeatM act = Stream.iterateM (const act) act
>>>
repeatM = Stream.sequence . Stream.repeat
Generate a stream by repeatedly executing a monadic action forever.
>>>
:{
repeatAction = Stream.repeatM (threadDelay 1000000 >> print 1) & Stream.take 10 & Stream.fold Fold.drain :}
replicate :: forall (m :: Type -> Type) a. Monad m => Int -> a -> Stream m a Source #
>>>
replicate n = Stream.take n . Stream.repeat
>>>
replicate n x = Stream.replicateM n (pure x)
Generate a stream of length n
by repeating a value n
times.
replicateM :: Monad m => Int -> m a -> Stream m a Source #
>>>
replicateM n = Stream.sequence . Stream.replicate n
Generate a stream by performing a monadic action n
times.
Enumeration
Enumerable
type class is to streams as Enum
is to lists. Enum
provides functions to generate a list, Enumerable provides similar
functions to generate a stream instead.
It is much more efficient to use Enumerable
directly than enumerating
to a list and converting it to stream. The following works but is not
particularly efficient:
>>>
f from next = Stream.fromList $ Prelude.enumFromThen from next
Note: For lists, using enumeration functions e.g. enumFromThen
turns out to be slightly faster than the idioms like [from, then..]
.
class Enum a => Enumerable a where Source #
Types that can be enumerated as a stream. The operations in this type
class are equivalent to those in the Enum
type class, except that these
generate a stream instead of a list. Use the functions in
Streamly.Internal.Data.Stream.Enumeration module to define new instances.
Methods
enumerateFrom :: forall (m :: Type -> Type). Monad m => a -> Stream m a Source #
enumerateFrom from
generates a stream starting with the element
from
, enumerating up to maxBound
when the type is Bounded
or
generating an infinite stream when the type is not Bounded
.
>>>
Stream.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int)
[0,1,2,3]
For Fractional
types, enumeration is numerically stable. However, no
overflow or underflow checks are performed.
>>>
Stream.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1
[1.1,2.1,3.1,4.1]
enumerateFromTo :: forall (m :: Type -> Type). Monad m => a -> a -> Stream m a Source #
Generate a finite stream starting with the element from
, enumerating
the type up to the value to
. If to
is smaller than from
then an
empty stream is returned.
>>>
Stream.toList $ Stream.enumerateFromTo 0 4
[0,1,2,3,4]
For Fractional
types, the last element is equal to the specified to
value after rounding to the nearest integral value.
>>>
Stream.toList $ Stream.enumerateFromTo 1.1 4
[1.1,2.1,3.1,4.1]
>>>
Stream.toList $ Stream.enumerateFromTo 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]
enumerateFromThen :: forall (m :: Type -> Type). Monad m => a -> a -> Stream m a Source #
enumerateFromThen from then
generates a stream whose first element
is from
, the second element is then
and the successive elements are
in increments of then - from
. Enumeration can occur downwards or
upwards depending on whether then
comes before or after from
. For
Bounded
types the stream ends when maxBound
is reached, for
unbounded types it keeps enumerating infinitely.
>>>
Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2
[0,2,4,6]
>>>
Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2)
[0,-2,-4,-6]
enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => a -> a -> a -> Stream m a Source #
enumerateFromThenTo from then to
generates a finite stream whose
first element is from
, the second element is then
and the successive
elements are in increments of then - from
up to to
. Enumeration can
occur downwards or upwards depending on whether then
comes before or
after from
.
>>>
Stream.toList $ Stream.enumerateFromThenTo 0 2 6
[0,2,4,6]
>>>
Stream.toList $ Stream.enumerateFromThenTo 0 (-2) (-6)
[0,-2,-4,-6]
Instances
Enumerable Int16 Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Int16 -> Stream m Int16 Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Int16 -> Int16 -> Stream m Int16 Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Int16 -> Int16 -> Stream m Int16 Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Int16 -> Int16 -> Int16 -> Stream m Int16 Source # | |
Enumerable Int32 Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Int32 -> Stream m Int32 Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Int32 -> Int32 -> Stream m Int32 Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Int32 -> Int32 -> Stream m Int32 Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Int32 -> Int32 -> Int32 -> Stream m Int32 Source # | |
Enumerable Int64 Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Int64 -> Stream m Int64 Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Int64 -> Int64 -> Stream m Int64 Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Int64 -> Int64 -> Stream m Int64 Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Int64 -> Int64 -> Int64 -> Stream m Int64 Source # | |
Enumerable Int8 Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Int8 -> Stream m Int8 Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Int8 -> Int8 -> Stream m Int8 Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Int8 -> Int8 -> Stream m Int8 Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Int8 -> Int8 -> Int8 -> Stream m Int8 Source # | |
Enumerable Word16 Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Word16 -> Stream m Word16 Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Word16 -> Word16 -> Stream m Word16 Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Word16 -> Word16 -> Stream m Word16 Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Word16 -> Word16 -> Word16 -> Stream m Word16 Source # | |
Enumerable Word32 Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Word32 -> Stream m Word32 Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Word32 -> Word32 -> Stream m Word32 Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Word32 -> Word32 -> Stream m Word32 Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Word32 -> Word32 -> Word32 -> Stream m Word32 Source # | |
Enumerable Word64 Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Word64 -> Stream m Word64 Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Word64 -> Word64 -> Stream m Word64 Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Word64 -> Word64 -> Stream m Word64 Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Word64 -> Word64 -> Word64 -> Stream m Word64 Source # | |
Enumerable Word8 Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Word8 -> Stream m Word8 Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Word8 -> Word8 -> Stream m Word8 Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Word8 -> Word8 -> Stream m Word8 Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Word8 -> Word8 -> Word8 -> Stream m Word8 Source # | |
Enumerable Ordering Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Ordering -> Stream m Ordering Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Ordering -> Ordering -> Stream m Ordering Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Ordering -> Ordering -> Stream m Ordering Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Ordering -> Ordering -> Ordering -> Stream m Ordering Source # | |
Enumerable Integer Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Integer -> Stream m Integer Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Integer -> Integer -> Stream m Integer Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Integer -> Integer -> Stream m Integer Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Integer -> Integer -> Integer -> Stream m Integer Source # | |
Enumerable Natural Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Natural -> Stream m Natural Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Natural -> Natural -> Stream m Natural Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Natural -> Natural -> Stream m Natural Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Natural -> Natural -> Natural -> Stream m Natural Source # | |
Enumerable () Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => () -> Stream m () Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => () -> () -> Stream m () Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => () -> () -> Stream m () Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => () -> () -> () -> Stream m () Source # | |
Enumerable Bool Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Bool -> Stream m Bool Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Bool -> Bool -> Stream m Bool Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Bool -> Bool -> Stream m Bool Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Bool -> Bool -> Bool -> Stream m Bool Source # | |
Enumerable Char Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Char -> Stream m Char Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Char -> Char -> Stream m Char Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Char -> Char -> Stream m Char Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Char -> Char -> Char -> Stream m Char Source # | |
Enumerable Double Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Double -> Stream m Double Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Double -> Double -> Stream m Double Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Double -> Double -> Stream m Double Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Double -> Double -> Double -> Stream m Double Source # | |
Enumerable Float Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Float -> Stream m Float Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Float -> Float -> Stream m Float Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Float -> Float -> Stream m Float Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Float -> Float -> Float -> Stream m Float Source # | |
Enumerable Int Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Int -> Stream m Int Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Int -> Int -> Stream m Int Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Int -> Int -> Stream m Int Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Int -> Int -> Int -> Stream m Int Source # | |
Enumerable Word Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Word -> Stream m Word Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Word -> Word -> Stream m Word Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Word -> Word -> Stream m Word Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Word -> Word -> Word -> Stream m Word Source # | |
Enumerable a => Enumerable (Identity a) Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Identity a -> Stream m (Identity a) Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Identity a -> Identity a -> Stream m (Identity a) Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Identity a -> Identity a -> Stream m (Identity a) Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Identity a -> Identity a -> Identity a -> Stream m (Identity a) Source # | |
Integral a => Enumerable (Ratio a) Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Ratio a -> Stream m (Ratio a) Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Ratio a -> Ratio a -> Stream m (Ratio a) Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Ratio a -> Ratio a -> Stream m (Ratio a) Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Ratio a -> Ratio a -> Ratio a -> Stream m (Ratio a) Source # | |
HasResolution a => Enumerable (Fixed a) Source # | |
Defined in Streamly.Internal.Data.Stream.Generate Methods enumerateFrom :: forall (m :: Type -> Type). Monad m => Fixed a -> Stream m (Fixed a) Source # enumerateFromTo :: forall (m :: Type -> Type). Monad m => Fixed a -> Fixed a -> Stream m (Fixed a) Source # enumerateFromThen :: forall (m :: Type -> Type). Monad m => Fixed a -> Fixed a -> Stream m (Fixed a) Source # enumerateFromThenTo :: forall (m :: Type -> Type). Monad m => Fixed a -> Fixed a -> Fixed a -> Stream m (Fixed a) Source # |
enumerate :: forall (m :: Type -> Type) a. (Monad m, Bounded a, Enumerable a) => Stream m a Source #
enumerateTo :: forall (m :: Type -> Type) a. (Monad m, Bounded a, Enumerable a) => a -> Stream m a Source #
From Containers
Convert an input structure, container or source into a stream. All of these can be expressed in terms of primitives.
fromList :: forall (m :: Type -> Type) a. Applicative m => [a] -> Stream m a Source #
Construct a stream from a list of pure values.
From Unfolds
Most of the above stream generation operations can also be expressed using the corresponding unfolds in the Streamly.Data.Unfold module.
unfold :: forall (m :: Type -> Type) a b. Applicative m => Unfold m a b -> a -> Stream m b Source #
Convert an Unfold
into a stream by supplying it an input seed.
>>>
s = Stream.unfold Unfold.replicateM (3, putStrLn "hello")
>>>
Stream.fold Fold.drain s
hello hello hello
Elimination
Functions ending in the general shape Stream m a -> m b
or Stream m
a -> m (b, Stream m a)
Primitives
uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a)) Source #
Decompose a stream into its head and tail. If the stream is empty, returns
Nothing
. If the stream is non-empty, returns Just (a, ma)
, where a
is
the head of the stream and ma
its tail.
Properties:
>>>
Nothing <- Stream.uncons Stream.nil
>>>
Just ("a", t) <- Stream.uncons (Stream.cons "a" Stream.nil)
This can be used to consume the stream in an imperative manner one element at a time, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.
All the folds in this module can be expressed in terms of uncons
, however,
this is generally less efficient than specific folds because it takes apart
the stream one element at a time, therefore, does not take adavantage of
stream fusion.
foldBreak
is a more general way of consuming a stream piecemeal.
>>>
:{
uncons xs = do r <- Stream.foldBreak Fold.one xs return $ case r of (Nothing, _) -> Nothing (Just h, t) -> Just (h, t) :}
Strict Left Folds
fold :: Monad m => Fold m a b -> Stream m a -> m b Source #
Fold a stream using the supplied left Fold
and reducing the resulting
expression strictly at each step. The behavior is similar to foldl'
. A
Fold
can terminate early without consuming the full stream. See the
documentation of individual Fold
s for termination behavior.
Definitions:
>>>
fold f = fmap fst . Stream.foldBreak f
>>>
fold f = Stream.parse (Parser.fromFold f)
Example:
>>>
Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050
Parsing
parse :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b) Source #
Parse a stream using the supplied Parser
.
Parsers (See Streamly.Internal.Data.Parser) are more powerful folds that add backtracking and error functionality to terminating folds. Unlike folds, parsers may not always result in a valid output, they may result in an error. For example:
>>>
Stream.parse (Parser.takeEQ 1 Fold.drain) Stream.nil
Left (ParseError "takeEQ: Expecting exactly 1 elements, input terminated on 0")
Note: parse p
is not the same as head . parseMany p
on an empty stream.
parseBreak :: Monad m => Parser a m b -> Stream m a -> m (Either ParseError b, Stream m a) Source #
Parse a stream using the supplied Parser
.
parsePos :: Monad m => Parser a m b -> Stream m a -> m (Either ParseErrorPos b) Source #
Like parse
but includes stream position information in the error
messages.
>>>
Stream.parsePos (Parser.takeEQ 2 Fold.drain) (Stream.fromList [1])
Left (ParseErrorPos 1 "takeEQ: Expecting exactly 2 elements, input terminated on 1")
parseBreakPos :: Monad m => Parser a m b -> Stream m a -> m (Either ParseErrorPos b, Stream m a) Source #
Like parseBreak
but includes stream position information in the error
messages.
Lazy Right Folds
Consuming a stream to build a right associated expression, suitable for lazy evaluation. Evaluation of the input happens when the output of the fold is evaluated, the fold output is a lazy thunk.
This is suitable for stream transformation operations, for example, operations like mapping a function over the stream.
foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b Source #
Right associative/lazy pull fold. foldrM build final stream
constructs
an output structure using the step function build
. build
is invoked with
the next input element and the remaining (lazy) tail of the output
structure. It builds a lazy output expression using the two. When the "tail
structure" in the output expression is evaluated it calls build
again thus
lazily consuming the input stream
until either the output expression built
by build
is free of the "tail" or the input is exhausted in which case
final
is used as the terminating case for the output structure. For more
details see the description in the previous section.
Example, determine if any element is odd
in a stream:
>>>
s = Stream.fromList (2:4:5:undefined)
>>>
step x xs = if odd x then return True else xs
>>>
Stream.foldrM step (return False) s
True
>>>
import Control.Monad (join)
>>>
foldrM f z = join . Stream.foldr f z
foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b Source #
Right fold, lazy for lazy monads and pure streams, and strict for strict monads.
Please avoid using this routine in strict monads like IO unless you need a
strict right fold. This is provided only for use in lazy monads (e.g.
Identity) or pure streams. Note that with this signature it is not possible
to implement a lazy foldr when the monad m
is strict. In that case it
would be strict in its accumulator and therefore would necessarily consume
all its input.
>>>
foldr f z = Stream.foldrM (\a b -> f a <$> b) (return z)
Note: This is similar to Fold.foldr' (the right fold via left fold), but could be more efficient.
Specific Folds
Streams are folded using folds in Streamly.Data.Fold. Here are some idioms and equivalents of Data.List APIs using folds:
>>>
foldlM' f a = Stream.fold (Fold.foldlM' f a)
>>>
foldl1' f = Stream.fold (Fold.foldl1' f)
>>>
foldl' f a = Stream.fold (Fold.foldl' f a)
>>>
drain = Stream.fold Fold.drain
>>>
mapM_ f = Stream.fold (Fold.drainMapM f)
>>>
length = Stream.fold Fold.length
>>>
genericLength = Stream.fold Fold.genericLength
>>>
head = Stream.fold Fold.one
>>>
last = Stream.fold Fold.latest
>>>
null = Stream.fold Fold.null
>>>
and = Stream.fold Fold.and
>>>
or = Stream.fold Fold.or
>>>
any p = Stream.fold (Fold.any p)
>>>
all p = Stream.fold (Fold.all p)
>>>
sum = Stream.fold Fold.sum
>>>
product = Stream.fold Fold.product
>>>
maximum = Stream.fold Fold.maximum
>>>
maximumBy cmp = Stream.fold (Fold.maximumBy cmp)
>>>
minimum = Stream.fold Fold.minimum
>>>
minimumBy cmp = Stream.fold (Fold.minimumBy cmp)
>>>
elem x = Stream.fold (Fold.elem x)
>>>
notElem x = Stream.fold (Fold.notElem x)
>>>
lookup x = Stream.fold (Fold.lookup x)
>>>
find p = Stream.fold (Fold.find p)
>>>
(!?) i = Stream.fold (Fold.index i)
>>>
genericIndex i = Stream.fold (Fold.genericIndex i)
>>>
elemIndex x = Stream.fold (Fold.elemIndex x)
>>>
findIndex p = Stream.fold (Fold.findIndex p)
Some equivalents of Data.List APIs from the Stream module:
>>>
head = fmap (fmap fst) . Stream.uncons
>>>
tail = fmap (fmap snd) . Stream.uncons
>>>
tail = Stream.tail -- unreleased API
>>>
init = Stream.init -- unreleased API
A Stream based toList fold implementation is provided below because it has a better performance compared to the fold.
toList :: Monad m => Stream m a -> m [a] Source #
Definitions:
>>>
toList = Stream.foldr (:) []
>>>
toList = Stream.fold Fold.toList
Convert a stream into a list in the underlying monad. The list can be
consumed lazily in a lazy monad (e.g. Identity
). In a strict monad (e.g.
IO) the whole list is generated and buffered before it can be consumed.
Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Data.Array instead.
Note that this could a bit more efficient compared to Stream.fold
Fold.toList
, and it can fuse with pure list consumers.
Mapping
Stateless one-to-one transformations. Use fmap
for mapping a pure
function on a stream.
sequence :: Monad m => Stream m (m a) -> Stream m a Source #
>>>
sequence = Stream.mapM id
Replace the elements of a stream of monadic actions with the outputs of those actions.
>>>
s = Stream.fromList [putStr "a", putStr "b", putStrLn "c"]
>>>
Stream.fold Fold.drain $ Stream.sequence s
abc
mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b Source #
>>>
mapM f = Stream.sequence . fmap f
Apply a monadic function to each element of the stream and replace it with the output of the resulting action.
>>>
s = Stream.fromList ["a", "b", "c"]
>>>
Stream.fold Fold.drain $ Stream.mapM putStr s
abc
This is functional equivalent of an imperative loop.
trace :: Monad m => (a -> m b) -> Stream m a -> Stream m a Source #
Apply a monadic function to each element flowing through the stream and discard the results.
>>>
s = Stream.enumerateFromTo 1 2
>>>
Stream.fold Fold.drain $ Stream.trace print s
1 2
Compare with tap
.
tap :: forall (m :: Type -> Type) a b. Monad m => Fold m a b -> Stream m a -> Stream m a Source #
Tap the data flowing through a stream into a Fold
. For example, you may
add a tap to log the contents flowing through the stream. The fold is used
only for effects, its result is discarded.
Fold m a b | -----stream m a ---------------stream m a-----
>>>
s = Stream.enumerateFromTo 1 2
>>>
Stream.fold Fold.drain $ Stream.tap (Fold.drainMapM print) s
1 2
Compare with trace
.
delay :: forall (m :: Type -> Type) a. MonadIO m => Double -> Stream m a -> Stream m a Source #
Introduce a delay of specified seconds between elements of the stream.
Definition:
>>>
sleep n = liftIO $ threadDelay $ round $ n * 1000000
>>>
delay = Stream.intersperseM_ . sleep
Example:
>>>
input = Stream.enumerateFromTo 1 3
>>>
Stream.fold (Fold.drainMapM print) $ Stream.delay 1 input
1 2 3
Scanning
Stateful one-to-one transformations.
Scanning By Scanl
Useful idioms:
>>>
scanl' f z = Stream.scanl (Scanl.mkScanl f z)
>>>
scanlM' f z = Stream.scanl (Scanl.mkScanlM f z)
>>>
postscanl' f z = Stream.postscanl (Scanl.mkScanl f z)
>>>
postscanlM' f z = Stream.postscanl (Scanl.mkScanlM f z)
>>>
scanl1' f = Stream.catMaybes . Stream.scanl (Scanl.mkScanl1 f)
>>>
scanl1M' f = Stream.catMaybes . Stream.scanl (Scanl.mkScanl1M f)
scanl :: forall (m :: Type -> Type) a b. Monad m => Scanl m a b -> Stream m a -> Stream m b Source #
Strict left scan. Scan a stream using the given fold. Scan includes
the initial (default) value of the accumulator as well as the final value.
Compare with postscan
which omits the initial value.
>>>
s = Stream.fromList [1..10]
>>>
Stream.fold Fold.toList $ Stream.takeWhile (< 10) $ Stream.scanl Scanl.sum s
[0,1,3,6]
See also: usingStateT
postscanl :: forall (m :: Type -> Type) a b. Monad m => Scanl m a b -> Stream m a -> Stream m b Source #
Postscan a stream using the given fold. A postscan omits the initial (default) value of the accumulator and includes the final value.
>>>
Stream.toList $ Stream.postscanl Scanl.latest (Stream.fromList [])
[]
Compare with scan
which includes the initial value as well:
>>>
Stream.toList $ Stream.scanl Scanl.latest (Stream.fromList [])
[Nothing]
The following example extracts the input stream up to a point where the running average of elements is no more than 10:
>>>
import Data.Maybe (fromJust)
>>>
let avg = Scanl.teeWith (/) Scanl.sum (fmap fromIntegral Scanl.length)
>>>
s = Stream.enumerateFromTo 1.0 100.0
>>>
:{
Stream.fold Fold.toList $ fmap (fromJust . fst) $ Stream.takeWhile (\(_,x) -> x <= 10) $ Stream.postscanl (Scanl.tee Scanl.latest avg) s :} [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]
Specific scans
indexed :: forall (m :: Type -> Type) a. Monad m => Stream m a -> Stream m (Int, a) Source #
>>>
f = Scanl.mkScanl (\(i, _) x -> (i + 1, x)) (-1,undefined)
>>>
indexed = Stream.postscanl f
>>>
indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)
>>>
indexedR n = fmap (\(i, a) -> (n - i, a)) . indexed
Pair each element in a stream with its index, starting from index 0.
>>>
Stream.fold Fold.toList $ Stream.indexed $ Stream.fromList "hello"
[(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]
Insertion
Add elements to the stream.
>>>
insert = Stream.insertBy compare
insertBy :: forall (m :: Type -> Type) a. Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a Source #
insertBy cmp elem stream
inserts elem
before the first element in
stream
that is less than elem
when compared using cmp
.
>>>
insertBy cmp x = Stream.mergeBy cmp (Stream.fromPure x)
>>>
input = Stream.fromList [1,3,5]
>>>
Stream.fold Fold.toList $ Stream.insertBy compare 2 input
[1,2,3,5]
intersperseM :: Monad m => m a -> Stream m a -> Stream m a Source #
Effectful variant of intersperse
. Insert an effect and its output
between successive elements of a stream. It does nothing if stream has less
than two elements.
Definition:
>>>
intersperseM x = Stream.interleaveSepBy (Stream.repeatM x)
intersperseM_ :: Monad m => m b -> Stream m a -> Stream m a Source #
Perform a side effect between two successive elements of a stream. It does nothing if the stream has less than two elements.
>>>
f x y = Stream.fold Fold.drain $ Stream.trace putChar $ Stream.intersperseM_ x $ Stream.fromList y
>>>
f (putChar '.') "abc"
a.b.c>>>
f (putChar '.') "a"
a
Pre-release
intersperse :: forall (m :: Type -> Type) a. Monad m => a -> Stream m a -> Stream m a Source #
Insert a pure value between successive elements of a stream. It does nothing if stream has less than two elements.
Definition:
>>>
intersperse x = Stream.intersperseM (return x)
>>>
intersperse x = Stream.unfoldEachSepBy x Unfold.identity
>>>
intersperse x = Stream.unfoldEachSepBySeq x Unfold.identity
>>>
intersperse x = Stream.interleaveSepBy (Stream.repeat x)
Example:
>>>
f x y = Stream.toList $ Stream.intersperse x $ Stream.fromList y
>>>
f ',' "abc"
"a,b,c">>>
f ',' "a"
"a"
Filtering
Remove elements from the stream.
Stateless Filters
mapMaybeM
is the most general stateless filtering operation. All
other filtering operations can be expressed using it.
mapMaybe :: forall (m :: Type -> Type) a b. Monad m => (a -> Maybe b) -> Stream m a -> Stream m b Source #
mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b Source #
Like mapMaybe
but maps a monadic function.
Equivalent to:
>>>
mapMaybeM f = Stream.catMaybes . Stream.mapM f
>>>
mapM f = Stream.mapMaybeM (\x -> Just <$> f x)
filter :: forall (m :: Type -> Type) a. Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #
Include only those elements that pass a predicate.
>>>
filter p = Stream.filterM (return . p)
>>>
filter p = Stream.mapMaybe (\x -> if p x then Just x else Nothing)
>>>
filter p = Stream.postscanlMaybe (Scanl.filtering p)
filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #
Same as filter
but with a monadic predicate.
>>>
f p x = p x >>= \r -> return $ if r then Just x else Nothing
>>>
filterM p = Stream.mapMaybeM (f p)
catRights :: forall (m :: Type -> Type) a b. Monad m => Stream m (Either a b) -> Stream m b Source #
catEithers :: forall (m :: Type -> Type) a. Monad m => Stream m (Either a a) -> Stream m a Source #
Remove the either wrapper and flatten both lefts and as well as rights in the output stream.
>>>
catEithers = fmap (either id id)
Pre-release
Stateful Filters
take :: forall (m :: Type -> Type) a. Applicative m => Int -> Stream m a -> Stream m a Source #
Take first n
elements from the stream and discard the rest.
takeWhile :: forall (m :: Type -> Type) a. Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #
End the stream as soon as the predicate fails on an element.
takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #
Same as takeWhile
but with a monadic predicate.
drop :: forall (m :: Type -> Type) a. Monad m => Int -> Stream m a -> Stream m a Source #
Discard first n
elements from the stream and take the rest.
dropWhile :: forall (m :: Type -> Type) a. Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #
Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.
dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #
Same as dropWhile
but with a monadic predicate.
Combining Two Streams
Note that these operations are suitable for statically fusing a few streams, they have a quadratic O(n^2) time complexity wrt to the number of streams. If you want to compose many streams dynamically using binary combining operations see the corresponding operations in Streamly.Data.StreamK.
When fusing more than two streams it is more efficient if the binary operations are composed as a balanced tree rather than a right associative or left associative one e.g.:
>>>
s1 = Stream.fromList [1,2] `Stream.append` Stream.fromList [3,4]
>>>
s2 = Stream.fromList [4,5] `Stream.append` Stream.fromList [6,7]
>>>
s = s1 `Stream.append` s2
Appending
Equivalent of Data.List append:
>>>
(++) = Stream.append
append :: forall (m :: Type -> Type) a. Monad m => Stream m a -> Stream m a -> Stream m a Source #
WARNING! O(n^2) time complexity wrt number of streams. Suitable for
statically fusing a small number of streams. Use the O(n) complexity
StreamK.append
otherwise.
Fuses two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.
>>>
s1 = Stream.fromList [1,2]
>>>
s2 = Stream.fromList [3,4]
>>>
Stream.fold Fold.toList $ s1 `Stream.append` s2
[1,2,3,4]
Interleaving
interleave :: forall (m :: Type -> Type) a. Monad m => Stream m a -> Stream m a -> Stream m a Source #
WARNING! O(n^2) time complexity wrt number of streams. Suitable for
statically fusing a small number of streams. Use the O(n) complexity
StreamK.interleave
otherwise.
Interleaves two streams, yielding one element from each stream alternately, starting from the first stream. When one stream is exhausted, all the remaining elements of the other stream are emitted in the output stream.
Both the streams are completely exhausted.
(a b c) (. . .) => a . b . c . (a b c) (. . ) => a . b . c (a b ) (. . .) => a . b . .
Examples:
>>>
f x y = Stream.toList $ Stream.interleave (Stream.fromList x) (Stream.fromList y)
>>>
f "abc" "..."
"a.b.c.">>>
f "abc" ".."
"a.b.c">>>
f "ab" "..."
"a.b.."
Merging
mergeBy :: forall (m :: Type -> Type) a. Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #
WARNING! O(n^2) time complexity wrt number of streams. Suitable for
statically fusing a small number of streams. Use the O(n) complexity
StreamK.mergeBy
otherwise.
Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.
If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.
>>>
s1 = Stream.fromList [1,3,5]
>>>
s2 = Stream.fromList [2,4,6,8]
>>>
Stream.fold Fold.toList $ Stream.mergeBy compare s1 s2
[1,2,3,4,5,6,8]
mergeByM :: Monad m => (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #
Like mergeBy
but with a monadic comparison function.
Example, to merge two streams randomly:
> randomly _ _ = randomIO >>= x -> return $ if x then LT else GT > Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2]) [2,1,2,2,2,1,1,1]
Example, merge two streams in a proportion of 2:1:
>>>
:set -fno-warn-unrecognised-warning-flags
>>>
:set -fno-warn-x-partial
>>>
:{
do let s1 = Stream.fromList [1,1,1,1,1,1] s2 = Stream.fromList [2,2,2] let proportionately m n = do ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT] return $ \_ _ -> do r <- readIORef ref writeIORef ref $ Prelude.tail r return $ Prelude.head r f <- proportionately 2 1 xs <- Stream.fold Fold.toList $ Stream.mergeByM f s1 s2 print xs :} [1,1,2,1,1,2,1,1,2]
Zipping
Idioms and equivalents of Data.List APIs:
>>>
zip = Stream.zipWith (,)
>>>
unzip = Stream.fold (Fold.unzip Fold.toList Fold.toList)
zipWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #
WARNING! O(n^2) time complexity wrt number of streams. Suitable for
statically fusing a small number of streams. Use the O(n) complexity
StreamK.zipWith
otherwise.
Stream a
is evaluated first, followed by stream b
, the resulting
elements a
and b
are then zipped using the supplied zip function and the
result c
is yielded to the consumer.
If stream a
or stream b
ends, the zipped stream ends. If stream b
ends
first, the element a
from previous evaluation of stream a
is discarded.
>>>
s1 = Stream.fromList [1,2,3]
>>>
s2 = Stream.fromList [4,5,6]
>>>
Stream.fold Fold.toList $ Stream.zipWith (+) s1 s2
[5,7,9]
zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #
Like zipWith
but using a monadic zipping function.
Cross Product
crossWith :: forall (m :: Type -> Type) a b c. Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #
Definition:
>>>
crossWith f m1 m2 = fmap f m1 `Stream.crossApply` m2
Note that the second stream is evaluated multiple times.
Also see "Streamly.Data.Unfold.crossWith" for fast fusible static cross product option.
cross :: forall (m :: Type -> Type) a b. Monad m => Stream m a -> Stream m b -> Stream m (a, b) Source #
Given a Stream m a
and Stream m b
generate a stream with all possible
combinations of the tuple (a, b)
.
Definition:
>>>
cross = Stream.crossWith (,)
The second stream is evaluated multiple times. If that is not desired it can
be cached in an Array
and then generated from the array before
calling this function. Caching may also improve performance if the stream is
expensive to evaluate.
Time: O(m x n)
Pre-release
fairCross :: forall (m :: Type -> Type) a b. Monad m => Stream m a -> Stream m b -> Stream m (a, b) Source #
Like cross
but interleaves the outer and inner loops fairly. See
fairConcatFor
for more details.
Unfold Each
unfoldEach :: forall (m :: Type -> Type) a b. Monad m => Unfold m a b -> Stream m a -> Stream m b Source #
unfoldEach unfold stream
uses unfold
to map the input stream elements
to streams and then flattens the generated streams into a single output
stream.
Like concatMap
but uses an Unfold
for stream generation. Unlike
concatMap
this can fuse the Unfold
code with the inner loop and
therefore provide many times better performance.
>>>
concatMap f = Stream.unfoldEach (Unfold.lmap f Unfold.fromStream)
Here is an example of a two level nested loop much faster than
concatMap
based nesting.
>>>
:{
outerLoop = flip Stream.mapM (Stream.fromList [1,2,3]) $ \x -> do liftIO $ putStrLn (show x) return x innerUnfold = Unfold.carry $ Unfold.lmap (const [4,5,6]) Unfold.fromList innerLoop = flip Unfold.mapM innerUnfold $ \(x, y) -> do when (x == 1) $ liftIO $ putStrLn (show y) pure $ (x, y) :}
>>>
Stream.toList $ Stream.unfoldEach innerLoop outerLoop
1 4 5 6 2 3 [(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]
bfsUnfoldEach :: forall (m :: Type -> Type) a b. Monad m => Unfold m a b -> Stream m a -> Stream m b Source #
Like unfoldEach
but interleaves the resulting streams in a breadth first
manner instead of appending them. Unfolds each element in the input stream
to a stream and then interleave the resulting streams.
>>>
lists = Stream.fromList [[1,4,7],[2,5,8],[3,6,9]]
>>>
Stream.toList $ Stream.bfsUnfoldEach Unfold.fromList lists
[1,2,3,4,5,6,7,8,9]
CAUTION! Do not use on infinite streams.
fairUnfoldEach :: forall (m :: Type -> Type) a b. Monad m => Unfold m a b -> Stream m a -> Stream m b Source #
See fairConcatFor
for more details. This is similar except that this
uses unfolds, therefore, it is much faster due to fusion.
>>>
:{
outerLoop = Stream.fromList [1,2,3] innerLoop = Unfold.carry $ Unfold.lmap (const [4,5,6]) Unfold.fromList :}
>>>
Stream.toList $ Stream.fairUnfoldEach innerLoop outerLoop
[(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]
unfoldEachSepBySeq :: forall (m :: Type -> Type) b c. Monad m => b -> Unfold m b c -> Stream m b -> Stream m c Source #
Unfold each element of the stream, separate the successive unfolds by a sequence generated by unfolding the supplied value.
Definition:
>>>
unfoldEachSepBySeq a u = Stream.unfoldEach u . Stream.intersperse a
>>>
unfoldEachSepBySeq a u = Stream.intercalateSepBy u (Stream.repeat a) u
Idioms:
>>>
intersperse x = Stream.unfoldEachSepBySeq x Unfold.identity
>>>
unwords = Stream.unfoldEachSepBySeq " " Unfold.fromList
Usage:
>>>
input = Stream.fromList ["abc", "def", "ghi"]
>>>
Stream.toList $ Stream.unfoldEachSepBySeq " " Unfold.fromList input
"abc def ghi"
unfoldEachEndBySeq :: forall (m :: Type -> Type) b c. Monad m => b -> Unfold m b c -> Stream m b -> Stream m c Source #
Unfold each element of the stream, end each unfold by a sequence generated by unfolding the supplied value.
Definition:
>>>
unfoldEachEndBySeq a u = Stream.unfoldEach u . Stream.intersperseEndByM a
>>>
unfoldEachEndBySeq a u = Stream.intercalateEndBy u (Stream.repeat a) u
Idioms:
>>>
intersperseEndByM x = Stream.unfoldEachEndBySeq x Unfold.identity
>>>
unlines = Stream.unfoldEachEndBySeq "\n" Unfold.fromList
Usage:
>>>
input = Stream.fromList ["abc", "def", "ghi"]
>>>
Stream.toList $ Stream.unfoldEachEndBySeq "\n" Unfold.fromList input
"abc\ndef\nghi\n"
Stream of streams
Stream operations like map and filter represent loops in
imperative programming terms. Similarly, the imperative concept of
nested loops are represented by streams of streams. The concatMap
operation represents nested looping.
A concatMap
operation loops over the input stream (outer loop),
generating a stream from each element of the stream. Then it loops over
each element of the generated streams (inner loop), collecting them in a
single output stream.
One dimension loops are just a special case of nested loops. For example map and filter can be expressed using concatMap:
>>>
map f = Stream.concatMap (Stream.fromPure . f)
>>>
filter p = Stream.concatMap (\x -> if p x then Stream.fromPure x else Stream.nil)
Idioms and equivalents of Data.List APIs:
>>>
concat = Stream.concatMap id
>>>
cycle = Stream.concatMap Stream.fromList . Stream.repeat
concatMap :: forall (m :: Type -> Type) a b. Monad m => (a -> Stream m b) -> Stream m a -> Stream m b Source #
Map a stream producing function on each element of the stream and then flatten the results into a single stream.
>>>
concatMap f = Stream.concat . fmap f
>>>
concatMap f = Stream.concatMapM (return . f)
>>>
concatMap f = Stream.unfoldEach (Unfold.lmap f Unfold.fromStream)
See argument flipped version concatFor
for more detailed documentation.
NOTE: We recommend using unfoldEach
or unfoldCross
instead of
concatMap
especially in performance critical code. unfoldEach
is much
faster than concatMap
and matches its expressive power in terms of
generating dependent inner streams, there is one important distinction
though: the nesting structure when using unfoldEach
is fixed statically in
the code. In contrast, concatMap
allows dynamic and arbitrary nesting
through monadic composition. This means that deeply nested or
programmatically determined levels of nesting are easier to express and
compose with concatMap
, though often at the cost of performance and
fusion.
concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b Source #
Map a stream producing monadic function on each element of the stream
and then flatten the results into a single stream. Since the stream
generation function is monadic, unlike concatMap
, it can produce an
effect at the beginning of each iteration of the inner loop.
See unfoldEach
for a faster alternative.
fairConcatMap :: forall (m :: Type -> Type) a b. Monad m => (a -> Stream m b) -> Stream m a -> Stream m b Source #
See fairConcatFor
for documentation.
concatFor :: forall (m :: Type -> Type) a b. Monad m => Stream m a -> (a -> Stream m b) -> Stream m b Source #
Map a stream generating function on each element of a stream and
concatenate the results. This is the same as the bind function of the monad
instance. It is just a flipped concatMap
but more convenient to use for
nested use case, feels like an imperative for
loop. It is in fact
equivalent to concat . for
.
>>>
concatFor = flip Stream.concatMap
A concatenating for
loop:
>>>
:{
Stream.toList $ Stream.concatFor (Stream.fromList [1,2,3]) $ \x -> Stream.fromPure x :} [1,2,3]
Use unfoldEach
instead of concatFor
where possible, unfoldEach is much
faster due to fusion.
Nested concatenating for
loops:
>>>
:{
Stream.toList $ Stream.concatFor (Stream.fromList [1,2,3]) $ \x -> Stream.concatFor (Stream.fromList [4,5,6]) $ \y -> Stream.fromPure (x, y) :} [(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]
If total iterations are kept the same, each increase in the nesting level increases the cost by roughly 2 times.
For significantly faster multi-level nesting, prefer using the better
fusible, applicative-like crossWith
over concatFor
where possible.
concatFor
is monad-like: it allows expressing dependencies between the
outer and the inner loops of the nesting, it means that the stream generated
by the inner loop is dynamically governed by the outer loop. This expressive
power comes at a significant performance cost.
NOTE: We recommend using unfoldEach
or unfoldCross
instead of
concatFor
especially in performance critical code. unfoldEach
is much
faster than concatFor
and matches its expressive power in terms of
generating dependent inner streams, there is one important distinction
though: the nesting structure when using unfoldEach
is fixed statically in
the code. In contrast, concatFor
allows dynamic and arbitrary nesting
through monadic composition. This means that deeply nested or
programmatically determined levels of nesting are easier to express and
compose with concatFor
, though often at the cost of performance and
fusion.
fairConcatFor :: forall (m :: Type -> Type) a b. Monad m => Stream m a -> (a -> Stream m b) -> Stream m b Source #
fairConcatFor
is like concatFor
but traverses the depth and breadth of
nesting equally. Therefore, the outer and the inner loops in a nested loop
get equal priority. It can be used to nest infinite streams without starving
outer streams due to inner ones.
Given a stream of three streams:
1. [1,2,3] 2. [4,5,6] 3. [7,8,9]
Here, outer loop is the stream of streams and the inner loops are the
individual streams. The traversal sweeps the diagonals in the above grid to
give equal chance to outer and inner loops. The resulting stream is
(1),(2,4),(3,5,7),(6,8),(9)
, diagonals are parenthesized for emphasis.
Looping
A single stream case is equivalent to concatFor
:
>>>
Stream.toList $ Stream.fairConcatFor (Stream.fromList [1,2]) $ \x -> Stream.fromPure x
[1,2]
Fair Nested Looping
Multiple streams nest like for
loops. The result is a cross product of the
streams. However, the ordering of the results of the cross product is such
that each stream gets consumed equally. In other words, inner iterations of
a nested loop get the same priority as the outer iterations. Inner
iterations do not finish completely before the outer iterations start.
>>>
:{
Stream.toList $ do Stream.fairConcatFor (Stream.fromList [1,2,3]) $ \x -> Stream.fairConcatFor (Stream.fromList [4,5,6]) $ \y -> Stream.fromPure (x, y) :} [(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]
Nesting Infinite Streams
Example with infinite streams. Print all pairs in the cross product with sum less than a specified number.
>>>
:{
Stream.toList $ Stream.takeWhile (\(x,y) -> x + y < 6) $ Stream.fairConcatFor (Stream.fromList [1..]) $ \x -> Stream.fairConcatFor (Stream.fromList [1..]) $ \y -> Stream.fromPure (x, y) :} [(1,1),(1,2),(2,1),(1,3),(2,2),(3,1),(1,4),(2,3),(3,2),(4,1)]
How the nesting works?
If we look at the cross product of [1,2,3], [4,5,6], the streams being
combined using fairConcatFor
are the following sequential loop iterations:
(1,4) (1,5) (1,6) -- first iteration of the outer loop (2,4) (2,5) (2,6) -- second iteration of the outer loop (3,4) (3,5) (3,6) -- third iteration of the outer loop
The result is a triangular or diagonal traversal of these iterations:
[(1,4),(1,5),(2,4),(1,6),(2,5),(3,4),(2,6),(3,5),(3,6)]
Non-Termination Cases
If one of the two interleaved streams does not produce an output at all and continues forever then the other stream will never get scheduled. This is because a stream is unscheduled only after it produces an output. This can lead to non-terminating programs, an example is provided below.
>>>
:{
oddsIf x = Stream.fromList (if x then [1,3..] else [2,4..]) filterEven x = if even x then Stream.fromPure x else Stream.nil :}
>>>
:{
evens = Stream.fairConcatFor (Stream.fromList [True,False]) $ \r -> Stream.concatFor (oddsIf r) filterEven :}
The evens
function does not terminate because, when r is True, the nested
concatFor
is a non-productive infinite loop, therefore, the outer loop
never gets a chance to generate the False
value.
But the following refactoring of the above code works as expected:
>>>
:{
mixed = Stream.fairConcatFor (Stream.fromList [True,False]) $ \r -> Stream.concatFor (oddsIf r) Stream.fromPure :}
>>>
evens = Stream.fairConcatFor mixed filterEven
>>>
Stream.toList $ Stream.take 3 $ evens
[2,4,6]
This works because in mixed
both the streams being interleaved are
productive.
Care should be taken how you write your program, keep in mind the scheduling
implications. To avoid such scheduling problems in serial interleaving, you
can use fairSchedFor
or concurrent scheduling i.e. parFairConcatFor. Due
to concurrent scheduling the other branch will make progress even if one is
an infinite loop producing nothing.
Logic Programming
Streamly provides all operations for logic programming. It provides
functionality equivalent to LogicT
type from the logict
package.
The MonadLogic
operations can be implemented using the available stream
operations. For example, uncons
is msplit
, interleave
corresponds to
the interleave
operation of MonadLogic, fairConcatFor
is the
fair bind (>>-
) operation. fairSchedFor
is an even better alternative
for fair bind, it guarantees that non-productive infinite streams cannot
block progress.
Related Operations
See also "Streamly.Internal.Data.StreamK.fairConcatFor".
concatForM :: Monad m => Stream m a -> (a -> m (Stream m b)) -> Stream m b Source #
Like concatFor
but maps an effectful function. It allows conveniently
mixing monadic effects with streams.
>>>
import Control.Monad.IO.Class (liftIO)
>>>
:{
Stream.toList $ Stream.concatForM (Stream.fromList [1,2,3]) $ \x -> do liftIO $ putStrLn (show x) pure $ Stream.fromPure x :} 1 2 3 [1,2,3]
Nested concatentating for
loops:
>>>
:{
Stream.toList $ Stream.concatForM (Stream.fromList [1,2,3]) $ \x -> do liftIO $ putStrLn (show x) pure $ Stream.concatForM (Stream.fromList [4,5,6]) $ \y -> do when (x == 1) $ liftIO $ putStrLn (show y) pure $ Stream.fromPure (x, y) :} 1 4 5 6 2 3 [(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]
fairConcatForM :: Monad m => Stream m a -> (a -> m (Stream m b)) -> Stream m b Source #
See fairConcatFor
for documentation.
Repeated Fold
Idioms and equivalents of Data.List APIs:
>>>
groupsOf n = Stream.foldMany (Fold.take n Fold.toList)
>>>
groupBy eq = Stream.groupsWhile eq Fold.toList
>>>
groupBy eq = Stream.parseMany (Parser.groupBy eq Fold.toList)
>>>
groupsByRolling eq = Stream.parseMany (Parser.groupByRolling eq Fold.toList)
>>>
groups = groupBy (==)
foldMany :: forall (m :: Type -> Type) a b. Monad m => Fold m a b -> Stream m a -> Stream m b Source #
Apply a terminating Fold
repeatedly on a stream and emit the results in
the output stream. If the last fold is empty, it's result is not emitted.
This means if the input stream is empty the result is also an empty stream.
See foldManyPost
for an alternate behavior which always results in a
non-empty stream even if the input stream is empty.
Definition:
>>>
foldMany f = Stream.parseMany (Parser.fromFold f)
Example, empty stream, omits the empty fold value:
>>>
f = Fold.take 2 Fold.toList
>>>
fmany = Stream.fold Fold.toList . Stream.foldMany f
>>>
fmany $ Stream.fromList []
[]
Example, omits the last empty fold value:
>>>
fmany $ Stream.fromList [1..4]
[[1,2],[3,4]]
Example, last fold non-empty:
>>>
fmany $ Stream.fromList [1..5]
[[1,2],[3,4],[5]]
Note that using a closed fold e.g. Fold.take 0
, would result in an
infinite stream on a non-empty input stream.
groupsOf :: forall (m :: Type -> Type) a b. Monad m => Int -> Fold m a b -> Stream m a -> Stream m b Source #
Group the input stream into groups of n
elements each and then fold each
group using the provided fold function.
Definition:
>>>
groupsOf n f = Stream.foldMany (Fold.take n f)
Usage:
>>>
Stream.toList $ Stream.groupsOf 2 Fold.toList (Stream.enumerateFromTo 1 10)
[[1,2],[3,4],[5,6],[7,8],[9,10]]
This can be considered as an n-fold version of take
where we apply
take
repeatedly on the leftover stream until the stream exhausts.
parseMany :: forall (m :: Type -> Type) a b. Monad m => Parser a m b -> Stream m a -> Stream m (Either ParseError b) Source #
Apply a Parser
repeatedly on a stream and emit the parsed values in the
output stream.
Usage:
>>>
s = Stream.fromList [1..10]
>>>
parser = Parser.takeBetween 0 2 Fold.sum
>>>
Stream.toList $ Stream.parseMany parser s
[Right 3,Right 7,Right 11,Right 15,Right 19]
This is the streaming equivalent of the many
parse
combinator.
Known Issues: When the parser fails there is no way to get the remaining stream.
Splitting
Idioms and equivalents of Data.List APIs:
>>>
splitEndBy p f = Stream.foldMany (Fold.takeEndBy p f)
>>>
splitEndBy_ p f = Stream.foldMany (Fold.takeEndBy_ p f)
>>>
lines = splitEndBy_ (== '\n')
>>>
words = Stream.wordsBy isSpace
>>>
splitAt n = Stream.fold (Fold.splitAt n Fold.toList Fold.toList)
>>>
span p = Parser.splitWith (,) (Parser.takeWhile p Fold.toList) (Parser.fromFold Fold.toList)
>>>
break p = span (not . p)
splitSepBy_ :: forall (m :: Type -> Type) a b. Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #
Split on an infixed separator element, dropping the separator. The
supplied Fold
is applied on the split segments. Splits the stream on
separator elements determined by the supplied predicate, separator is
considered as infixed between two segments:
Definition:
Usage:
>>>
splitOn p xs = Stream.fold Fold.toList $ Stream.splitSepBy_ p Fold.toList (Stream.fromList xs)
>>>
splitOn (== '.') "a.b"
["a","b"]
Splitting an empty stream results in an empty stream i.e. zero splits:
>>>
splitOn (== '.') ""
[]
If the stream does not contain the separator then it results in a single split:
>>>
splitOn (== '.') "abc"
["abc"]
If one or both sides of the separator are missing then the empty segment on that side is folded to the default output of the fold:
>>>
splitOn (== '.') "."
["",""]
>>>
splitOn (== '.') ".a"
["","a"]
>>>
splitOn (== '.') "a."
["a",""]
>>>
splitOn (== '.') "a..b"
["a","","b"]
splitSepBy_
is an inverse of unfoldEachSepBy
:
Stream.unfoldEachSepBy '.' Unfold.fromList . Stream.splitSepBy_ (== '.') Fold.toList === id
Assuming the input stream does not contain the separator:
Stream.splitSepBy_ (== '.') Fold.toList . Stream.unfoldEachSepBy '.' Unfold.fromList === id
splitSepBySeq_ :: forall (m :: Type -> Type) a b. (MonadIO m, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b Source #
Like splitSepBy_
but splits the stream on a sequence of elements rather than
a single element. Parses a sequence of tokens separated by an infixed
separator e.g. a;b;c
is parsed as a
, b
, c
. If the pattern is empty
then each element is a match, thus the fold is finalized on each element.
>>>
splitSepBy p xs = Stream.fold Fold.toList $ Stream.splitSepBySeq_ (Array.fromList p) Fold.toList (Stream.fromList xs)
>>>
splitSepBy "" ""
[]
>>>
splitSepBy "" "a...b"
["a",".",".",".","b"]
>>>
splitSepBy ".." ""
[]
>>>
splitSepBy ".." "a...b"
["a",".b"]
>>>
splitSepBy ".." "abc"
["abc"]
>>>
splitSepBy ".." ".."
["",""]
>>>
splitSepBy "." ".a"
["","a"]
>>>
splitSepBy "." "a."
["a",""]
Uses Rabin-Karp algorithm for substring search.
splitEndBySeq :: forall (m :: Type -> Type) a b. (MonadIO m, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b Source #
Parses a sequence of tokens suffixed by a separator e.g. a;b;c;
is
parsed as a;
, b;
, c;
. If the pattern is empty the input stream is
returned as it is.
Equivalent to the following:
>>>
splitEndBySeq pat f = Stream.foldMany (Fold.takeEndBySeq pat f)
Usage:
>>>
f p = Stream.splitEndBySeq (Array.fromList p) Fold.toList
>>>
splitEndBy p xs = Stream.fold Fold.toList $ f p (Stream.fromList xs)
>>>
splitEndBy "" ""
[]
>>>
splitEndBy "" "a...b"
["a",".",".",".","b"]
>>>
splitEndBy ".." ""
[]
>>>
splitEndBy ".." "a...b"
["a..",".b"]
>>>
splitEndBy ".." "abc"
["abc"]
>>>
splitEndBy ".." ".."
[".."]
>>>
splitEndBy "." ".a"
[".","a"]
>>>
splitEndBy "." "a."
["a."]
Uses Rabin-Karp algorithm for substring search.
splitEndBySeq_ :: forall (m :: Type -> Type) a b. (MonadIO m, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> Stream m a -> Stream m b Source #
Like splitEndBySeq
but drops the separators and returns only the tokens.
Equivalent to the following:
>>>
splitEndBySeq_ pat f = Stream.foldMany (Fold.takeEndBySeq_ pat f)
Usage:
>>>
f p = Stream.splitEndBySeq_ (Array.fromList p) Fold.toList
>>>
splitEndBy_ p xs = Stream.fold Fold.toList $ f p (Stream.fromList xs)
>>>
splitEndBy_ "" ""
[]
>>>
splitEndBy_ "" "a...b"
["a",".",".",".","b"]
>>>
splitEndBy_ ".." ""
[]
>>>
splitEndBy_ ".." "a...b"
["a",".b"]
>>>
splitEndBy_ ".." "abc"
["abc"]
>>>
splitEndBy_ ".." ".."
[""]
>>>
splitEndBy_ "." ".a"
["","a"]
>>>
splitEndBy_ "." "a."
["a"]
Uses Rabin-Karp algorithm for substring search.
wordsBy :: forall (m :: Type -> Type) a b. Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #
Split the stream after stripping leading, trailing, and repeated separators determined by the predicate supplied. The tokens after splitting are collected by the supplied fold. In other words, the tokens are parsed in the same way as words are parsed from whitespace separated text.
>>>
f x = Stream.toList $ Stream.wordsBy (== '.') Fold.toList $ Stream.fromList x
>>>
f "a.b"
["a","b"]>>>
f "a..b"
["a","b"]>>>
f ".a..b."
["a","b"]
Buffered Operations
Operations that require buffering of the stream. Reverse is essentially a left fold followed by an unfold.
Idioms and equivalents of Data.List APIs:
>>>
nub = Stream.ordNub -- unreleased API
>>>
sortBy = StreamK.sortBy
>>>
sortOn f = StreamK.sortOn -- unreleased API
>>>
deleteFirstsBy = Stream.deleteFirstsBy -- unreleased
>>>
(\\) = Stream.deleteFirstsBy (==) -- unreleased
>>>
intersectBy = Stream.intersectBy -- unreleased
>>>
intersect = Stream.intersectBy (==) -- unreleased
>>>
unionBy = Stream.unionBy -- unreleased
>>>
union = Stream.unionBy (==) -- unreleased
reverse :: forall (m :: Type -> Type) a. Monad m => Stream m a -> Stream m a Source #
Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.
Definition:
>>>
reverse m = Stream.concatEffect $ Stream.fold Fold.toListRev m >>= return . Stream.fromList
unionBy :: forall (m :: Type -> Type) a. MonadIO m => (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a Source #
Returns the first stream appended with those unique elements from the
second stream that are not already present in the first stream. Note that
this is not a commutative operation unlike a set union, argument order
matters. The behavior is similar to unionBy
.
Equivalent to the following except that s2
is evaluated only once:
>>>
unionBy eq s1 s2 = s1 `Stream.append` Stream.deleteFirstsBy eq s1 (Stream.ordNub s2)
Example:
>>>
f s1 s2 = Stream.fold Fold.toList $ Stream.unionBy (==) (Stream.fromList s1) (Stream.fromList s2)
>>>
f [1,2,2,4] [1,1,2,3,3]
[1,2,2,4,3]
First stream can be infinite, but second stream must be finite. Note that if
the first stream is infinite the union means just the first stream. Thus
union is useful only when both streams are finite. See sortedUnionBy
where
union can work on infinite streams if they are sorted.
Space: O(n)
Time: O(m x n)
Pre-release
Multi-Stream folds
Operations that consume multiple streams at the same time.
eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool Source #
Compare two streams for equality
cmpBy :: Monad m => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering Source #
Compare two streams lexicographically.
isPrefixOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool Source #
Returns True
if the first stream is the same as or a prefix of the
second. A stream is a prefix of itself.
>>>
Stream.isPrefixOf (Stream.fromList "hello") (Stream.fromList "hello" :: Stream IO Char)
True
isInfixOf :: (MonadIO m, Eq a, Enum a, Unbox a) => Stream m a -> Stream m a -> m Bool Source #
Returns True
if the first stream is an infix of the second. A stream is
considered an infix of itself.
>>>
s = Stream.fromList "hello" :: Stream IO Char
>>>
Stream.isInfixOf s s
True
Space: O(n)
worst case where n
is the length of the infix.
Pre-release
Requires Storable
constraint
isSubsequenceOf :: (Monad m, Eq a) => Stream m a -> Stream m a -> m Bool Source #
Returns True
if all the elements of the first stream occur, in order, in
the second stream. The elements do not have to occur consecutively. A stream
is a subsequence of itself.
>>>
Stream.isSubsequenceOf (Stream.fromList "hlo") (Stream.fromList "hello" :: Stream IO Char)
True
stripPrefix :: (Monad m, Eq a) => Stream m a -> Stream m a -> m (Maybe (Stream m a)) Source #
stripPrefix prefix input
strips the prefix
stream from the input
stream if it is a prefix of input. Returns Nothing
if the input does not
start with the given prefix, stripped input otherwise. Returns Just nil
when the prefix is the same as the input stream.
Space: O(1)
Exceptions
Scope: Note that the stream exception handling routines
(catch and handle) observe exceptions only in the stream segment (i.e.
functions with the Stream
type) of the pipeline and not in the
consumer segments (i.e. functions with Fold
or Parser
types). For
example, if we are folding or parsing a stream - any exceptions in the
fold or parser code won't be observed by the stream exception handlers.
Exceptions in the fold code can be handled using similar exception
handling routines found in the Streamly.Data.Fold module. To observe
exceptions in the entire pipeline, you can wrap the stream elimination
effect itself in a monad level exception handler (e.g. Stream.fold
Fold.drain
).catch
...
Most of these combinators inhibit stream fusion, therefore, when possible, they should be called in an outer loop to mitigate the cost. For example, instead of calling them on a stream of chars call them on a stream of arrays before flattening it to a stream of chars.
onException :: MonadCatch m => m b -> Stream m a -> Stream m a Source #
Run the action m b
if the stream evaluation is aborted due to an
exception. The exception is not caught, simply rethrown.
Observes exceptions only in the stream generation, and not in stream consumers.
Inhibits stream fusion
handle :: (MonadCatch m, Exception e) => (e -> m (Stream m a)) -> Stream m a -> Stream m a Source #
When evaluating a stream if an exception occurs, stream evaluation aborts and the specified exception handler is run with the exception as argument. The exception is caught and handled unless the handler decides to rethrow it. Note that exception handling is not applied to the stream returned by the exception handler.
Observes exceptions only in the stream generation, and not in stream consumers.
Inhibits stream fusion
Resource Management
bracket
is the most general resource management operation, all other
resource management operations can be expressed using it. These
functions have IO suffix because the allocation and cleanup functions
are IO actions. For generalized allocation and cleanup functions, see
the functions without the IO suffix in the streamly
package.
Scope: Note that these operations bracket only the stream-segment in a pipeline, they do not cover the stream-consumer (e.g. folds). This means that if an exception occurs in the consumer of the stream (e.g. in a fold or parser driven by the stream) then the exception won't be observed by the stream resource handlers, in such cases the resource stream cleanup handler runs when the stream is garbage collected.
To observe exceptions in the entire pipline, put a monad level resource
bracket around the stream elimination effect (e.g. around (Stream.fold
Fold.sum)
).
See also the Streamly.Control.Exception module for general resource management operations in non-stream as well as stream code.
before :: Monad m => m b -> Stream m a -> Stream m a Source #
Run the action m b
before the stream yields its first element.
Same as the following but more efficient due to fusion:
>>>
before action xs = Stream.concatMap (const xs) (Stream.fromEffect action)
afterIO :: forall (m :: Type -> Type) b a. MonadIO m => IO b -> Stream m a -> Stream m a Source #
Run the action IO b
whenever the stream is evaluated to completion, or
if it is garbage collected after a partial lazy evaluation.
The semantics of the action IO b
are similar to the semantics of cleanup
action in bracketIO
.
See also afterUnsafe
finallyIO :: forall (m :: Type -> Type) b a. (MonadIO m, MonadCatch m) => IO b -> Stream m a -> Stream m a Source #
Run the action IO b
whenever the stream stream stops normally, aborts
due to an exception or if it is garbage collected after a partial lazy
evaluation.
The semantics of running the action IO b
are similar to the cleanup action
semantics described in bracketIO
.
>>>
finallyIO release stream = Stream.bracketIO (return ()) (const release) (const stream)
See also finallyIO' for stricter resource release guarantees.
See also finallyUnsafe
Inhibits stream fusion
finallyIO' :: forall (m :: Type -> Type) b a. MonadIO m => AcquireIO -> IO b -> Stream m a -> Stream m a Source #
Like finallyIO, based on bracketIO' semantics.
finallyIO'' :: forall (m :: Type -> Type) b a. (MonadIO m, MonadCatch m) => AcquireIO -> IO b -> Stream m a -> Stream m a Source #
Like finallyIO, based on bracketIO'' semantics.
bracketIO :: forall (m :: Type -> Type) b c a. (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a Source #
The alloc action IO b
is executed with async exceptions disabled but keeping
blocking operations interruptible (see mask
). Uses the
output b
of the IO action as input to the function b -> Stream m a
to
generate an output stream.
b
is usually a resource allocated under the IO monad, e.g. a file handle, that
requires a cleanup after use. The cleanup is done using the b -> IO c
action. bracketIO guarantees that the allocated resource is eventually (see
details below) cleaned up even in the face of sync or async exceptions. If
an exception occurs it is not caught, simply rethrown.
bracketIO
only guarantees that the cleanup action runs, and it runs with
async exceptions enabled. The action must ensure that it can successfully
cleanup the resource in the face of sync or async exceptions.
Best case: Cleanup happens immediately in the following cases:
- the stream is consumed completely
- an exception occurs in the bracketed part of the pipeline
Worst case: In the following cases cleanup is deferred to GC.
- the bracketed stream is partially consumed and abandoned
- pipeline is aborted due to an exception outside the bracket
Use Streamly.Control.Exception.withAcquireIO
for covering the entire pipeline with guaranteed cleanup at the end of
bracket.
Observes exceptions only in the stream generation, and not in stream consumers.
See also: bracketUnsafe
Inhibits stream fusion
bracketIO' :: forall (m :: Type -> Type) b c a. MonadIO m => AcquireIO -> IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a Source #
Like bracketIO
but requires an AcquireIO
reference in the underlying monad
of the stream, and guarantees that all resources are freed before the
scope of the monad level resource manager
(Streamly.Control.Exception.withAcquireIO
)
ends. Where fusion matters, this combinator can be much faster than bracketIO
as it
allows stream fusion.
Best case: Cleanup happens immediately if the stream is consumed completely.
Worst case: In the following cases cleanup is guaranteed to occur at the end of the monad level bracket. However, if a GC occurs then cleanup will occur even earlier than that.
- the bracketed stream is partially consumed and abandoned
- pipeline is aborted due to an exception
This is the recommended default bracket operation.
Note: You can use acquire
directly, instead of using this combinator, if
you don’t need to release the resource when the stream ends. However, if
you're using the stream inside another stream (like with concatMap), you
usually do want to release it at the end of the stream.
Allows stream fusion
bracketIO'' :: forall (m :: Type -> Type) b c a. (MonadIO m, MonadCatch m) => AcquireIO -> IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a Source #
Like bracketIO, the only difference is that there is a guarantee that the
resources will be freed at the end of the monad level bracket
(AcquireIO
).
Best case: Cleanup happens immediately in the following cases:
- the stream is consumed completely
- an exception occurs in the bracketed part of the pipeline
Worst case: In the following cases cleanup is guaranteed to occur at the end of the monad level bracket. However, if a GC occurs before that then cleanup will occur early.
- the bracketed stream is partially consumed and abandoned
- pipeline is aborted due to an exception outside the bracket
Note: Instead of using this combinator you can directly use
acquire
if you do not care about releasing the resource at the end of the stream
and if you are not recovering from an exception using handle
. You may want
to care about releasing the resource at the end of a stream if you are using
it in a nested manner (e.g. in concatMap).
Inhibits stream fusion
bracketIO3 :: forall (m :: Type -> Type) b c d e a. (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> IO d) -> (b -> IO e) -> (b -> Stream m a) -> Stream m a Source #
Like bracketIO
but can use 3 separate cleanup actions depending on the
mode of termination:
- When the stream stops normally
- When the stream is garbage collected
- When the stream encounters an exception
bracketIO3 before onStop onGC onException action
runs action
using the
result of before
. If the stream stops, onStop
action is executed, if the
stream is abandoned onGC
is executed, if the stream encounters an
exception onException
is executed.
The exception is not caught, it is rethrown.
Inhibits stream fusion
Pre-release
Transforming Inner Monad
morphInner :: Monad n => (forall x. m x -> n x) -> Stream m a -> Stream n a Source #
Transform the inner monad of a stream using a natural transformation.
Example, generalize the inner monad from Identity to any other:
>>>
generalizeInner = Stream.morphInner (return . runIdentity)
Also known as hoist.
liftInner :: forall (m :: Type -> Type) (t :: (Type -> Type) -> Type -> Type) a. (Monad m, MonadTrans t, Monad (t m)) => Stream m a -> Stream (t m) a Source #
Lift the inner monad m
of Stream m a
to t m
where t
is a monad
transformer.
runReaderT :: Monad m => m s -> Stream (ReaderT s m) a -> Stream m a Source #
Evaluate the inner monad of a stream as ReaderT
.
runStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m (s, a) Source #
Evaluate the inner monad of a stream as StateT
and emit the resulting
state and value pair after each step.
Deprecated
scan :: forall (m :: Type -> Type) a b. Monad m => Fold m a b -> Stream m a -> Stream m b Source #
Deprecated: Please use scanl instead
scanMaybe :: forall (m :: Type -> Type) a b. Monad m => Fold m a (Maybe b) -> Stream m a -> Stream m b Source #
Deprecated: Use postscanlMaybe instead
postscan :: forall (m :: Type -> Type) a b. Monad m => Fold m a b -> Stream m a -> Stream m b Source #
Deprecated: Please use postscanl instead
splitOn :: forall (m :: Type -> Type) a b. Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b Source #
Deprecated: Please use splitSepBy_ instead. Note the difference in behavior on splitting empty stream.
unfoldMany :: forall (m :: Type -> Type) a b. Monad m => Unfold m a b -> Stream m a -> Stream m b Source #
Deprecated: Please use unfoldEach instead.
unfoldEach unfold stream
uses unfold
to map the input stream elements
to streams and then flattens the generated streams into a single output
stream.
Like concatMap
but uses an Unfold
for stream generation. Unlike
concatMap
this can fuse the Unfold
code with the inner loop and
therefore provide many times better performance.
>>>
concatMap f = Stream.unfoldEach (Unfold.lmap f Unfold.fromStream)
Here is an example of a two level nested loop much faster than
concatMap
based nesting.
>>>
:{
outerLoop = flip Stream.mapM (Stream.fromList [1,2,3]) $ \x -> do liftIO $ putStrLn (show x) return x innerUnfold = Unfold.carry $ Unfold.lmap (const [4,5,6]) Unfold.fromList innerLoop = flip Unfold.mapM innerUnfold $ \(x, y) -> do when (x == 1) $ liftIO $ putStrLn (show y) pure $ (x, y) :}
>>>
Stream.toList $ Stream.unfoldEach innerLoop outerLoop
1 4 5 6 2 3 [(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)]
intercalate :: forall (m :: Type -> Type) b c. Monad m => Unfold m b c -> b -> Stream m b -> Stream m c Source #
Deprecated: Please use unfoldEachSepBySeq instead.