streamly
Copyright: (c) 2017 Composewell Technologies
License: BSD-3-Clause
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Data.Stream.IsStream

Description

Deprecated: Please use Streamly.Internal.Data.Stream from the streamly-core package, and Streamly.Internal.Data.Stream.Concurrent, Streamly.Internal.Data.Stream.Exception.Lifted, and Streamly.Internal.Data.Stream.Time from the streamly package instead.

This is an internal module which is a superset of the corresponding released module Streamly.Prelude. It contains some additional unreleased or experimental APIs.

Synopsis

Documentation

fromList :: forall (m :: Type -> Type) t a. (Monad m, IsStream t) => [a] -> t m a Source #

fromList = foldr cons nil

Construct a stream from a list of pure values. This is more efficient than fromFoldable for serial streams.

Since: 0.4.0

cons :: forall t a (m :: Type -> Type). IsStream t => a -> t m a -> t m a infixr 5 Source #

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` r but more efficient. For concurrent streams this is not concurrent whereas consM is concurrent. For example:

> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]

Since: 0.1.0

nil :: forall t (m :: Type -> Type) a. IsStream t => t m a Source #

newtype StreamK (m :: Type -> Type) a #

Constructors

MkStream (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) 

Instances

Instances details
(Foldable m, Monad m) => Foldable (StreamK m) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fold :: Monoid m0 => StreamK m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> StreamK m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> StreamK m a -> m0 #

foldr :: (a -> b -> b) -> b -> StreamK m a -> b #

foldr' :: (a -> b -> b) -> b -> StreamK m a -> b #

foldl :: (b -> a -> b) -> b -> StreamK m a -> b #

foldl' :: (b -> a -> b) -> b -> StreamK m a -> b #

foldr1 :: (a -> a -> a) -> StreamK m a -> a #

foldl1 :: (a -> a -> a) -> StreamK m a -> a #

toList :: StreamK m a -> [a] #

null :: StreamK m a -> Bool #

length :: StreamK m a -> Int #

elem :: Eq a => a -> StreamK m a -> Bool #

maximum :: Ord a => StreamK m a -> a #

minimum :: Ord a => StreamK m a -> a #

sum :: Num a => StreamK m a -> a #

product :: Num a => StreamK m a -> a #

Traversable (StreamK Identity) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

traverse :: Applicative f => (a -> f b) -> StreamK Identity a -> f (StreamK Identity b) #

sequenceA :: Applicative f => StreamK Identity (f a) -> f (StreamK Identity a) #

mapM :: Monad m => (a -> m b) -> StreamK Identity a -> m (StreamK Identity b) #

sequence :: Monad m => StreamK Identity (m a) -> m (StreamK Identity a) #

Monad m => Functor (StreamK m) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fmap :: (a -> b) -> StreamK m a -> StreamK m b #

(<$) :: a -> StreamK m b -> StreamK m a #

a ~ Char => IsString (StreamK Identity a) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Monoid (StreamK m a) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

mempty :: StreamK m a #

mappend :: StreamK m a -> StreamK m a -> StreamK m a #

mconcat :: [StreamK m a] -> StreamK m a #

Semigroup (StreamK m a) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

(<>) :: StreamK m a -> StreamK m a -> StreamK m a #

sconcat :: NonEmpty (StreamK m a) -> StreamK m a #

stimes :: Integral b => b -> StreamK m a -> StreamK m a #

IsList (StreamK Identity a) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Associated Types

type Item (StreamK Identity a) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (StreamK Identity a) = a
Read a => Read (StreamK Identity a) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Show a => Show (StreamK Identity a) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (StreamK Identity a) 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (StreamK Identity a) = a

nilM :: (IsStream t, Monad m) => m b -> t m a Source #

mkStream :: IsStream t => (forall r. State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #

Build a stream from a function that takes a State (which carries the SVar), a yield continuation, a singleton stream continuation and a stop continuation, in that order.

(.:) :: forall t a (m :: Type -> Type). IsStream t => a -> t m a -> t m a infixr 5 Source #

Operator equivalent of cons.

> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]

Since: 0.1.1

class (forall (m :: Type -> Type) a. MonadAsync m => Semigroup (t m a), forall (m :: Type -> Type) a. MonadAsync m => Monoid (t m a), forall (m :: Type -> Type). Monad m => Functor (t m), forall (m :: Type -> Type). MonadAsync m => Applicative (t m)) => IsStream (t :: (Type -> Type) -> Type -> Type) where Source #

Class of types that can represent a stream of elements of some type a in some monad m.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Minimal complete definition

toStream, fromStream, consM, (|:)

Methods

consM :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]

Concurrent (do not use fromParallel to construct infinite streams)

Since: 0.2.0

(|:) :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #

Operator equivalent of consM. We can read it as "parallel colon" to remember that | comes before :.

> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]

let delay = threadDelay 1000000 >> print 1
drain $ fromSerial  $ delay |: delay |: delay |: nil
drain $ fromParallel $ delay |: delay |: delay |: nil

Concurrent (do not use fromParallel to construct infinite streams)

Since: 0.2.0

Instances

Instances details
IsStream AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. AheadT m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> AheadT m a

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. AsyncT m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> AsyncT m a

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. WAsyncT m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> WAsyncT m a

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

IsStream ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ParallelT m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> ParallelT m a

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

IsStream SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. SerialT m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> SerialT m a

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

IsStream WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. WSerialT m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> WSerialT m a

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

IsStream ZipSerialM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ZipSerialM m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> ZipSerialM m a

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

IsStream ZipAsyncM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ZipAsyncM m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> ZipAsyncM m a

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

foldStreamShared :: IsStream t => State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #

Fold a stream by providing a State (which carries the SVar), a yield continuation, a singleton continuation and a stop continuation, in that order. The stream would share the current SVar passed via the State.

foldStream :: IsStream t => State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #

Fold a stream by providing a State, a yield continuation, a singleton continuation and a stop continuation, in that order. The stream will not use the SVar passed via the State.

foldlx' :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b Source #

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Since: 0.7.0

foldlMx' :: (IsStream t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b Source #

Like foldlx', but with a monadic step function.

Since: 0.7.0

bindWith :: forall t (m :: Type -> Type) b a. IsStream t => (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b Source #

concatMapWith :: forall t (m :: Type -> Type) b a. IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

"concatMapWith mixer generator stream" is a two-dimensional looping combinator. The generator function is used to generate streams from the elements in the input stream, and the mixer function is used to merge those streams.

Note we can merge streams concurrently by using a concurrent merge function.

Since: 0.7.0

Since: 0.8.0 (signature change)

toStreamK :: forall t (m :: Type -> Type) a. IsStream t => t m a -> StreamK m a Source #

adapt :: forall t1 t2 (m :: Type -> Type) a. (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #

Adapt any specific stream type to any other specific stream type.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromStreamK :: forall t (m :: Type -> Type) a. IsStream t => StreamK m a -> t m a Source #

foldrMx :: (IsStream t, Monad m) => (a -> m x -> m x) -> m x -> (m x -> m b) -> t m a -> m b Source #

fromStreamD :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => Stream m a -> t m a Source #

toStreamD :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => t m a -> Stream m a Source #

concatFoldableWith :: forall t f (m :: Type -> Type) a. (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #

A variant of fold that allows you to fold a Foldable container of streams using the specified stream sum operation.

concatFoldableWith async $ map return [1..3]

Equivalent to:

concatFoldableWith f = Prelude.foldr f D.nil
concatFoldableWith f = D.concatMapFoldableWith f id

Since: 0.8.0 (Renamed foldWith to concatFoldableWith)

Since: 0.1.0 (Streamly)

concatMapFoldableWith :: forall t f (m :: Type -> Type) b a. (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #

A variant of foldMap that allows you to map a monadic streaming action on a Foldable container and then fold it using the specified stream merge operation.

concatMapFoldableWith async return [1..3]

Equivalent to:

concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)

Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)

Since: 0.1.0 (Streamly)

concatForFoldableWith :: forall t f (m :: Type -> Type) b a. (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #

Like concatMapFoldableWith but with the last two arguments reversed, i.e. the monadic streaming function is the last argument.

Equivalent to:

concatForFoldableWith f xs g = Prelude.foldr (f . g) D.nil xs
concatForFoldableWith f = flip (D.concatMapFoldableWith f)

Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)

Since: 0.1.0 (Streamly)

data SerialT (m :: Type -> Type) a Source #

For SerialT streams:

(<>) = serial                       -- Semigroup
(>>=) = flip . concatMapWith serial -- Monad

A single Monad bind behaves like a for loop:

>>> :{
IsStream.toList $ do
     x <- IsStream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]

Nested monad binds behave like nested for loops:

>>> :{
IsStream.toList $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    y <- IsStream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. SerialT m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> SerialT m a

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

MonadTrans SerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

lift :: Monad m => m a -> SerialT m a #

MonadReader r m => MonadReader r (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

ask :: SerialT m r #

local :: (r -> r) -> SerialT m a -> SerialT m a #

reader :: (r -> a) -> SerialT m a #

MonadState s m => MonadState s (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

get :: SerialT m s #

put :: s -> SerialT m () #

state :: (s -> (a, s)) -> SerialT m a #

MonadIO m => MonadIO (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftIO :: IO a -> SerialT m a #

(Foldable m, Monad m) => Foldable (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fold :: Monoid m0 => SerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> SerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> SerialT m a -> b #

foldl :: (b -> a -> b) -> b -> SerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> SerialT m a -> b #

foldr1 :: (a -> a -> a) -> SerialT m a -> a #

foldl1 :: (a -> a -> a) -> SerialT m a -> a #

toList :: SerialT m a -> [a] #

null :: SerialT m a -> Bool #

length :: SerialT m a -> Int #

elem :: Eq a => a -> SerialT m a -> Bool #

maximum :: Ord a => SerialT m a -> a #

minimum :: Ord a => SerialT m a -> a #

sum :: Num a => SerialT m a -> a #

product :: Num a => SerialT m a -> a #

Traversable (SerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

traverse :: Applicative f => (a -> f b) -> SerialT Identity a -> f (SerialT Identity b) #

sequenceA :: Applicative f => SerialT Identity (f a) -> f (SerialT Identity a) #

mapM :: Monad m => (a -> m b) -> SerialT Identity a -> m (SerialT Identity b) #

sequence :: Monad m => SerialT Identity (m a) -> m (SerialT Identity a) #

Monad m => Applicative (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

pure :: a -> SerialT m a #

(<*>) :: SerialT m (a -> b) -> SerialT m a -> SerialT m b #

liftA2 :: (a -> b -> c) -> SerialT m a -> SerialT m b -> SerialT m c #

(*>) :: SerialT m a -> SerialT m b -> SerialT m b #

(<*) :: SerialT m a -> SerialT m b -> SerialT m a #

Monad m => Functor (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fmap :: (a -> b) -> SerialT m a -> SerialT m b #

(<$) :: a -> SerialT m b -> SerialT m a #

Monad m => Monad (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(>>=) :: SerialT m a -> (a -> SerialT m b) -> SerialT m b #

(>>) :: SerialT m a -> SerialT m b -> SerialT m b #

return :: a -> SerialT m a #

NFData1 (SerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftRnf :: (a -> ()) -> SerialT Identity a -> () #

MonadThrow m => MonadThrow (SerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

throwM :: (HasCallStack, Exception e) => e -> SerialT m a #

a ~ Char => IsString (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Monoid (SerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

mempty :: SerialT m a #

mappend :: SerialT m a -> SerialT m a -> SerialT m a #

mconcat :: [SerialT m a] -> SerialT m a #

Semigroup (SerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: SerialT m a -> SerialT m a -> SerialT m a #

sconcat :: NonEmpty (SerialT m a) -> SerialT m a #

stimes :: Integral b => b -> SerialT m a -> SerialT m a #

IsList (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Associated Types

type Item (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (SerialT Identity a) = a
Read a => Read (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Show a => Show (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

NFData a => NFData (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

rnf :: SerialT Identity a -> () #

Eq a => Eq (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Ord a => Ord (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (SerialT Identity a) = a

data WSerialT (m :: Type -> Type) a Source #

For WSerialT streams:

(<>) = wSerial                       -- Semigroup
(>>=) = flip . concatMapWith wSerial -- Monad

Note that <> is associative only if we disregard the ordering of elements in the resulting stream.

A single Monad bind behaves like a for loop:

>>> :{
IsStream.toList $ IsStream.fromWSerial $ do
     x <- IsStream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]

Nested monad binds behave like interleaved nested for loops:

>>> :{
IsStream.toList $ IsStream.fromWSerial $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    y <- IsStream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(2,3),(1,4),(2,4)]

It is a result of interleaving all the nested iterations corresponding to element 1 in the first stream with all the nested iterations of element 2:

>>> import Streamly.Prelude (wSerial)
>>> IsStream.toList $ IsStream.fromList [(1,3),(1,4)] `IsStream.wSerial` IsStream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]

The W in the name stands for wide, or breadth-wise, scheduling, in contrast to the depth-wise scheduling behavior of SerialT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. WSerialT m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> WSerialT m a

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

MonadTrans WSerialT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

lift :: Monad m => m a -> WSerialT m a #

MonadReader r m => MonadReader r (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

ask :: WSerialT m r #

local :: (r -> r) -> WSerialT m a -> WSerialT m a #

reader :: (r -> a) -> WSerialT m a #

MonadState s m => MonadState s (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

get :: WSerialT m s #

put :: s -> WSerialT m () #

state :: (s -> (a, s)) -> WSerialT m a #

MonadIO m => MonadIO (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftIO :: IO a -> WSerialT m a #

(Foldable m, Monad m) => Foldable (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fold :: Monoid m0 => WSerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldl :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldr1 :: (a -> a -> a) -> WSerialT m a -> a #

foldl1 :: (a -> a -> a) -> WSerialT m a -> a #

toList :: WSerialT m a -> [a] #

null :: WSerialT m a -> Bool #

length :: WSerialT m a -> Int #

elem :: Eq a => a -> WSerialT m a -> Bool #

maximum :: Ord a => WSerialT m a -> a #

minimum :: Ord a => WSerialT m a -> a #

sum :: Num a => WSerialT m a -> a #

product :: Num a => WSerialT m a -> a #

Traversable (WSerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

traverse :: Applicative f => (a -> f b) -> WSerialT Identity a -> f (WSerialT Identity b) #

sequenceA :: Applicative f => WSerialT Identity (f a) -> f (WSerialT Identity a) #

mapM :: Monad m => (a -> m b) -> WSerialT Identity a -> m (WSerialT Identity b) #

sequence :: Monad m => WSerialT Identity (m a) -> m (WSerialT Identity a) #

Monad m => Applicative (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

pure :: a -> WSerialT m a #

(<*>) :: WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b #

liftA2 :: (a -> b -> c) -> WSerialT m a -> WSerialT m b -> WSerialT m c #

(*>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

(<*) :: WSerialT m a -> WSerialT m b -> WSerialT m a #

Monad m => Functor (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fmap :: (a -> b) -> WSerialT m a -> WSerialT m b #

(<$) :: a -> WSerialT m b -> WSerialT m a #

Monad m => Monad (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(>>=) :: WSerialT m a -> (a -> WSerialT m b) -> WSerialT m b #

(>>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

return :: a -> WSerialT m a #

NFData1 (WSerialT Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftRnf :: (a -> ()) -> WSerialT Identity a -> () #

MonadThrow m => MonadThrow (WSerialT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

throwM :: (HasCallStack, Exception e) => e -> WSerialT m a #

a ~ Char => IsString (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Monoid (WSerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

mempty :: WSerialT m a #

mappend :: WSerialT m a -> WSerialT m a -> WSerialT m a #

mconcat :: [WSerialT m a] -> WSerialT m a #

Semigroup (WSerialT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: WSerialT m a -> WSerialT m a -> WSerialT m a #

sconcat :: NonEmpty (WSerialT m a) -> WSerialT m a #

stimes :: Integral b => b -> WSerialT m a -> WSerialT m a #

IsList (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Associated Types

type Item (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (WSerialT Identity a) = a
Read a => Read (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Show a => Show (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

NFData a => NFData (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

rnf :: WSerialT Identity a -> () #

Eq a => Eq (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Ord a => Ord (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (WSerialT Identity a) = a

data AheadT (m :: Type -> Type) a Source #

For AheadT streams:

(<>) = ahead
(>>=) = flip . concatMapWith ahead

A single Monad bind behaves like a for loop with iterations executed concurrently, ahead of time, producing side effects of iterations out of order, but results in order:

>>> :{
Stream.toList $ Stream.fromAhead $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, ahead of time:

>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using ahead.

Since: 0.3.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream AheadT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. AheadT m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> AheadT m a

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

ask :: AheadT m r #

local :: (r -> r) -> AheadT m a -> AheadT m a #

reader :: (r -> a) -> AheadT m a #

(MonadState s m, MonadAsync m) => MonadState s (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

get :: AheadT m s #

put :: s -> AheadT m () #

state :: (s -> (a, s)) -> AheadT m a #

(MonadIO m, MonadAsync m) => MonadIO (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftIO :: IO a -> AheadT m a #

(Monad m, MonadAsync m) => Applicative (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

pure :: a -> AheadT m a #

(<*>) :: AheadT m (a -> b) -> AheadT m a -> AheadT m b #

liftA2 :: (a -> b -> c) -> AheadT m a -> AheadT m b -> AheadT m c #

(*>) :: AheadT m a -> AheadT m b -> AheadT m b #

(<*) :: AheadT m a -> AheadT m b -> AheadT m a #

Monad m => Functor (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

fmap :: (a -> b) -> AheadT m a -> AheadT m b #

(<$) :: a -> AheadT m b -> AheadT m a #

MonadAsync m => Monad (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(>>=) :: AheadT m a -> (a -> AheadT m b) -> AheadT m b #

(>>) :: AheadT m a -> AheadT m b -> AheadT m b #

return :: a -> AheadT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

throwM :: (HasCallStack, Exception e) => e -> AheadT m a #

MonadAsync m => Monoid (AheadT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

mempty :: AheadT m a #

mappend :: AheadT m a -> AheadT m a -> AheadT m a #

mconcat :: [AheadT m a] -> AheadT m a #

MonadAsync m => Semigroup (AheadT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(<>) :: AheadT m a -> AheadT m a -> AheadT m a #

sconcat :: NonEmpty (AheadT m a) -> AheadT m a #

stimes :: Integral b => b -> AheadT m a -> AheadT m a #

data AsyncT (m :: Type -> Type) a Source #

For AsyncT streams:

(<>) = async
(>>=) = flip . concatMapWith async

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the async combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the async combinator:

>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using async.

Since: 0.1.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream AsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. AsyncT m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> AsyncT m a

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: AsyncT m r #

local :: (r -> r) -> AsyncT m a -> AsyncT m a #

reader :: (r -> a) -> AsyncT m a #

(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: AsyncT m s #

put :: s -> AsyncT m () #

state :: (s -> (a, s)) -> AsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> AsyncT m a #

(Monad m, MonadAsync m) => Applicative (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> AsyncT m a #

(<*>) :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b #

liftA2 :: (a -> b -> c) -> AsyncT m a -> AsyncT m b -> AsyncT m c #

(*>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

(<*) :: AsyncT m a -> AsyncT m b -> AsyncT m a #

Monad m => Functor (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> AsyncT m a -> AsyncT m b #

(<$) :: a -> AsyncT m b -> AsyncT m a #

MonadAsync m => Monad (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b #

(>>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

return :: a -> AsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: (HasCallStack, Exception e) => e -> AsyncT m a #

MonadAsync m => Monoid (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: AsyncT m a #

mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a #

mconcat :: [AsyncT m a] -> AsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

data WAsyncT (m :: Type -> Type) a Source #

For WAsyncT streams:

(<>) = wAsync
(>>=) = flip (concatMapWith wAsync)

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the wAsync combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the wAsync combinator:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one WAsyncT output stream and all the iterations corresponding to 2 constitute another WAsyncT output stream and these two output streams are merged using wAsync.

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of AsyncT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. WAsyncT m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> WAsyncT m a

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: WAsyncT m r #

local :: (r -> r) -> WAsyncT m a -> WAsyncT m a #

reader :: (r -> a) -> WAsyncT m a #

(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: WAsyncT m s #

put :: s -> WAsyncT m () #

state :: (s -> (a, s)) -> WAsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> WAsyncT m a #

(Monad m, MonadAsync m) => Applicative (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> WAsyncT m a #

(<*>) :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b #

liftA2 :: (a -> b -> c) -> WAsyncT m a -> WAsyncT m b -> WAsyncT m c #

(*>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

(<*) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m a #

Monad m => Functor (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> WAsyncT m a -> WAsyncT m b #

(<$) :: a -> WAsyncT m b -> WAsyncT m a #

MonadAsync m => Monad (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b #

(>>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

return :: a -> WAsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: (HasCallStack, Exception e) => e -> WAsyncT m a #

MonadAsync m => Monoid (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: WAsyncT m a #

mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

mconcat :: [WAsyncT m a] -> WAsyncT m a #

MonadAsync m => Semigroup (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

data ParallelT (m :: Type -> Type) a Source #

For ParallelT streams:

(<>) = parallel
(>>=) = flip (concatMapWith parallel)

See AsyncT; ParallelT is similar, except that all iterations are strictly concurrent, whereas in AsyncT the degree of concurrency depends on the consumer demand and available threads. See parallel for more details.

Since: 0.1.0 (Streamly)

Since: 0.7.0 (maxBuffer applies to ParallelT streams)

Since: 0.8.0

Instances

Instances details
IsStream ParallelT Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ParallelT m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> ParallelT m a

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

ask :: ParallelT m r #

local :: (r -> r) -> ParallelT m a -> ParallelT m a #

reader :: (r -> a) -> ParallelT m a #

(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

get :: ParallelT m s #

put :: s -> ParallelT m () #

state :: (s -> (a, s)) -> ParallelT m a #

(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftIO :: IO a -> ParallelT m a #

(Monad m, MonadAsync m) => Applicative (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

pure :: a -> ParallelT m a #

(<*>) :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b #

liftA2 :: (a -> b -> c) -> ParallelT m a -> ParallelT m b -> ParallelT m c #

(*>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

(<*) :: ParallelT m a -> ParallelT m b -> ParallelT m a #

Monad m => Functor (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

fmap :: (a -> b) -> ParallelT m a -> ParallelT m b #

(<$) :: a -> ParallelT m b -> ParallelT m a #

MonadAsync m => Monad (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(>>=) :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b #

(>>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

return :: a -> ParallelT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

throwM :: (HasCallStack, Exception e) => e -> ParallelT m a #

MonadAsync m => Monoid (ParallelT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

mempty :: ParallelT m a #

mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a #

mconcat :: [ParallelT m a] -> ParallelT m a #

MonadAsync m => Semigroup (ParallelT m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(<>) :: ParallelT m a -> ParallelT m a -> ParallelT m a #

sconcat :: NonEmpty (ParallelT m a) -> ParallelT m a #

stimes :: Integral b => b -> ParallelT m a -> ParallelT m a #

data ZipSerialM (m :: Type -> Type) a Source #

For ZipSerialM streams:

(<>) = serial
(<*>) = Streamly.Prelude.zipWith id

Applicative evaluates the streams being zipped serially:

>>> s1 = Stream.fromFoldable [1, 2]
>>> s2 = Stream.fromFoldable [3, 4]
>>> s3 = Stream.fromFoldable [5, 6]
>>> Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3
[(1,3,5),(2,4,6)]

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream ZipSerialM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ZipSerialM m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> ZipSerialM m a

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(Foldable m, Monad m) => Foldable (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

fold :: Monoid m0 => ZipSerialM m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> ZipSerialM m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> ZipSerialM m a -> m0 #

foldr :: (a -> b -> b) -> b -> ZipSerialM m a -> b #

foldr' :: (a -> b -> b) -> b -> ZipSerialM m a -> b #

foldl :: (b -> a -> b) -> b -> ZipSerialM m a -> b #

foldl' :: (b -> a -> b) -> b -> ZipSerialM m a -> b #

foldr1 :: (a -> a -> a) -> ZipSerialM m a -> a #

foldl1 :: (a -> a -> a) -> ZipSerialM m a -> a #

toList :: ZipSerialM m a -> [a] #

null :: ZipSerialM m a -> Bool #

length :: ZipSerialM m a -> Int #

elem :: Eq a => a -> ZipSerialM m a -> Bool #

maximum :: Ord a => ZipSerialM m a -> a #

minimum :: Ord a => ZipSerialM m a -> a #

sum :: Num a => ZipSerialM m a -> a #

product :: Num a => ZipSerialM m a -> a #

Traversable (ZipSerialM Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

traverse :: Applicative f => (a -> f b) -> ZipSerialM Identity a -> f (ZipSerialM Identity b) #

sequenceA :: Applicative f => ZipSerialM Identity (f a) -> f (ZipSerialM Identity a) #

mapM :: Monad m => (a -> m b) -> ZipSerialM Identity a -> m (ZipSerialM Identity b) #

sequence :: Monad m => ZipSerialM Identity (m a) -> m (ZipSerialM Identity a) #

Monad m => Applicative (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

pure :: a -> ZipSerialM m a #

(<*>) :: ZipSerialM m (a -> b) -> ZipSerialM m a -> ZipSerialM m b #

liftA2 :: (a -> b -> c) -> ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m c #

(*>) :: ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m b #

(<*) :: ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m a #

Monad m => Functor (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

fmap :: (a -> b) -> ZipSerialM m a -> ZipSerialM m b #

(<$) :: a -> ZipSerialM m b -> ZipSerialM m a #

NFData1 (ZipSerialM Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

liftRnf :: (a -> ()) -> ZipSerialM Identity a -> () #

a ~ Char => IsString (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Monoid (ZipSerialM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

mempty :: ZipSerialM m a #

mappend :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

mconcat :: [ZipSerialM m a] -> ZipSerialM m a #

Semigroup (ZipSerialM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

(<>) :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

sconcat :: NonEmpty (ZipSerialM m a) -> ZipSerialM m a #

stimes :: Integral b => b -> ZipSerialM m a -> ZipSerialM m a #

IsList (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Associated Types

type Item (ZipSerialM Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

type Item (ZipSerialM Identity a) = a
Read a => Read (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Show a => Show (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

NFData a => NFData (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

rnf :: ZipSerialM Identity a -> () #

Eq a => Eq (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Ord a => Ord (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

type Item (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

type Item (ZipSerialM Identity a) = a

data ZipAsyncM (m :: Type -> Type) a Source #

For ZipAsyncM streams:

(<>) = serial
(<*>) = Streamly.Prelude.zipAsyncWith id

Applicative evaluates the streams being zipped concurrently. The following would take half the time that it would take in serial zipping:

>>> s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
>>> Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
...
[(1,1),(1,1),(1,1)]

Since: 0.2.0 (Streamly)

Since: 0.8.0

Instances

Instances details
IsStream ZipAsyncM Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> Type) a. ZipAsyncM m a -> StreamK m a

fromStream :: forall (m :: Type -> Type) a. StreamK m a -> ZipAsyncM m a

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

MonadAsync m => Applicative (ZipAsyncM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

pure :: a -> ZipAsyncM m a #

(<*>) :: ZipAsyncM m (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b #

liftA2 :: (a -> b -> c) -> ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m c #

(*>) :: ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m b #

(<*) :: ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m a #

Monad m => Functor (ZipAsyncM m) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

fmap :: (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b #

(<$) :: a -> ZipAsyncM m b -> ZipAsyncM m a #

Monoid (ZipAsyncM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

mempty :: ZipAsyncM m a #

mappend :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

mconcat :: [ZipAsyncM m a] -> ZipAsyncM m a #

Semigroup (ZipAsyncM m a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.ZipAsync

Methods

(<>) :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

sconcat :: NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a #

stimes :: Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a #

type Serial = SerialT IO Source #

A serial IO stream of elements of type a. See SerialT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type WSerial = WSerialT IO Source #

An interleaving serial IO stream of elements of type a. See WSerialT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type Ahead = AheadT IO Source #

A serial IO stream of elements of type a with concurrent lookahead. See AheadT documentation for more details.

Since: 0.3.0 (Streamly)

Since: 0.8.0

type Async = AsyncT IO Source #

A demand driven left biased parallely composing IO stream of elements of type a. See AsyncT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type WAsync = WAsyncT IO Source #

A round robin parallely composing IO stream of elements of type a. See WAsyncT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type Parallel = ParallelT IO Source #

A parallely composing IO stream of elements of type a. See ParallelT documentation for more details.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type ZipSerial = ZipSerialM IO Source #

An IO stream whose applicative instance zips streams serially.

Since: 0.2.0 (Streamly)

Since: 0.8.0

type ZipAsync = ZipAsyncM IO Source #

An IO stream whose applicative instance zips streams concurrently.

Since: 0.2.0 (Streamly)

Since: 0.8.0

fromSerial :: forall t (m :: Type -> Type) a. IsStream t => SerialT m a -> t m a Source #

Fix the type of a polymorphic stream as SerialT.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromWSerial :: forall t (m :: Type -> Type) a. IsStream t => WSerialT m a -> t m a Source #

Fix the type of a polymorphic stream as WSerialT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

fromAsync :: forall t (m :: Type -> Type) a. IsStream t => AsyncT m a -> t m a Source #

Fix the type of a polymorphic stream as AsyncT.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromAhead :: forall t (m :: Type -> Type) a. IsStream t => AheadT m a -> t m a Source #

Fix the type of a polymorphic stream as AheadT.

Since: 0.3.0 (Streamly)

Since: 0.8.0

fromWAsync :: forall t (m :: Type -> Type) a. IsStream t => WAsyncT m a -> t m a Source #

Fix the type of a polymorphic stream as WAsyncT.

Since: 0.2.0 (Streamly)

Since: 0.8.0

fromParallel :: forall t (m :: Type -> Type) a. IsStream t => ParallelT m a -> t m a Source #

Fix the type of a polymorphic stream as ParallelT.

Since: 0.1.0 (Streamly)

Since: 0.8.0

fromZipSerial :: forall t (m :: Type -> Type) a. IsStream t => ZipSerialM m a -> t m a Source #

Fix the type of a polymorphic stream as ZipSerialM.

Since: 0.2.0 (Streamly)

Since: 0.8.0

fromZipAsync :: forall t (m :: Type -> Type) a. IsStream t => ZipAsyncM m a -> t m a Source #

Fix the type of a polymorphic stream as ZipAsyncM.

Since: 0.2.0 (Streamly)

Since: 0.8.0

toConsK :: IsStream t => (m a -> t m a -> t m a) -> m a -> StreamK m a -> StreamK m a Source #

Adapt a polymorphic consM operation to a StreamK cons operation.

repeat :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => a -> t m a Source #

Generate an infinite stream by repeating a pure value.

Since: 0.4.0

mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a Source #

We can define cyclic structures using let:

>>> let (a, b) = ([1, b], head a) in (a, b)
([1,1],1)

The function fix defined as:

>>> fix f = let x = f x in x

ensures that the argument of a function and its output refer to the same lazy value x i.e. the same location in memory. Thus x can be defined in terms of itself, creating structures with cyclic references.

>>> f ~(a, b) = ([1, b], head a)
>>> fix f
([1,1],1)

mfix is essentially the same as fix but for monadic values.

Using mfix for streams we can construct a stream in which each element of the stream is defined in a cyclic fashion. The argument of the function being fixed represents the current element of the stream which is being returned by the stream monad. Thus, we can use the argument to construct itself.

Pre-release

unfoldr :: forall (m :: Type -> Type) t b a. (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a Source #

>>> :{
unfoldr step s =
    case step s of
        Nothing -> Stream.nil
        Just (a, b) -> a `Stream.cons` unfoldr step b
:}

Build a stream by unfolding a pure step function step starting from a seed s. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

>>> :{
let f b =
        if b > 2
        then Nothing
        else Just (b, b + 1)
in Stream.toList $ Stream.unfoldr f 0
:}
[0,1,2]

Since: 0.1.0

fromList :: forall (m :: Type -> Type) t a. (Monad m, IsStream t) => [a] -> t m a Source #

fromList = foldr cons nil

Construct a stream from a list of pure values. This is more efficient than fromFoldable for serial streams.

Since: 0.4.0

iterate :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (a -> a) -> a -> t m a Source #

>>> iterate f x = x `Stream.cons` iterate f (f x)

Generate an infinite stream with x as the first element and each successive element derived by applying the function f on the previous element.

>>> Stream.toList $ Stream.take 5 $ Stream.iterate (+1) 1
[1,2,3,4,5]

Since: 0.1.2

replicate :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => Int -> a -> t m a Source #

>>> replicate n = Stream.take n . Stream.repeat

Generate a stream of length n by repeating a value n times.

Since: 0.6.0

yield :: forall t a (m :: Type -> Type). IsStream t => a -> t m a Source #

Same as fromPure

Since: 0.4.0

replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a Source #

>>> replicateM n = Stream.take n . Stream.repeatM

Generate a stream by performing a monadic action n times. For example, define an action that waits for one second and then prints its argument:

>>> pr n = threadDelay 1000000 >> print n

This runs serially and takes 3 seconds:

>>> Stream.drain $ Stream.fromSerial $ Stream.replicateM 3 $ pr 1
1
1
1

This runs concurrently and takes just 1 second:

>>> Stream.drain $ Stream.fromAsync  $ Stream.replicateM 3 $ pr 1
1
1
1

Concurrent

Since: 0.1.1

unfold :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Unfold m a b -> a -> t m b Source #

Convert an Unfold into a stream by supplying it an input seed.

>>> Stream.drain $ Stream.unfold Unfold.replicateM (3, putStrLn "hello")
hello
hello
hello

Since: 0.7.0

cons :: forall t a (m :: Type -> Type). IsStream t => a -> t m a -> t m a infixr 5 Source #

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` r but more efficient. For concurrent streams this is not concurrent whereas consM is concurrent. For example:

> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]

Since: 0.1.0

timeout :: AbsTime -> t m () Source #

Generate a singleton event at or after the specified absolute time. Note that this is different from a threadDelay: a threadDelay starts from the time when the action is evaluated, whereas an AbsTime based timeout will expire immediately if the action is evaluated too late.

Unimplemented

times :: forall t (m :: Type -> Type). (IsStream t, MonadAsync m) => t m (AbsTime, RelTime64) Source #

times returns a stream of time value tuples with clock of 10 ms granularity. The first component of the tuple is an absolute time reference (epoch) denoting the start of the stream and the second component is a time relative to the reference.

>>> Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.times
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))

Note: This API is not safe on 32-bit machines.

Pre-release

fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #

fromEffect m = m `consM` nil

Create a singleton stream from a monadic action.

> Stream.toList $ Stream.fromEffect getLine
hello
["hello"]

Since: 0.8.0 (Renamed yieldM to fromEffect)

nil :: forall t (m :: Type -> Type) a. IsStream t => t m a Source #

nilM :: (IsStream t, Monad m) => m b -> t m a Source #

unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a Source #

Build a stream by unfolding a monadic step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

>>> :{
let f b =
        if b > 2
        then return Nothing
        else return (Just (b, b + 1))
in Stream.toList $ Stream.unfoldrM f 0
:}
[0,1,2]

When run concurrently, the next unfold step can run concurrently with the processing of the output of the previous step. Note that more than one step cannot run concurrently as the next step depends on the output of the previous step.

>>> :{
let f b =
        if b > 2
        then return Nothing
        else threadDelay 1000000 >> return (Just (b, b + 1))
in Stream.toList $ Stream.delay 1 $ Stream.fromAsync $ Stream.unfoldrM f 0
:}
[0,1,2]

Concurrent

Since: 0.1.0

(.:) :: forall t a (m :: Type -> Type). IsStream t => a -> t m a -> t m a infixr 5 Source #

Operator equivalent of cons.

> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]

Since: 0.1.1

fromPure :: forall t a (m :: Type -> Type). IsStream t => a -> t m a Source #

fromPure a = a `cons` nil

Create a singleton stream from a pure value.

The following holds in monadic streams, but not in Zip streams:

fromPure = pure
fromPure = fromEffect . pure

In Zip applicative streams fromPure is not the same as pure because in that case pure is equivalent to repeat instead. fromPure and pure are equally efficient, in other cases fromPure may be slightly more efficient than the other equivalent definitions.

Since: 0.8.0 (Renamed yield to fromPure)

consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]

Concurrent (do not use fromParallel to construct infinite streams)

Since: 0.2.0

(|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #

Operator equivalent of consM. We can read it as "parallel colon" to remember that | comes before :.

> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]
let delay = threadDelay 1000000 >> print 1
drain $ fromSerial  $ delay |: delay |: delay |: nil
drain $ fromParallel $ delay |: delay |: delay |: nil

Concurrent (do not use fromParallel to construct infinite streams)

Since: 0.2.0

fromFoldable :: forall t f a (m :: Type -> Type). (IsStream t, Foldable f) => f a -> t m a Source #

>>> fromFoldable = Prelude.foldr Stream.cons Stream.nil

Construct a stream from a Foldable containing pure values.

Since: 0.2.0

fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a Source #

>>> fromFoldableM = Prelude.foldr Stream.consM Stream.nil

Construct a stream from a Foldable containing monadic actions.

>>> pr n = threadDelay 1000000 >> print n
>>> Stream.drain $ Stream.fromSerial $ Stream.fromFoldableM $ map pr [1,2,3]
1
2
3
>>> Stream.drain $ Stream.fromAsync $ Stream.fromFoldableM $ map pr [1,2,3]
...
...
...

Concurrent (do not use with fromParallel on infinite containers)

Since: 0.3.0

class Enum a => Enumerable a where Source #

Types that can be enumerated as a stream. The operations in this type class are equivalent to those in the Enum type class, except that these generate a stream instead of a list. Use the functions in Streamly.Internal.Data.Stream.Enumeration module to define new instances.

Since: 0.6.0

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => a -> t m a Source #

enumerateFrom from generates a stream starting with the element from, enumerating up to maxBound when the type is Bounded or generating an infinite stream when the type is not Bounded.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int)
[0,1,2,3]

For Fractional types, enumeration is numerically stable. However, no overflow or underflow checks are performed.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1
[1.1,2.1,3.1,4.1]

Since: 0.6.0

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => a -> a -> t m a Source #

Generate a finite stream starting with the element from, enumerating the type up to the value to. If to is smaller than from then an empty stream is returned.

>>> Stream.toList $ Stream.enumerateFromTo 0 4
[0,1,2,3,4]

For Fractional types, the last element is equal to the specified to value after rounding to the nearest integral value.

>>> Stream.toList $ Stream.enumerateFromTo 1.1 4
[1.1,2.1,3.1,4.1]

>>> Stream.toList $ Stream.enumerateFromTo 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]

Since: 0.6.0

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => a -> a -> t m a Source #

enumerateFromThen from then generates a stream whose first element is from, the second element is then and the successive elements are in increments of then - from. Enumeration can occur downwards or upwards depending on whether then comes before or after from. For Bounded types the stream ends when maxBound (or minBound, when enumerating downwards) is reached; for unbounded types it keeps enumerating infinitely.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2
[0,2,4,6]

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2)
[0,-2,-4,-6]

Since: 0.6.0

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => a -> a -> a -> t m a Source #

enumerateFromThenTo from then to generates a finite stream whose first element is from, the second element is then and the successive elements are in increments of then - from up to to. Enumeration can occur downwards or upwards depending on whether then comes before or after from.

>>> Stream.toList $ Stream.enumerateFromThenTo 0 2 6
[0,2,4,6]

>>> Stream.toList $ Stream.enumerateFromThenTo 0 (-2) (-6)
[0,-2,-4,-6]

Since: 0.6.0

Instances

Instances details
Enumerable Int16 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> t m Int16 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> Int16 -> t m Int16 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> Int16 -> t m Int16 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> Int16 -> Int16 -> t m Int16 Source #

Enumerable Int32 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> t m Int32 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> Int32 -> t m Int32 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> Int32 -> t m Int32 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> Int32 -> Int32 -> t m Int32 Source #

Enumerable Int64 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> t m Int64 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> Int64 -> t m Int64 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> Int64 -> t m Int64 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> Int64 -> Int64 -> t m Int64 Source #

Enumerable Int8 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> t m Int8 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> Int8 -> t m Int8 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> Int8 -> t m Int8 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> Int8 -> Int8 -> t m Int8 Source #

Enumerable Word16 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> t m Word16 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> Word16 -> t m Word16 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> Word16 -> t m Word16 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> Word16 -> Word16 -> t m Word16 Source #

Enumerable Word32 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> t m Word32 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> Word32 -> t m Word32 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> Word32 -> t m Word32 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> Word32 -> Word32 -> t m Word32 Source #

Enumerable Word64 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> t m Word64 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> Word64 -> t m Word64 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> Word64 -> t m Word64 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> Word64 -> Word64 -> t m Word64 Source #

Enumerable Word8 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> t m Word8 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> Word8 -> t m Word8 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> Word8 -> t m Word8 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> Word8 -> Word8 -> t m Word8 Source #

Enumerable Ordering Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> t m Ordering Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> Ordering -> t m Ordering Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> Ordering -> t m Ordering Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> Ordering -> Ordering -> t m Ordering Source #

Enumerable Integer Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> t m Integer Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> Integer -> t m Integer Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> Integer -> t m Integer Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> Integer -> Integer -> t m Integer Source #

Enumerable Natural Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> t m Natural Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> Natural -> t m Natural Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> Natural -> t m Natural Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> Natural -> Natural -> t m Natural Source #

Enumerable () Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> t m () Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> () -> t m () Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> () -> t m () Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> () -> () -> t m () Source #

Enumerable Bool Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> t m Bool Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> Bool -> t m Bool Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> Bool -> t m Bool Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> Bool -> Bool -> t m Bool Source #

Enumerable Char Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> t m Char Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> Char -> t m Char Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> Char -> t m Char Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> Char -> Char -> t m Char Source #

Enumerable Double Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> t m Double Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> Double -> t m Double Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> Double -> t m Double Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> Double -> Double -> t m Double Source #

Enumerable Float Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> t m Float Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> Float -> t m Float Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> Float -> t m Float Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> Float -> Float -> t m Float Source #

Enumerable Int Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> t m Int Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> Int -> t m Int Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> Int -> t m Int Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> Int -> Int -> t m Int Source #

Enumerable Word Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> t m Word Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> Word -> t m Word Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> Word -> t m Word Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> Word -> Word -> t m Word Source #

Enumerable a => Enumerable (Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> t m (Identity a) Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> Identity a -> t m (Identity a) Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> Identity a -> t m (Identity a) Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> Identity a -> Identity a -> t m (Identity a) Source #

Integral a => Enumerable (Ratio a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> t m (Ratio a) Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> Ratio a -> t m (Ratio a) Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> Ratio a -> t m (Ratio a) Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> Ratio a -> Ratio a -> t m (Ratio a) Source #

HasResolution a => Enumerable (Fixed a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> t m (Fixed a) Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> Fixed a -> t m (Fixed a) Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> Fixed a -> t m (Fixed a) Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> Fixed a -> Fixed a -> t m (Fixed a) Source #

fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a Source #

>>> fromListM = Stream.fromFoldableM
>>> fromListM = Stream.sequence . Stream.fromList
>>> fromListM = Stream.mapM id . Stream.fromList
>>> fromListM = Prelude.foldr Stream.consM Stream.nil

Construct a stream from a list of monadic actions. This is more efficient than fromFoldableM for serial streams.

Since: 0.4.0

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #

>>> repeatM = fix . consM
>>> repeatM = cycle1 . fromEffect

Generate a stream by repeatedly executing a monadic action forever.

>>> :{
repeatAsync =
       Stream.repeatM (threadDelay 1000000 >> print 1)
     & Stream.take 10
     & Stream.fromAsync
     & Stream.drain
:}

Concurrent, infinite (do not use with fromParallel)

Since: 0.2.0

iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a Source #

>>> iterateM f m = m >>= \a -> return a `Stream.consM` iterateM f (f a)

Generate an infinite stream with the first element generated by the action m and each successive element derived by applying the monadic function f on the previous element.

>>> pr n = threadDelay 1000000 >> print n
>>> :{
Stream.iterateM (\x -> pr x >> return (x + 1)) (return 0)
    & Stream.take 3
    & Stream.fromSerial
    & Stream.toList
:}
0
1
[0,1,2]

When run concurrently, the next iteration can run concurrently with the processing of the previous iteration. Note that more than one iteration cannot run concurrently as the next iteration depends on the output of the previous iteration.

>>> :{
Stream.iterateM (\x -> pr x >> return (x + 1)) (return 0)
    & Stream.delay 1
    & Stream.take 3
    & Stream.fromAsync
    & Stream.toList
:}
0
1
...

Concurrent

Since: 0.1.2

Since: 0.7.0 (signature change)

fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a Source #

>>> fromIndicesM f = Stream.mapM f $ Stream.enumerateFrom 0
>>> fromIndicesM f = let g i = f i `Stream.consM` g (i + 1) in g 0

Generate an infinite stream, whose values are the output of a monadic function f applied on the corresponding index. Index starts at 0.

Concurrent

Since: 0.6.0

enumerate :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Bounded a, Enumerable a) => t m a Source #

enumerate = enumerateFrom minBound

Enumerate a Bounded type from its minBound to maxBound.

Since: 0.6.0

enumerateTo :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a Source #

enumerateTo = enumerateFromTo minBound

Enumerate a Bounded type from its minBound to specified value.

Since: 0.6.0

absTimesWith :: forall t (m :: Type -> Type). (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #

absTimesWith g returns a stream of absolute timestamps using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimesWith 0.01
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})

Note: This API is not safe on 32-bit machines.

Pre-release

relTimesWith :: forall t (m :: Type -> Type). (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64 Source #

relTimesWith g returns a stream of relative time values starting from 0, using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)

Note: This API is not safe on 32-bit machines.

Pre-release

absTimes :: forall t (m :: Type -> Type). (IsStream t, MonadAsync m, Functor (t m)) => t m AbsTime Source #

absTimes returns a stream of absolute timestamps using a clock of 10 ms granularity.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimes
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})

Note: This API is not safe on 32-bit machines.

Pre-release

relTimes :: forall t (m :: Type -> Type). (IsStream t, MonadAsync m, Functor (t m)) => t m RelTime64 Source #

relTimes returns a stream of relative time values starting from 0, using a clock of granularity 10 ms.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimes
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)

Note: This API is not safe on 32-bit machines.

Pre-release

durations :: Double -> t m RelTime64 Source #

durations g returns a stream of relative time values measuring the time elapsed since the immediate predecessor element of the stream was generated. The first element of the stream is always 0. durations uses a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. The minimum granularity is 1 millisecond. Durations lower than 1 ms will be 0.

Note: This API is not safe on 32-bit machines.

Unimplemented

fromIndices :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (Int -> a) -> t m a Source #

>>> fromIndices f = fmap f $ Stream.enumerateFrom 0
>>> fromIndices f = let g i = f i `Stream.cons` g (i + 1) in g 0

Generate an infinite stream, whose values are the output of a function f applied on the corresponding index. Index starts at 0.

>>> Stream.toList $ Stream.take 5 $ Stream.fromIndices id
[0,1,2,3,4]

Since: 0.6.0

fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a Source #

Takes a callback setter function and provides it with a callback. The callback when invoked adds a value at the tail of the stream. Returns a stream of values generated by the callback.

Pre-release

yieldM :: (Monad m, IsStream t) => m a -> t m a Source #

Same as fromEffect

Since: 0.4.0

fromHandle :: forall t (m :: Type -> Type). (IsStream t, MonadIO m) => Handle -> t m String Source #

Read lines from an IO Handle into a stream of Strings.

Since: 0.1.0

ticks :: Rate -> t m () Source #

Generate ticks at the specified rate. The rate is adaptive, the tick generation speed can be increased or decreased at different times to achieve the specified rate. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum rate achieved by a stream is governed by the processor speed.

Unimplemented

unfold0 :: forall t (m :: Type -> Type) b. (IsStream t, Monad m) => Unfold m Void b -> t m b Source #

Convert an Unfold with a closed input end into a stream.

Pre-release

fromPrimIORef :: forall t (m :: Type -> Type) a. (IsStream t, MonadIO m, Unbox a) => IORef a -> t m a Source #

Construct a stream by reading an Unboxed IORef repeatedly.

Pre-release

currentTime :: forall t (m :: Type -> Type). (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #

foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b Source #

Right fold, lazy for lazy monads and pure streams, and strict for strict monads.

Please avoid using this routine in strict monads like IO unless you need a strict right fold. This is provided only for use in lazy monads (e.g. Identity) or pure streams. Note that with this signature it is not possible to implement a lazy foldr when the monad m is strict. In that case it would be strict in its accumulator and therefore would necessarily consume all its input.

Since: 0.1.0

maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #

Determine the maximum element in a stream using the supplied comparison function.

maximumBy = Stream.fold Fold.maximumBy

Since: 0.6.0

minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #

Determine the minimum element in a stream using the supplied comparison function.

minimumBy = Stream.fold Fold.minimumBy

Since: 0.6.0

length :: Monad m => SerialT m a -> m Int Source #

Determine the length of the stream.

Since: 0.1.0

head :: Monad m => SerialT m a -> m (Maybe a) Source #

Extract the first element of the stream, if any.

head = (!! 0)
head = Stream.fold Fold.one

Since: 0.1.0

foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b Source #

Left associative/strict push fold. foldl' reduce initial stream invokes reduce with the accumulator and the next input in the input stream, using initial as the initial value of the current value of the accumulator. When the input is exhausted the current value of the accumulator is returned. Make sure to use a strict data structure for accumulator to not build unnecessary lazy expressions unless that's what you want. See the previous section for more details.

Since: 0.2.0

mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () Source #

mapM_ = Stream.drain . Stream.mapM

Apply a monadic action to each element of the stream and discard the output of the action. This is not really a pure transformation operation but a transformation followed by fold.

Since: 0.1.0

toList :: Monad m => SerialT m a -> m [a] Source #

toList = Stream.foldr (:) []

Convert a stream into a list in the underlying monad. The list can be consumed lazily in a lazy monad (e.g. Identity). In a strict monad (e.g. IO) the whole list is generated and buffered before it can be consumed.

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

Since: 0.1.0

mconcat :: (Monad m, Monoid a) => SerialT m a -> m a Source #

Fold a stream of monoid elements by appending them.

mconcat = Stream.fold Fold.mconcat

Pre-release

uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) Source #

Decompose a stream into its head and tail. If the stream is empty, returns Nothing. If the stream is non-empty, returns Just (a, ma), where a is the head of the stream and ma its tail.

This can be used to do pretty much anything in an imperative manner, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.

All the folds in this module can be expressed in terms of uncons, however, this is generally less efficient than specific folds because it takes apart the stream one element at a time, therefore, does not take advantage of stream fusion.

Since: 0.1.0

tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #

tail = fmap (fmap snd) . Stream.uncons

Extract all but the first element of the stream, if any.

Since: 0.1.1

last :: Monad m => SerialT m a -> m (Maybe a) Source #

Extract the last element of the stream, if any.

last xs = xs !! (Stream.length xs - 1)
last = Stream.fold Fold.last

Since: 0.1.1

init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #

Extract all but the last element of the stream, if any.

Since: 0.5.0

null :: Monad m => SerialT m a -> m Bool Source #

Determine whether the stream is empty.

null = Stream.fold Fold.null

Since: 0.1.1

foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #

Strict left fold, for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

Since: 0.5.0

sum :: (Monad m, Num a) => SerialT m a -> m a Source #

Determine the sum of all elements of a stream of numbers. Returns 0 when the stream is empty. Note that this is not numerically stable for floating point numbers.

sum = Stream.fold Fold.sum

Since: 0.1.0

product :: (Monad m, Num a) => SerialT m a -> m a Source #

Determine the product of all elements of a stream of numbers. Returns 1 when the stream is empty.

product = Stream.fold Fold.product

Since: 0.1.1

foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #

Lazy right fold for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

Since: 0.5.0

maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #

maximum = maximumBy compare
maximum = Stream.fold Fold.maximum

Determine the maximum element in a stream.

Since: 0.1.0

minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #

minimum = minimumBy compare
minimum = Stream.fold Fold.minimum

Determine the minimum element in a stream.

Since: 0.1.0

and :: Monad m => SerialT m Bool -> m Bool Source #

Determines if all elements of a boolean stream are True.

and = Stream.fold Fold.and

Since: 0.5.0

or :: Monad m => SerialT m Bool -> m Bool Source #

Determines whether at least one element of a boolean stream is True.

or = Stream.fold Fold.or

Since: 0.5.0

any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #

Determine whether any of the elements of a stream satisfy a predicate.

any = Stream.fold Fold.any

Since: 0.1.0

all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #

Determine whether all elements of a stream satisfy a predicate.

all = Stream.fold Fold.all

Since: 0.1.0

elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #

Determine whether an element is present in the stream.

elem = Stream.fold Fold.elem

Since: 0.1.0

notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #

Determine whether an element is not present in the stream.

notElem = Stream.fold Fold.notElem

Since: 0.1.0

lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b) Source #

In a stream of (key-value) pairs (a, b), return the value b of the first pair where the key equals the given value a.

lookup a = fmap (fmap snd) . Stream.find ((== a) . fst)
lookup = Stream.fold Fold.lookup

Since: 0.5.0

(!!) :: Monad m => SerialT m a -> Int -> m (Maybe a) Source #

Lookup the element at the given index.

Since: 0.6.0

fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #

Fold a stream using the supplied left Fold and reducing the resulting expression strictly at each step. The behavior is similar to foldl'. A Fold can terminate early without consuming the full stream. See the documentation of individual Folds for termination behavior.

>>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050

Folds never fail, therefore, they produce a default value even when no input is provided. It means we can always fold an empty stream and get a valid result. For example:

>>> Stream.fold Fold.sum Stream.nil
0

However, foldMany on an empty stream results in an empty stream. Therefore, Stream.fold f is not the same as Stream.head . Stream.foldMany f.

fold f = Stream.parse (Parser.fromFold f)

Since: 0.7.0

foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b Source #

Right associative/lazy pull fold. foldrM build final stream constructs an output structure using the step function build. build is invoked with the next input element and the remaining (lazy) tail of the output structure. It builds a lazy output expression using the two. When the "tail structure" in the output expression is evaluated it calls build again thus lazily consuming the input stream until either the output expression built by build is free of the "tail" or the input is exhausted in which case final is used as the terminating case for the output structure. For more details see the description in the previous section.

Example, determine if any element is odd in a stream:

>>> Stream.foldrM (\x xs -> if odd x then return True else xs) (return False) $ Stream.fromList (2:4:5:undefined)
True

Since: 0.7.0 (signature changed)

Since: 0.2.0 (signature changed)

Since: 0.1.0

find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a) Source #

Like findM but with a non-monadic predicate.

find p = findM (return . p)
find = Stream.fold Fold.find

Since: 0.5.0

stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a)) Source #

stripPrefix prefix stream strips prefix from stream if it is a prefix of stream. Returns Nothing if the stream does not start with the given prefix, stripped stream otherwise. Returns Just nil when the prefix is the same as the stream.

See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropPrefix".

Space: O(1)

Since: 0.6.0

elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int) Source #

Returns the first index where a given value is found in the stream.

elemIndex a = Stream.findIndex (== a)

Since: 0.5.0

findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int) Source #

Returns the first index that satisfies the given predicate.

findIndex = Stream.fold Fold.findIndex

Since: 0.5.0

isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #

Returns True if the first stream is the same as or a prefix of the second. A stream is a prefix of itself.

>>> Stream.isPrefixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)
True

Since: 0.6.0

isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool Source #

Returns True if the first stream is a suffix of the second. A stream is considered a suffix of itself.

>>> Stream.isSuffixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)
True

Space: O(n), buffers entire input stream and the suffix.

Pre-release

Suboptimal - Help wanted.

isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a, Unbox a) => SerialT m a -> SerialT m a -> m Bool Source #

Returns True if the first stream is an infix of the second. A stream is considered an infix of itself.

>>> Stream.isInfixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)
True

Space: O(n) worst case where n is the length of the infix.

Pre-release

Requires Storable constraint

isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #

Returns True if all the elements of the first stream occur, in order, in the second stream. The elements do not have to occur consecutively. A stream is a subsequence of itself.

>>> Stream.isSubsequenceOf (Stream.fromList "hlo") (Stream.fromList "hello" :: SerialT IO Char)
True

Since: 0.6.0

the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a) Source #

Ensures that all the elements of the stream are identical and then returns that unique element.

Since: 0.6.0

stripSuffix :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m (Maybe (SerialT m a)) Source #

Drops the given suffix from a stream. Returns Nothing if the stream does not end with the given suffix. Returns Just nil when the suffix is the same as the stream.

It may be more efficient to convert the stream to an Array and use stripSuffix on that especially if the elements have a Storable or Prim instance.

See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropSuffix".

Space: O(n), buffers the entire input stream as well as the suffix

Pre-release

foldlM' :: Monad m => (b -> a -> m b) -> m b -> SerialT m a -> m b Source #

Like foldl' but with a monadic step function.

Since: 0.2.0

Since: 0.8.0 (signature change)

drain :: Monad m => SerialT m a -> m () Source #

drain = mapM_ (\_ -> return ())
drain = Stream.fold Fold.drain

Run a stream, discarding the results. By default it interprets the stream as SerialT, to run other types of streams use the type adapting combinators for example Stream.drain . fromAsync.

Since: 0.7.0

foldlS :: forall t (m :: Type -> Type) b a. IsStream t => (t m b -> a -> t m b) -> t m b -> t m a -> t m b Source #

Lazy left fold to a stream.

foldlT :: forall (m :: Type -> Type) t s b a. (Monad m, IsStream t, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> t m a -> s m b Source #

Lazy left fold to a transformer monad.

For example, to reverse a stream:

D.toList $ D.foldlT (flip D.cons) D.nil $ (D.fromList [1..5] :: SerialT IO Int)

toListRev :: Monad m => SerialT m a -> m [a] Source #

toListRev = Stream.foldl' (flip (:)) []

Convert a stream into a list in reverse order in the underlying monad.

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

Pre-release

headElse :: Monad m => a -> SerialT m a -> m a Source #

Extract the first element of the stream, if any, otherwise use the supplied default value. It can help avoid one branch in high performance code.

Pre-release

eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool Source #

Compare two streams for equality using an equality function.

Since: 0.6.0

cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering Source #

Compare two streams lexicographically using a comparison function.

Since: 0.6.0

parse :: Monad m => Parser a m b -> SerialT m a -> m (Either ParseError b) Source #

Parse a stream using the supplied Parser.

Unlike folds, parsers may not always result in a valid output, they may result in an error. For example:

>>> Stream.parse (Parser.takeEQ 1 Fold.drain) Stream.nil
Left (ParseError "takeEQ: Expecting exactly 1 elements, input terminated on 0")

Note:

fold f = Stream.parse (Parser.fromFold f)

parse p is not the same as head . parseMany p on an empty stream.

Pre-release

toStreamRev :: forall m a (n :: Type -> Type). Monad m => SerialT m a -> m (SerialT n a) Source #

Convert a stream to a pure stream in reverse order.

toStreamRev = Stream.foldl' (flip Stream.cons) Stream.nil

Pre-release

drainN :: Monad m => Int -> SerialT m a -> m () Source #

drainN n = Stream.drain . Stream.take n
drainN n = Stream.fold (Fold.take n Fold.drain)

Run maximum up to n iterations of a stream.

Since: 0.7.0

findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a) Source #

Returns the first element that satisfies the given predicate.

findM = Stream.fold Fold.findM

Since: 0.6.0

parseD :: Monad m => Parser a m b -> SerialT m a -> m (Either ParseError b) Source #

Parse a stream using the supplied ParserD Parser.

Internal

drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #

drainWhile p = Stream.drain . Stream.takeWhile p

Run a stream as long as the predicate holds true.

Since: 0.7.0

(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #

Parallel fold application operator; applies a fold function t m a -> m b to a stream t m a concurrently. The input stream is evaluated asynchronously in an independent thread yielding elements to a buffer and the folding action runs in another thread consuming the input from the buffer.

If you read the signature as (t m a -> m b) -> (t m a -> m b) you can look at it as a transformation that converts a fold function to a buffered concurrent fold function.

The . at the end of the operator is a mnemonic for termination of the stream.

In the example below, each stage introduces a delay of 1 sec but output is printed every second because both stages are concurrent.

>>> import Control.Concurrent (threadDelay)
>>> import Streamly.Prelude ((|$.))
>>> :{
 Stream.foldlM' (\_ a -> threadDelay 1000000 >> print a) (return ())
     |$. Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #

Same as |$. but with arguments reversed.

(|&.) = flip (|$.)

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b Source #

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Since: 0.2.0

foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b Source #

Like foldx, but with a monadic step function.

Since: 0.2.0

runStream :: Monad m => SerialT m a -> m () Source #

Run a stream, discarding the results. By default it interprets the stream as SerialT, to run other types of streams use the type adapting combinators for example runStream . fromAsync.

Since: 0.2.0

runN :: Monad m => Int -> SerialT m a -> m () Source #

runN n = runStream . take n

Run maximum up to n iterations of a stream.

Since: 0.6.0

runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #

runWhile p = runStream . takeWhile p

Run a stream as long as the predicate holds true.

Since: 0.6.0

toHandle :: MonadIO m => Handle -> SerialT m String -> m () Source #

toHandle h = D.mapM_ $ hPutStrLn h

Write a stream of Strings to an IO Handle.

Since: 0.1.0

foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b Source #

Same as |$..

Internal

mapMaybe :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b Source #

Map a Maybe returning function to a stream, filter out the Nothing elements, and return a stream of values extracted from Just.

Equivalent to:

mapMaybe f = Stream.map fromJust . Stream.filter isJust . Stream.map f

Since: 0.3.0

mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b Source #

mapM f = sequence . map f

Apply a monadic function to each element of the stream and replace it with the output of the resulting action.

>>> drain $ Stream.mapM putStr $ Stream.fromList ["a", "b", "c"]
abc

>>> :{
   drain $ Stream.replicateM 10 (return 1)
     & (fromSerial . Stream.mapM (\x -> threadDelay 1000000 >> print x))
:}
1
...
1

> drain $ Stream.replicateM 10 (return 1)
 & (fromAsync . Stream.mapM (\x -> threadDelay 1000000 >> print x))

Concurrent (do not use with fromParallel on infinite streams)

Since: 0.1.0

sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a Source #

sequence = mapM id

Replace the elements of a stream of monadic actions with the outputs of those actions.

>>> drain $ Stream.sequence $ Stream.fromList [putStr "a", putStr "b", putStrLn "c"]
abc

>>> :{
drain $ Stream.replicateM 3 (return $ threadDelay 1000000 >> print 1)
 & (fromSerial . Stream.sequence)
:}
1
1
1

>>> :{
drain $ Stream.replicateM 3 (return $ threadDelay 1000000 >> print 1)
 & (fromAsync . Stream.sequence)
:}
1
1
1

Concurrent (do not use with fromParallel on infinite streams)

Since: 0.1.0

with :: forall t (m :: Type -> Type) a b s. Functor (t m) => (t m a -> t m (s, a)) -> (((s, a) -> b) -> t m (s, a) -> t m (s, a)) -> ((s, a) -> b) -> t m a -> t m a Source #

Modify a t m a -> t m a stream transformation that accepts a predicate (a -> b) to accept ((s, a) -> b) instead, provided a transformation t m a -> t m (s, a). Convenient to filter with index or time.

filterWithIndex = with indexed filter
filterWithAbsTime = with timestamped filter
filterWithRelTime = with timeIndexed filter

Pre-release

filter :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

Include only those elements that pass a predicate.

Since: 0.1.0

trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a Source #

Apply a monadic function to each element flowing through the stream and discard the results.

>>> Stream.drain $ Stream.trace print (Stream.enumerateFromTo 1 2)
1
2

Compare with tap.

Since: 0.7.0

map :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => (a -> b) -> t m a -> t m b Source #

map = fmap

Same as fmap.

> D.toList $ D.map (+1) $ D.fromList [1,2,3]
[2,3,4]

Since: 0.4.0

catMaybes :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Functor (t m)) => t m (Maybe a) -> t m a Source #

In a stream of Maybes, discard Nothings and unwrap Justs.

Pre-release

scanl' :: forall t (m :: Type -> Type) b a. (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Strict left scan. Like map, scanl' too is a one to one transformation, however it adds an extra element.

>>> Stream.toList $ Stream.scanl' (+) 0 $ fromList [1,2,3,4]
[0,1,3,6,10]

>>> Stream.toList $ Stream.scanl' (flip (:)) [] $ Stream.fromList [1,2,3,4]
[[],[1],[2,1],[3,2,1],[4,3,2,1]]

The output of scanl' is the initial value of the accumulator followed by all the intermediate steps and the final result of foldl'.

By streaming the accumulated state after each fold step, we can share the state across multiple stages of stream composition. Each stage can modify or extend the state, do some processing with it and emit it for the next stage, thus modularizing the stream processing. This can be useful in stateful or event-driven programming.

Consider the following monolithic example, computing the sum and the product of the elements in a stream in one go using a foldl':

>>> Stream.foldl' (\(s, p) x -> (s + x, p * x)) (0,1) $ Stream.fromList [1,2,3,4]

Using scanl' we can make it modular by computing the sum in the first stage and passing it down to the next stage for computing the product:

>>> :{
  Stream.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1)
  $ Stream.scanl' (\(s, _) x -> (s + x, x)) (0,1)
  $ Stream.fromList [1,2,3,4]
:}
(10,24)

IMPORTANT: scanl' evaluates the accumulator to WHNF. To avoid building lazy expressions inside the accumulator, it is recommended that a strict data structure is used for accumulator.

>>> scanl' step z = scan (Fold.foldl' step z)
>>> scanl' f z xs = scanlM' (\a b -> return (f a b)) (return z) xs
>>> scanl' f z xs = z `Stream.cons` postscanl' f z xs

See also: usingStateT

Since: 0.2.0

takeWhile :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

End the stream as soon as the predicate fails on an element.

Since: 0.1.0

dropWhile :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.

Since: 0.1.0

take :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Take first n elements from the stream and discard the rest.

Since: 0.1.0

drop :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Discard first n elements from the stream and take the rest.

Since: 0.1.0

reverse :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => t m a -> t m a Source #

Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.

>>> reverse = Stream.foldlT (flip Stream.cons) Stream.nil

Since 0.7.0 (Monad m constraint)

Since: 0.1.1

lefts :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m a Source #

Discard Rights and unwrap Lefts in an Either stream.

Pre-release

rights :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m b Source #

Discard Lefts and unwrap Rights in an Either stream.

Pre-release

elemIndices :: forall t a (m :: Type -> Type). (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int Source #

Find all the indices where the value of the element in the stream is equal to the given value.

elemIndices a = findIndices (== a)

Since: 0.5.0

findIndices :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #

Find all the indices where the element in the stream satisfies the given predicate.

findIndices = fold Fold.findIndices

Since: 0.5.0

nubBy :: (a -> a -> Bool) -> t m a -> t m a Source #

Drop repeated elements anywhere in the stream.

Caution: not scalable for infinite streams

See also: nubWindowBy

Unimplemented

deleteBy :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a Source #

Deletes the first occurrence of the element in the stream that satisfies the given equality predicate.

>>> Stream.toList $ Stream.deleteBy (==) 3 $ Stream.fromList [1,3,3,5]
[1,3,5]

Since: 0.6.0

intersperse :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m) => a -> t m a -> t m a Source #

Insert a pure value between successive elements of a stream.

>>> Stream.toList $ Stream.intersperse ',' $ Stream.fromList "hello"
"h,e,l,l,o"

Since: 0.7.0

insertBy :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a Source #

insertBy cmp elem stream inserts elem before the first element in stream that is greater than or equal to elem when compared using cmp.

insertBy cmp x = mergeBy cmp (fromPure x)
>>> Stream.toList $ Stream.insertBy compare 2 $ Stream.fromList [1,3,5]
[1,2,3,5]

Since: 0.6.0

filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as filter but with a monadic predicate.

Since: 0.4.0

data Rate #

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point of time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes more than rateBuffer we try to recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.

If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.

Since: 0.5.0 (Streamly)

Since: streamly-core-0.8.0

Constructors

Rate 

Fields

foldrSShared :: forall t a (m :: Type -> Type) b. IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #

foldrS :: forall t a (m :: Type -> Type) b. IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #

Right fold to a streaming monad.

foldrS Stream.cons Stream.nil === id

foldrS can be used to perform stateless stream to stream transformations like map and filter in general. It can be coupled with a scan to perform stateful transformations. However, note that the custom map and filter routines can be much more efficient than this due to better stream fusion.

>>> Stream.toList $ Stream.foldrS Stream.cons Stream.nil $ Stream.fromList [1..5]
[1,2,3,4,5]

Find if any element in the stream is odd:

>>> Stream.toList $ Stream.foldrS (\x xs -> if odd x then (Stream.fromPure True) else xs) (Stream.fromPure False) $ (Stream.fromList (2:4:5:undefined) :: Stream.SerialT IO Int)
[True]

Map (+2) on odd elements and filter out the even elements:

>>> Stream.toList $ Stream.foldrS (\x xs -> if odd x then (x + 2) `Stream.cons` xs else xs) Stream.nil $ (Stream.fromList [1..5] :: Stream.SerialT IO Int)
[3,5,7]

foldrM can also be represented in terms of foldrS, however, the former is much more efficient:

foldrM f z s = runIdentityT $ foldrS (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s

Pre-release

foldrT :: forall t (m :: Type -> Type) s a b. (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b Source #

Right fold to a transformer monad. This is the most general right fold function. foldrS is a special case of foldrT, however foldrS implementation can be more efficient:

foldrS = foldrT
foldrM f z s = runIdentityT $ foldrT (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s

foldrT can be used to translate streamly streams to other transformer monads e.g. to a different streaming type.

Pre-release

postscan :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Postscan a stream using the given monadic fold.

The following example extracts the input stream up to a point where the running average of elements is no more than 10:

>>> import Data.Maybe (fromJust)
>>> let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
>>> :{
 Stream.toList
  $ Stream.map (fromJust . fst)
  $ Stream.takeWhile (\(_,x) -> x <= 10)
  $ Stream.postscan (Fold.tee Fold.last avg) (Stream.enumerateFromTo 1.0 100.0)
:}
[1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]

Since: 0.7.0

scan :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Scan a stream using the given monadic fold.

>>> Stream.toList $ Stream.takeWhile (< 10) $ Stream.scan Fold.sum (Stream.fromList [1..10])
[0,1,3,6]

Since: 0.7.0

scanMany :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Like scan but restarts scanning afresh when the scanning fold terminates.

Pre-release

both :: Functor (t m) => t m (Either a a) -> t m a Source #

Remove the Either wrapper and flatten both lefts as well as rights in the output stream.

Pre-release

takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as takeWhile but with a monadic predicate.

Since: 0.4.0

postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like postscanl' but with a monadic step function and a monadic seed.

>>> postscanlM' f z xs = Stream.drop 1 $ Stream.scanlM' f z xs

Since: 0.7.0

Since: 0.8.0 (signature change)

dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as dropWhile but with a monadic predicate.

Since: 0.4.0

indexed :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => t m a -> t m (Int, a) Source #

indexed = Stream.postscanl' (\(i, _) x -> (i + 1, x)) (-1,undefined)
indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)

Pair each element in a stream with its index, starting from index 0.

>>> Stream.toList $ Stream.indexed $ Stream.fromList "hello"
[(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]

Since: 0.6.0

tap :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Fold m a b -> t m a -> t m a Source #

Tap the data flowing through a stream into a Fold. For example, you may add a tap to log the contents flowing through the stream. The fold is used only for effects, its result is discarded.

                  Fold m a b
                      |
-----stream m a ---------------stream m a-----

>>> Stream.drain $ Stream.tap (Fold.drainBy print) (Stream.enumerateFromTo 1 2)
1
2

Compare with trace.

Since: 0.7.0

tapOffsetEvery :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Int -> Int -> Fold m a b -> t m a -> t m a Source #

tapOffsetEvery offset n taps every nth element in the stream starting at offset. offset can be between 0 and n - 1. Offset 0 means start at the first element in the stream. If the offset is outside this range then offset mod n is used as offset.

>>> Stream.drain $ Stream.tapOffsetEvery 0 2 (Fold.rmapM print Fold.toList) $ Stream.enumerateFromTo 0 10
[0,2,4,6,8,10]

trace_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Perform a side effect before yielding each element of the stream and discard the results.

>>> Stream.drain $ Stream.trace_ (print "got here") (Stream.enumerateFromTo 1 2)
"got here"
"got here"

Same as intersperseMPrefix_ but always serial.

See also: trace

Pre-release

prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like prescanl' but with a monadic step function and a monadic seed.

Pre-release

prescanl' :: forall t (m :: Type -> Type) b a. (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Like scanl' but does not stream the final value of the accumulator.

Pre-release

postscanl' :: forall t (m :: Type -> Type) b a. (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Like scanl' but does not stream the initial value of the accumulator.

>>> postscanl' step z = postscan (Fold.foldl' step z)
>>> postscanl' f z = postscanlM' (\a b -> return (f a b)) (return z)
>>> postscanl' f z xs = Stream.drop 1 $ Stream.scanl' f z xs

Since: 0.7.0

postscanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #

scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like scanl' but with a monadic step function and a monadic seed.

Since: 0.4.0

Since: 0.8.0 (signature change)

scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #

scanlMAfter' accumulate initial done stream is like scanlM' except that it provides an additional done function to be applied on the accumulator when the stream stops. The result of done is also emitted in the stream.

This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.

Pre-release

scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a Source #

Like scanl1' but with a monadic step function.

Since: 0.6.0

scanl1' :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a Source #

Like scanl' but for a non-empty stream. The first element of the stream is used as the initial value of the accumulator. Does nothing if the stream is empty.

>>> Stream.toList $ Stream.scanl1' (+) $ fromList [1,2,3,4]
[1,3,6,10]

Since: 0.6.0

uniqBy :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Functor (t m)) => (a -> a -> Bool) -> t m a -> t m a Source #

Drop repeated elements that are adjacent to each other using the supplied comparison function.

uniq = uniqBy (==)

To strip duplicate path separators:

f x y = x == '/' && x == y
Stream.toList $ Stream.uniqBy f $ Stream.fromList "//a//b"
"/a/b"

Space: O(1)

See also: nubBy.

Pre-release

uniq :: forall a t (m :: Type -> Type). (Eq a, IsStream t, Monad m) => t m a -> t m a Source #

Drop repeated elements that are adjacent to each other.

Since: 0.6.0

prune :: (a -> Bool) -> t m a -> t m a Source #

Strip all leading and trailing occurrences of an element passing a predicate and make all other consecutive occurrences uniq.

prune p = dropWhileAround p $ uniqBy (\x y -> p x && p y)
> Stream.prune isSpace (Stream.fromList "  hello      world!   ")
"hello world!"

Space: O(1)

Unimplemented

repeated :: t m a -> t m a Source #

Emit only repeated elements, once.

Unimplemented

takeWhileLast :: (a -> Bool) -> t m a -> t m a Source #

Take all consecutive elements at the end of the stream for which the predicate is true.

O(n) space, where n is the number of elements taken.

Unimplemented

takeWhileAround :: (a -> Bool) -> t m a -> t m a Source #

Like takeWhile and takeWhileLast combined.

O(n) space, where n is the number of elements taken from the end.

Unimplemented

dropLast :: Int -> t m a -> t m a Source #

Drop n elements at the end of the stream.

O(n) space, where n is the number of elements dropped.

Unimplemented

dropWhileLast :: (a -> Bool) -> t m a -> t m a Source #

Drop all consecutive elements at the end of the stream for which the predicate is true.

O(n) space, where n is the number of elements dropped.

Unimplemented

dropWhileAround :: (a -> Bool) -> t m a -> t m a Source #

Like dropWhile and dropWhileLast combined.

O(n) space, where n is the number of elements dropped from the end.

Unimplemented

intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #

Insert an effect and its output before consuming an element of a stream except the first one.

>>> Stream.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o"h,e,l,l,o"

Be careful about the order of effects. In the above example we used trace after the intersperse, if we use it before the intersperse the output would be he.l.l.o."h,e,l,l,o".

>>> Stream.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar $ Stream.fromList "hello"
he.l.l.o."h,e,l,l,o"

Since: 0.5.0

intersperseM_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Insert a side effect before consuming an element of a stream except the first one.

>>> Stream.drain $ Stream.trace putChar $ Stream.intersperseM_ (putChar '.') $ Stream.fromList "hello"
h.e.l.l.o

Pre-release

intersperseMSuffix :: (IsStream t, Monad m) => m a -> t m a -> t m a Source #

Insert an effect and its output after consuming an element of a stream.

>>> Stream.toList $ Stream.trace putChar $ intersperseMSuffix (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o.,"h,e,l,l,o,"

Pre-release

intersperseMSuffix_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Insert a side effect after consuming an element of a stream.

>>> Stream.mapM_ putChar $ Stream.intersperseMSuffix_ (threadDelay 1000000) $ Stream.fromList "hello"
hello

Pre-release

intersperseMSuffixWith :: (IsStream t, Monad m) => Int -> m a -> t m a -> t m a Source #

Like intersperseMSuffix but intersperses an effectful action into the input stream after every n elements and after the last element.

>>> Stream.toList $ Stream.intersperseMSuffixWith 2 (return ',') $ Stream.fromList "hello"
"he,ll,o,"

Pre-release

intersperseMPrefix_ :: (IsStream t, MonadAsync m) => m b -> t m a -> t m a Source #

Insert a side effect before consuming an element of a stream.

>>> Stream.toList $ Stream.trace putChar $ Stream.intersperseMPrefix_ (putChar '.' >> return ',') $ Stream.fromList "hello"
.h.e.l.l.o"hello"

Same as trace_ but may be concurrent.

Concurrent

Pre-release

delay :: forall t (m :: Type -> Type) a. (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #

Introduce a delay of specified seconds before consuming an element of the stream except the first one.

>>> Stream.mapM_ print $ Stream.timestamped $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)

Since: 0.8.0

delayPost :: forall t (m :: Type -> Type) a. (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #

Introduce a delay of specified seconds after consuming an element of a stream.

>>> Stream.mapM_ print $ Stream.timestamped $ Stream.delayPost 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)

Pre-release

delayPre :: forall t (m :: Type -> Type) a. (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #

Introduce a delay of specified seconds before consuming an element of a stream.

>>> Stream.mapM_ print $ Stream.timestamped $ Stream.delayPre 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)

Pre-release

reassembleBy :: forall (m :: Type -> Type) a b t. Fold m a b -> (a -> a -> Int) -> t m a -> t m b Source #

Buffer until the next element in sequence arrives. The function argument determines the difference in sequence numbers. This could be useful in implementing sequenced streams, for example, TCP reassembly.

Unimplemented

indexedR :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => Int -> t m a -> t m (Int, a) Source #

indexedR n = Stream.postscanl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined)
indexedR n = Stream.zipWith (,) (Stream.enumerateFromThen n (n - 1))

Pair each element in a stream with its index, starting from the given index n and counting down.

>>> Stream.toList $ Stream.indexedR 10 $ Stream.fromList "hello"
[(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')]

Since: 0.6.0

timestampWith :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (AbsTime, a) Source #

Pair each element in a stream with an absolute timestamp, using a clock of specified granularity. The timestamp is generated just before the element is consumed.

>>> Stream.mapM_ print $ Stream.timestampWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)

Pre-release

timestamped :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (AbsTime, a) Source #

timeIndexWith :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (RelTime64, a) Source #

Pair each element in a stream with relative times starting from 0, using a clock with the specified granularity. The time is measured just before the element is consumed.

>>> Stream.mapM_ print $ Stream.timeIndexWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(RelTime64 (NanoSecond64 ...),1)
(RelTime64 (NanoSecond64 ...),2)
(RelTime64 (NanoSecond64 ...),3)

Pre-release

timeIndexed :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (RelTime64, a) Source #

Pair each element in a stream with relative times starting from 0, using a 10 ms granularity clock. The time is measured just before the element is consumed.

>>> Stream.mapM_ print $ Stream.timeIndexed $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(RelTime64 (NanoSecond64 ...),1)
(RelTime64 (NanoSecond64 ...),2)
(RelTime64 (NanoSecond64 ...),3)

Pre-release

rollingMapM :: (IsStream t, Monad m) => (Maybe a -> a -> m b) -> t m a -> t m b Source #

Like rollingMap but with an effectful map function.

Pre-release

rollingMap :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => (Maybe a -> a -> b) -> t m a -> t m b Source #

Apply a function on every two successive elements of a stream. The first argument of the map function is the previous element and the second argument is the current element. When the current element is the first element, the previous element is Nothing.

Pre-release

rollingMap2 :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b Source #

Like rollingMap but requires at least two elements in the stream, returns an empty stream otherwise.

This is the stream equivalent of the list idiom zipWith f xs (tail xs).

Pre-release

mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b Source #

Like mapMaybe but maps a monadic function.

Equivalent to:

mapMaybeM f = Stream.map fromJust . Stream.filter isJust . Stream.mapM f

Concurrent (do not use with fromParallel on infinite streams)

Since: 0.3.0

maxThreads :: forall t (m :: Type -> Type) a. IsStream t => Int -> t m a -> t m a Source #

Specify the maximum number of threads that can be spawned concurrently for any concurrent combinator in a stream. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500. maxThreads does not affect ParallelT streams as they can use unbounded number of threads.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

Since: 0.4.0 (Streamly)

Since: 0.8.0

maxBuffer :: forall t (m :: Type -> Type) a. IsStream t => Int -> t m a -> t m a Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in presence of infinite streams, or very large streams. Especially, it must not be used when pure is used in ZipAsyncM streams as pure in applicative zip streams generates an infinite stream causing unbounded concurrent generation with no limit on the buffer or threads.

Since: 0.4.0 (Streamly)

Since: 0.8.0

rate :: forall t (m :: Type -> Type) a. IsStream t => Maybe Rate -> t m a -> t m a Source #

Specify the pull rate of a stream. A Nothing value resets the rate to default which is unlimited. When the rate is specified, concurrent production may be ramped up or down automatically to achieve the specified yield rate. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a stream is governed by:

  • The maxThreads limit
  • The maxBuffer limit
  • The maximum rate that the stream producer can achieve
  • The maximum rate that the stream consumer can achieve

Since: 0.5.0 (Streamly)

Since: 0.8.0

avgRate :: forall t (m :: Type -> Type) a. IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate (r/2) r (2*r) maxBound)

Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.

Since: 0.5.0 (Streamly)

Since: 0.8.0

minRate :: forall t (m :: Type -> Type) a. IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate r r (2*r) maxBound)

Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.

Since: 0.5.0 (Streamly)

Since: 0.8.0

maxRate :: forall t (m :: Type -> Type) a. IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate (r/2) r r maxBound)

Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

Since: 0.5.0 (Streamly)

Since: 0.8.0

constRate :: forall t (m :: Type -> Type) a. IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate r r r 0)

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.

Since: 0.5.0 (Streamly)

Since: 0.8.0

(|$) :: forall t (m :: Type -> Type) a b. (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #

Parallel transform application operator; applies a stream transformation function t m a -> t m b to a stream t m a concurrently; the input stream is evaluated asynchronously in an independent thread yielding elements to a buffer and the transformation function runs in another thread consuming the input from the buffer. |$ is just like regular function application operator $ except that it is concurrent.

If you read the signature as (t m a -> t m b) -> (t m a -> t m b) you can look at it as a transformation that converts a transform function to a buffered concurrent transform function.

The following code prints a value every second even though each stage adds a 1 second delay.

>>> :{
Stream.drain $
   Stream.mapM (\x -> threadDelay 1000000 >> print x)
     |$ Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

(|&) :: forall t (m :: Type -> Type) a b. (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #

Same as |$ but with arguments reversed.

(|&) = flip (|$)

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

mkAsync :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it terminates if the buffer is full and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.

Since: 0.2.0 (Streamly)

Since: 0.8.0

scanx :: forall t (m :: Type -> Type) x a b. (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b Source #

Strict left scan with an extraction function. Like scanl', but applies a user supplied extraction function (the third argument) at each step. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Since: 0.2.0

Since: 0.7.0 (Monad m constraint)

tapAsyncK :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a Source #

Like tapAsyncF but uses a stream fold function instead of a Fold type.

Pre-release

takeLastInterval :: Double -> t m a -> t m a Source #

Take time interval i seconds at the end of the stream.

O(n) space, where n is the number of elements taken.

Unimplemented

dropLastInterval :: Int -> t m a -> t m a Source #

Drop time interval i seconds at the end of the stream.

O(n) space, where n is the number of elements dropped.

Unimplemented

mkParallel :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it blocks if the buffer is full until there is space in the buffer. The consumer consumes the stream lazily from the buffer.

mkParallel = IsStream.fromStreamD . mkParallelD . IsStream.toStreamD

Pre-release

transform :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b Source #

Use a Pipe to transform a stream.

Pre-release

smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b Source #

A stateful mapM, equivalent to a left scan, more like mapAccumL. Hopefully, this is a better alternative to scan. Separation of state from the output makes it easier to think in terms of a shared state, and also makes it easier to keep the state fully strict and the output lazy.

See also: scanlM'

Pre-release

tapAsync :: forall t (m :: Type -> Type) a b. (IsStream t, MonadAsync m) => Fold m a b -> t m a -> t m a Source #

Redirect a copy of the stream to a supplied fold and run it concurrently in an independent thread. The fold may buffer some elements. The buffer size is determined by the prevailing maxBuffer setting.

              Stream m a -> m b
                      |
-----stream m a ---------------stream m a-----

>>> Stream.drain $ Stream.tapAsync (Fold.drainBy print) (Stream.enumerateFromTo 1 2)
1
2

Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained.

>>> tapAsync f = Stream.tapAsyncK (Stream.fold f . Stream.adapt)

Compare with tap.

Pre-release

distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m) => f (t m a -> m b) -> t m a -> t m a Source #

Concurrently distribute a stream to a collection of fold functions, discarding the outputs of the folds.

> Stream.drain $ Stream.distributeAsync_ [Stream.mapM_ print, Stream.mapM_ print] (Stream.enumerateFromTo 1 2)
1
2
1
2

distributeAsync_ = flip (foldr tapAsync)

Pre-release

pollCounts :: (IsStream t, MonadAsync m) => (a -> Bool) -> (t m Int -> m b) -> t m a -> t m a Source #

pollCounts predicate transform fold stream counts those elements in the stream that pass the predicate. The resulting count stream is sent to another thread which transforms it using transform and then folds it using fold. The thread is automatically cleaned up if the stream stops or aborts due to exception.

For example, to print the count of elements processed every second:

> Stream.drain $ Stream.pollCounts (const True) (Stream.rollingMap (-) . Stream.delayPost 1) (Fold.drainBy print)
          $ Stream.enumerateFrom 0

Note: This may not work correctly on 32-bit machines.

Pre-release

takeLast :: Int -> t m a -> t m a Source #

Take n elements at the end of the stream.

O(n) space, where n is the number of elements taken.

Unimplemented

intersperseMWith :: Int -> m a -> t m a -> t m a Source #

Intersperse a monadic action into the input stream after every n elements.

> Stream.toList $ Stream.intersperseMWith 2 (return ',') $ Stream.fromList "hello"
"he,ll,o"

Unimplemented

interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a Source #

Intersperse a monadic action into the input stream after every n seconds.

> import Control.Concurrent (threadDelay)
> Stream.drain $ Stream.interjectSuffix 1 (putChar ',') $ Stream.mapM (\x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello"
h,e,l,l,o

Pre-release

reverse' :: forall t (m :: Type -> Type) a. (IsStream t, MonadIO m, Unbox a) => t m a -> t m a Source #

Like reverse but several times faster, requires an Unbox instance.

Pre-release

applyAsync :: forall t (m :: Type -> Type) a b. (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b Source #

Same as |$.

Internal

sampleOld :: Int -> t m a -> t m a Source #

Evaluate the input stream continuously and keep only the oldest n elements in the buffer, discard the new ones when the buffer is full. When the output stream is evaluated it consumes the values from the buffer in a FIFO manner.

Unimplemented

sampleNew :: Int -> t m a -> t m a Source #

Evaluate the input stream continuously and keep only the latest n elements in a ring buffer, keep discarding the older ones to make space for the new ones. When the output stream is evaluated it consumes the values from the buffer in a FIFO manner.

Unimplemented

sampleRate :: Double -> t m a -> t m a Source #

Like sampleNew but samples at uniform intervals to match the consumer rate. Note that sampleNew leads to non-uniform sampling depending on the consumer pattern.

Unimplemented

inspectMode :: forall t (m :: Type -> Type) a. IsStream t => t m a -> t m a Source #

Print debug information about an SVar when the stream ends

Pre-release

parallel :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Like async except that the execution is much more strict. There is no limit on the number of threads. While async may not schedule a stream if there is no demand from the consumer, parallel always evaluates both the streams immediately. The only limit that applies to parallel is maxBuffer. Evaluation may block if the output buffer becomes full.

>>> import Streamly.Prelude (parallel)
>>> stream = Stream.fromEffect (delay 2) `parallel` Stream.fromEffect (delay 1)
>>> Stream.toList stream -- IO [Int]
1 sec
2 sec
[1,2]

parallel guarantees that all the streams are scheduled for execution immediately, therefore, we could use things like starting timers inside the streams and relying on the fact that all timers were started at the same time.

Unlike async this operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams strictly concurrently.

Since: 0.2.0 (Streamly)

Since: 0.8.0

concat :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => t m (t m a) -> t m a Source #

Flatten a stream of streams to a single stream.

concat = concatMap id

Pre-release

concatMap :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b Source #

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

>>> concatMap f = Stream.concatMapM (return . f)
>>> concatMap f = Stream.concatMapWith Stream.serial f
>>> concatMap f = Stream.concat . Stream.map f

Since: 0.6.0

zipWith :: forall t (m :: Type -> Type) a b c. (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Stream a is evaluated first, followed by stream b, the resulting elements a and b are then zipped using the supplied zip function and the result c is yielded to the consumer.

If stream a or stream b ends, the zipped stream ends. If stream b ends first, the element a from previous evaluation of stream a is discarded.

> D.toList $ D.zipWith (+) (D.fromList [1,2,3]) (D.fromList [4,5,6])
[5,7,9]

Since: 0.1.0

intercalate :: forall t (m :: Type -> Type) b c. (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c Source #

intersperse followed by unfold and concat.

intercalate unf a str = unfoldMany unf $ intersperse a str
intersperse = intercalate (Unfold.function id)
unwords = intercalate Unfold.fromList " "
>>> Stream.toList $ Stream.intercalate Unfold.fromList " " $ Stream.fromList ["abc", "def", "ghi"]
"abc def ghi"

Since: 0.8.0

zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like zipWith but using a monadic zipping function.

Since: 0.4.0

append :: forall t (m :: Type -> Type) b. (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Append the outputs of two streams, yielding all the elements from the first stream and then yielding all the elements from the second stream.

IMPORTANT NOTE: This could be 100x faster than serial/<> for appending a few (say 100) streams because it can fuse via stream fusion. However, it does not scale for a large number of streams (say 1000s) and becomes quadratically slow. Therefore use this for custom appending of a few streams but use concatMap or 'concatMapWith serial' for appending n streams or infinite containers of streams.

Pre-release

merge :: forall t a (m :: Type -> Type). (IsStream t, Ord a) => t m a -> t m a -> t m a Source #

Same as mergeBy compare.

>>> Stream.toList $ Stream.merge (Stream.fromList [1,3,5]) (Stream.fromList [2,4,6,8])
[1,2,3,4,5,6,8]

Internal

interleave :: forall t (m :: Type -> Type) b. (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. If any of the streams finishes early the other stream continues alone until it too finishes.

>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleave "ab" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,,,"
>>> Stream.interleave "abcd" ",," :: Stream.SerialT Identity Char
fromList "a,b,cd"

interleave is dual to interleaveMin, it can be called interleaveMax.

Do not use at scale in concatMapWith.

Pre-release

mergeBy :: forall t a (m :: Type -> Type). IsStream t => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.

If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.

>>> Stream.toList $ Stream.mergeBy compare (Stream.fromList [1,3,5]) (Stream.fromList [2,4,6,8])
[1,2,3,4,5,6,8]

See also: mergeByMFused

Since: 0.6.0

bindWith :: forall t (m :: Type -> Type) b a. IsStream t => (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b Source #

concatMapWith :: forall t (m :: Type -> Type) b a. IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

concatMapWith mixer generator stream is a two dimensional looping combinator. The generator function is used to generate streams from the elements in the input stream and the mixer function is used to merge those streams.

Note we can merge streams concurrently by using a concurrent merge function.

Since: 0.7.0

Since: 0.8.0 (signature change)

interleaveMin :: forall t (m :: Type -> Type) b. (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. The output stops as soon as any of the two streams finishes, discarding the remaining part of the other stream. The last element of the resulting stream would be from the longer stream.

>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveMin "ab" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,"
>>> Stream.interleaveMin "abcd" ",," :: Stream.SerialT Identity Char
fromList "a,b,c"

interleaveMin is dual to interleave.

Do not use at scale in concatMapWith.

Pre-release

concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b Source #

Map a stream producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike concatMap, it can produce an effect at the beginning of each iteration of the inner loop.

Since: 0.6.0

unfoldMany :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Like concatMap but uses an Unfold for stream generation. Unlike concatMap this can fuse the Unfold code with the inner loop and therefore provide many times better performance.

Since: 0.8.0

mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like mergeBy but with a monadic comparison function.

Merge two streams randomly:

> randomly _ _ = randomIO >>= \x -> return $ if x then LT else GT
> Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2])
[2,1,2,2,2,1,1,1]

Merge two streams in a proportion of 2:1:

>>> :{
do
 let proportionately m n = do
      ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT]
      return $ \_ _ -> do
         r <- readIORef ref
         writeIORef ref $ Prelude.tail r
         return $ Prelude.head r
 f <- proportionately 2 1
 xs <- Stream.toList $ Stream.mergeByM f (Stream.fromList [1,1,1,1,1,1]) (Stream.fromList [2,2,2])
 print xs
:}
[1,1,2,1,1,2,1,1,2]

See also: mergeByMFused

Since: 0.6.0

mergeMinBy :: (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like mergeByM but stops merging as soon as any of the two streams stops.

Unimplemented

mergeFstBy :: (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like mergeByM but stops merging as soon as the first stream stops.

Unimplemented

interposeSuffix :: forall t (m :: Type -> Type) c b. (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c Source #

Unfold the elements of a stream, append the given element after each unfolded stream and then concat them into a single stream.

unlines = S.interposeSuffix '\n'

Pre-release

interpose :: forall t (m :: Type -> Type) c b. (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c Source #

Unfold the elements of a stream, intersperse the given element between the unfolded streams and then concat them into a single stream.

unwords = S.interpose ' '

Pre-release

gintercalateSuffix :: forall t (m :: Type -> Type) a c b. (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c Source #

interleaveSuffix followed by unfold and concat.

Pre-release

gintercalate :: forall t (m :: Type -> Type) a c b. (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c Source #

interleaveInfix followed by unfold and concat.

Pre-release

intercalateSuffix :: forall t (m :: Type -> Type) b c. (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c Source #

intersperseMSuffix followed by unfold and concat.

intercalateSuffix unf a str = unfoldMany unf $ intersperseMSuffix a str
intersperseMSuffix = intercalateSuffix (Unfold.function id)
unlines = intercalateSuffix Unfold.fromList "\n"
>>> Stream.toList $ Stream.intercalateSuffix Unfold.fromList "\n" $ Stream.fromList ["abc", "def", "ghi"]
"abc\ndef\nghi\n"

Since: 0.8.0

serial :: forall t (m :: Type -> Type) a. IsStream t => t m a -> t m a -> t m a infixr 6 Source #

Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

>>> import Streamly.Prelude (serial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ stream1 `serial` stream2
[1,2,3,4]

This operation can be used to fold an infinite lazy container of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

wSerial :: forall t (m :: Type -> Type) a. IsStream t => t m a -> t m a -> t m a infixr 6 Source #

Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.

>>> import Streamly.Prelude (wSerial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ Stream.fromWSerial $ stream1 `wSerial` stream2
[1,3,2,4]

Note, for singleton streams wSerial and serial are identical.

Note that this operation cannot be used to fold a container of infinite streams but it can be used for very large streams as the state that it needs to maintain is proportional to the logarithm of the number of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

ahead :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Appends two streams, both the streams may be evaluated concurrently but the outputs are used in the same order as the corresponding actions in the original streams, side effects will happen in the order in which the streams are evaluated:

>>> import Streamly.Prelude (ahead, SerialT)
>>> stream1 = Stream.fromEffect (delay 4) :: SerialT IO Int
>>> stream2 = Stream.fromEffect (delay 2) :: SerialT IO Int
>>> Stream.toList $ stream1 `ahead` stream2 :: IO [Int]
2 sec
4 sec
[4,2]

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `ahead` stream2 `ahead` stream3
1 sec
2 sec
4 sec
[4,2,1]

With 2 threads, only two can be scheduled at a time, when one of those finishes, the third one gets scheduled:

>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `ahead` stream2 `ahead` stream3
2 sec
1 sec
4 sec
[4,2,1]

Only streams are scheduled for ahead evaluation, how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently. It may not make much sense combining serial streams using ahead.

ahead can be safely used to fold an infinite lazy container of streams.

Since: 0.3.0 (Streamly)

Since: 0.8.0

async :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Merges two streams, both the streams may be evaluated concurrently, outputs from both are used as they arrive:

>>> import Streamly.Prelude (async)
>>> stream1 = Stream.fromEffect (delay 4)
>>> stream2 = Stream.fromEffect (delay 2)
>>> Stream.toList $ stream1 `async` stream2
2 sec
4 sec
[2,4]

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `async` stream2 `async` stream3
...
[1,2,4]

With 2 threads, only two can be scheduled at a time, when one of those finishes, the third one gets scheduled:

>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
...
[2,1,4]

With a single thread, it becomes serial:

>>> Stream.toList $ Stream.maxThreads 1 $ stream1 `async` stream2 `async` stream3
...
[4,2,1]

Only streams are scheduled for async evaluation, how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently.

In the following example, both the streams are scheduled for concurrent evaluation but each individual stream is evaluated serially:

>>> stream1 = Stream.fromListM $ Prelude.map delay [3,3] -- SerialT IO Int
>>> stream2 = Stream.fromListM $ Prelude.map delay [1,1] -- SerialT IO Int
>>> Stream.toList $ stream1 `async` stream2 -- IO [Int]
...
[1,1,3,3]

If total threads are 2, the third stream is scheduled only after one of the first two has finished:

stream3 = Stream.fromListM $ Prelude.map delay [2,2] -- SerialT IO Int
Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3 -- IO [Int]

...
[1,1,3,2,3,2]

Thus async goes deep in first few streams rather than going wide in all streams. It prefers to evaluate the leftmost streams as much as possible. Because of this behavior, async can be safely used to fold an infinite lazy container of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

wAsync :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

For singleton streams, wAsync is the same as async. See async for singleton stream behavior. For multi-element streams, while async is left biased i.e. it tries to evaluate the left side stream as much as possible, wAsync tries to schedule them both fairly. In other words, async goes deep while wAsync goes wide. However, outputs are always used as they arrive.

With a single thread, async starts behaving like serial while wAsync starts behaving like wSerial.

>>> import Streamly.Prelude (async, wAsync)
>>> stream1 = Stream.fromList [1,2,3]
>>> stream2 = Stream.fromList [4,5,6]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 1 $ stream1 `async` stream2
[1,2,3,4,5,6]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 1 $ stream1 `wAsync` stream2
[1,4,2,5,3,6]

With two threads available, and combining three streams:

>>> stream3 = Stream.fromList [7,8,9]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
[1,2,3,4,5,6,7,8,9]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 2 $ stream1 `wAsync` stream2 `wAsync` stream3
[1,4,2,7,5,3,8,6,9]

This operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams in a round robin manner.

Note that WSerialT and single threaded WAsyncT both interleave streams but the exact scheduling is slightly different in both cases.

Since: 0.2.0 (Streamly)

Since: 0.8.0

mergeAsyncBy :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Like mergeBy but merges concurrently (i.e. both the elements being merged are generated concurrently).

Since: 0.6.0

mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like mergeByM but merges concurrently (i.e. both the elements being merged are generated concurrently).

Since: 0.6.0

zipAsyncWith :: forall t (m :: Type -> Type) a b c. (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Like zipWith but zips concurrently i.e. both the streams being zipped are evaluated concurrently using the ParallelT concurrent evaluation style. The maximum number of elements of each stream evaluated in advance can be controlled by maxBuffer.

The stream ends if stream a or stream b ends. However, if stream b ends while we are still evaluating stream a and waiting for a result then the output stream will not end until after the evaluation of stream a finishes. This behavior may be changed in the future to end the output stream immediately, as soon as the end of either input stream is detected.

Since: 0.1.0

zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like zipAsyncWith but with a monadic zipping function.

Since: 0.4.0

concatFoldableWith :: forall t f (m :: Type -> Type) a. (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #

A variant of fold that allows you to fold a Foldable container of streams using the specified stream sum operation.

concatFoldableWith async $ map return [1..3]

Equivalent to:

concatFoldableWith f = Prelude.foldr f D.nil
concatFoldableWith f = D.concatMapFoldableWith f id

Since: 0.8.0 (Renamed foldWith to concatFoldableWith)

Since: 0.1.0 (Streamly)

concatMapFoldableWith :: forall t f (m :: Type -> Type) b a. (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #

A variant of foldMap that allows you to map a monadic streaming action on a Foldable container and then fold it using the specified stream merge operation.

concatMapFoldableWith async return [1..3]

Equivalent to:

concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)

Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)

Since: 0.1.0 (Streamly)

concatForFoldableWith :: forall t f (m :: Type -> Type) b a. (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #

Like concatMapFoldableWith but with the last two arguments reversed i.e. the monadic streaming function is the last argument.

Equivalent to:

concatForFoldableWith f xs g = Prelude.foldr (f . g) D.nil xs
concatForFoldableWith f = flip (D.concatMapFoldableWith f)

Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)

Since: 0.1.0 (Streamly)

concatUnfold :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

wSerialFst :: forall (m :: Type -> Type) a. WSerialT m a -> WSerialT m a -> WSerialT m a Source #

wSerialMin :: forall (m :: Type -> Type) a. WSerialT m a -> WSerialT m a -> WSerialT m a Source #

concatM :: (IsStream t, Monad m) => m (t m a) -> t m a Source #

Given a stream value in the underlying monad, lift and join the underlying monad with the stream monad.

>>> concatM = Stream.concat . Stream.fromEffect
>>> concatM = Stream.concat . lift    -- requires (MonadTrans t)
>>> concatM = join . lift             -- requires (MonadTrans t, Monad (t m))

See also: concat, sequence

Internal

parallelFst :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like parallel but stops the output as soon as the first stream stops.

Pre-release

parallelMin :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like parallel but stops the output as soon as any of the two streams stops.

Pre-release

interleaveSuffix :: forall t (m :: Type -> Type) b. (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. As soon as the first stream finishes, the output stops, discarding the remaining part of the second stream. In this case, the last element in the resulting stream would be from the second stream. If the second stream finishes early then the first stream still continues to yield elements until it finishes.

>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveSuffix "abc" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,c,"
>>> Stream.interleaveSuffix "abc" "," :: Stream.SerialT Identity Char
fromList "a,bc"

interleaveSuffix is a dual of interleaveInfix.

Do not use at scale in concatMapWith.

Pre-release

interleaveInfix :: forall t (m :: Type -> Type) b. (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream and ending at the first stream. If the second stream is longer than the first, elements from the second stream are infixed with elements from the first stream. If the first stream is longer then it continues yielding elements even after the second stream has finished.

>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveInfix "abc" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,c"
>>> Stream.interleaveInfix "abc" "," :: Stream.SerialT Identity Char
fromList "a,bc"

interleaveInfix is a dual of interleaveSuffix.

Do not use at scale in concatMapWith.

Pre-release

roundrobin :: forall t (m :: Type -> Type) b. (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Schedule the execution of two streams in a fair round-robin manner, executing each stream once, alternately. Execution of a stream may not necessarily result in an output; a stream may choose to Skip producing an element until later, giving the other stream a chance to run. Therefore, this combinator fairly interleaves the execution of two streams rather than fairly interleaving the output of the two streams. This can be useful in co-operative multitasking without using explicit threads. This can be used as an alternative to async.

Do not use at scale in concatMapWith.

Pre-release

mergeByMFused :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like mergeByM but much faster; works best when merging a statically known number of streams. When merging more than two streams try to merge pairs and pairs of pairs in a tree-like structure. mergeByM works better with a variable number of streams being merged using concatPairsWith.

Internal

unfoldManyInterleave :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Like unfoldMany but interleaves the streams in the same way as interleave behaves instead of appending them.

Pre-release

unfoldManyRoundRobin :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Like unfoldMany but executes the streams in the same way as roundrobin.

Pre-release

concatSmapMWith :: (IsStream t, Monad m) => (t m b -> t m b -> t m b) -> (s -> a -> m (s, t m b)) -> m s -> t m a -> t m b Source #

Like concatMapWith but carries a state which can be used to share information across multiple steps of concat.

concatSmapMWith combine f initial = concatMapWith combine id . smapM f initial

Pre-release

concatPairsWith :: forall t (m :: Type -> Type) b a. IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

Combine streams in pairs using a binary stream combinator, then combine the resulting streams in pairs recursively until we get to a single combined stream.

For example, you can sort a stream using merge sort like this:

>>> Stream.toList $ Stream.concatPairsWith (Stream.mergeBy compare) Stream.fromPure $ Stream.fromList [5,1,7,9,2]
[1,2,5,7,9]

Caution: the stream of streams must be finite

Pre-release

iterateMapWith :: forall t (m :: Type -> Type) a. IsStream t => (t m a -> t m a -> t m a) -> (a -> t m a) -> t m a -> t m a Source #

Like iterateM but iterates after mapping a stream generator on the output.

Yield an input element in the output stream, map a stream generator on it and then do the same on the resulting stream. This can be used for a depth first traversal of a tree like structure.

Note that iterateM is a special case of iterateMapWith:

iterateM f = iterateMapWith serial (fromEffect . f) . fromEffect

It can be used to traverse a tree structure. For example, to list a directory tree:

Stream.iterateMapWith Stream.serial
    (either Dir.toEither (const nil))
    (fromPure (Left "tmp"))

Pre-release

iterateSmapMWith :: (IsStream t, Monad m) => (t m a -> t m a -> t m a) -> (b -> a -> m (b, t m a)) -> m b -> t m a -> t m a Source #

Like iterateMap but carries a state in the stream generation function. This can be used to traverse graph like structures, we can remember the visited nodes in the state to avoid cycles.

Note that a combination of iterateMap and usingState can also be used to traverse graphs. However, this function provides a more localized state instead of using a global state.

See also: mfix

Pre-release

iterateMapLeftsWith :: forall t b a c (m :: Type -> Type). (IsStream t, b ~ Either a c) => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m b -> t m b Source #

In an Either stream iterate on Lefts. This is a special case of iterateMapWith:

iterateMapLeftsWith combine f = iterateMapWith combine (either f (const nil))

To traverse a directory tree:

iterateMapLeftsWith serial Dir.toEither (fromPure (Left "tmp"))

Pre-release

iterateUnfold :: forall (m :: Type -> Type) a t. Unfold m a a -> t m a -> t m a Source #

Same as iterateMapWith Stream.serial but more efficient due to stream fusion.

Unimplemented

chunksOf :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Int -> Fold m a b -> t m a -> t m b Source #

Group the input stream into groups of n elements each and then fold each group using the provided fold function.

>>> Stream.toList $ Stream.chunksOf 2 Fold.sum (Stream.enumerateFromTo 1 10)
[3,7,11,15,19]

This can be considered as an n-fold version of take where we apply take repeatedly on the leftover stream until the stream exhausts.

chunksOf n f = foldMany (FL.take n f)

Since: 0.7.0

foldIterateM :: (IsStream t, Monad m) => (b -> m (Fold m a b)) -> m b -> t m a -> t m b Source #

Iterate a fold generator on a stream. The initial value b is used to generate the first fold, the fold is applied on the stream and the result of the fold is used to generate the next fold and so on.

>>> import Data.Monoid (Sum(..))
>>> f x = return (Fold.take 2 (Fold.sconcat x))
>>> s = Stream.map Sum $ Stream.fromList [1..10]
>>> Stream.toList $ Stream.map getSum $ Stream.foldIterateM f (pure 0) s
[3,10,21,36,55,55]

This is the streaming equivalent of monad like sequenced application of folds where next fold is dependent on the previous fold.

Pre-release

foldMany :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Apply a Fold repeatedly on a stream and emit the fold outputs in the output stream.

To sum every two contiguous elements in a stream:

>>> f = Fold.take 2 Fold.sum
>>> Stream.toList $ Stream.foldMany f $ Stream.fromList [1..10]
[3,7,11,15,19]

On an empty stream the output is empty:

>>> Stream.toList $ Stream.foldMany f $ Stream.fromList []
[]

Note Stream.foldMany (Fold.take 0) would result in an infinite loop in a non-empty stream.

Since: 0.8.0

splitOnSeq :: forall t (m :: Type -> Type) a b. (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like splitOn but the separator is a sequence of elements instead of a single element.

For illustration, let's define a function that operates on pure lists:

>>> splitOnSeq' pat xs = Stream.toList $ Stream.splitOnSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>> splitOnSeq' "" "hello"
["h","e","l","l","o"]
>>> splitOnSeq' "hello" ""
[]
>>> splitOnSeq' "hello" "hello"
["",""]
>>> splitOnSeq' "x" "hello"
["hello"]
>>> splitOnSeq' "h" "hello"
["","ello"]
>>> splitOnSeq' "o" "hello"
["hell",""]
>>> splitOnSeq' "e" "hello"
["h","llo"]
>>> splitOnSeq' "l" "hello"
["he","","o"]
>>> splitOnSeq' "ll" "hello"
["he","o"]

splitOnSeq is an inverse of intercalate. The following law always holds:

intercalate . splitOnSeq == id

The following law holds when the separator is non-empty and contains none of the elements present in the input lists:

splitOnSeq . intercalate == id
>>> splitOnSeq pat f = Stream.foldManyPost (Fold.takeEndBySeq_ pat f)

Pre-release

refoldMany :: (IsStream t, Monad m) => Refold m c a b -> m c -> t m a -> t m b Source #

Like foldMany but using the Refold type instead of Fold.

Pre-release

foldManyPost :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Like foldMany but appends an empty fold output if the fold and stream termination align:

>>> f = Fold.take 2 Fold.sum
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList []
[0]
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..9]
[3,7,11,15,9]
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..10]
[3,7,11,15,19,0]

Pre-release

refoldIterateM :: (IsStream t, Monad m) => Refold m b a b -> m b -> t m a -> t m b Source #

Like foldIterateM but using the Refold type instead. This could be much more efficient due to stream fusion.

Internal

parseManyD :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Parser a m b -> t m a -> t m (Either ParseError b) Source #

Same as parseMany but for StreamD streams.

Internal

parseMany :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Parser a m b -> t m a -> t m (Either ParseError b) Source #

Apply a Parser repeatedly on a stream and emit the parsed values in the output stream.

This is the streaming equivalent of the many parse combinator.

>>> Stream.toList $ Stream.parseMany (Parser.takeBetween 0 2 Fold.sum) $ Stream.fromList [1..10]
[Right 3,Right 7,Right 11,Right 15,Right 19]
> Stream.toList $ Stream.parseMany (Parser.line Fold.toList) $ Stream.fromList "hello\nworld"
["hello\n","world"]

foldMany f = parseMany (fromFold f)

Known Issues: When the parser fails there is no way to get the remaining stream.

Pre-release

splitOn :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Split on an infixed separator element, dropping the separator. The supplied Fold is applied on the split segments. Splits the stream on separator elements determined by the supplied predicate, separator is considered as infixed between two segments:

>>> splitOn' p xs = Stream.toList $ Stream.splitOn p Fold.toList (Stream.fromList xs)
>>> splitOn' (== '.') "a.b"
["a","b"]

An empty stream is folded to the default value of the fold:

>>> splitOn' (== '.') ""
[""]

If one or both sides of the separator are missing then the empty segment on that side is folded to the default output of the fold:

>>> splitOn' (== '.') "."
["",""]
>>> splitOn' (== '.') ".a"
["","a"]
>>> splitOn' (== '.') "a."
["a",""]
>>> splitOn' (== '.') "a..b"
["a","","b"]

splitOn is an inverse of intercalating single element:

Stream.intercalate (Stream.fromPure '.') Unfold.fromList . Stream.splitOn (== '.') Fold.toList === id

Assuming the input stream does not contain the separator:

Stream.splitOn (== '.') Fold.toList . Stream.intercalate (Stream.fromPure '.') Unfold.fromList === id

Since: 0.7.0

foldSequence :: forall t (m :: Type -> Type) a b. t m (Fold m a b) -> t m a -> t m b Source #

Apply a stream of folds to an input stream and emit the results in the output stream.

Unimplemented

parseSequence :: forall t (m :: Type -> Type) a b. t m (Parser a m b) -> t m a -> t m b Source #

Apply a stream of parsers to an input stream and emit the results in the output stream.

Unimplemented

parseManyTill :: forall a (m :: Type -> Type) b x t. Parser a m b -> Parser a m x -> t m a -> t m b Source #

parseManyTill collect test stream tries the parser test on the input, if test fails it backtracks and tries collect, after collect succeeds test is tried again and so on. The parser stops when test succeeds. The output of test is discarded and the output of collect is emitted in the output stream. The parser fails if collect fails.

Unimplemented

parseIterate :: forall t (m :: Type -> Type) b a. (IsStream t, Monad m) => (b -> Parser a m b) -> b -> t m a -> t m (Either ParseError b) Source #

Iterate a parser generating function on a stream. The initial value b is used to generate the first parser, the parser is applied on the stream and the result is used to generate the next parser and so on.

>>> import Data.Monoid (Sum(..))
>>> Stream.toList $ fmap getSum $ Stream.rights $ Stream.parseIterate (\b -> Parser.takeBetween 0 2 (Fold.sconcat b)) (Sum 0) $ fmap Sum $ Stream.fromList [1..10]
[3,10,21,36,55,55]

This is the streaming equivalent of monad like sequenced application of parsers where next parser is dependent on the previous parser.

Pre-release

groupsBy :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b Source #

groupsBy cmp f $ S.fromList [a,b,c,...] assigns the element a to the first group, if b `cmp` a is True then b is also assigned to the same group. If c `cmp` a is True then c is also assigned to the same group and so on. When the comparison fails a new group is started. Each group is folded using the fold f and the result of the fold is emitted in the output stream.

>>> Stream.toList $ Stream.groupsBy (>) Fold.toList $ Stream.fromList [1,3,7,0,2,5]
[[1,3,7],[0,2,5]]

Since: 0.7.0

wordsBy :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Like splitOn after stripping leading, trailing, and repeated separators. Therefore, ".a..b." with . as the separator would be parsed as ["a","b"]. In other words, it's like parsing words from whitespace separated text.

>>> wordsBy' p xs = Stream.toList $ Stream.wordsBy p Fold.toList (Stream.fromList xs)
>>> wordsBy' (== ',') ""
[]
>>> wordsBy' (== ',') ","
[]
>>> wordsBy' (== ',') ",a,,b,"
["a","b"]
words = wordsBy isSpace

Since: 0.7.0

splitOnSuffixSeq :: forall t (m :: Type -> Type) a b. (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like splitOnSuffix but the separator is a sequence of elements, instead of a predicate for a single element.

>>> splitOnSuffixSeq_ pat xs = Stream.toList $ Stream.splitOnSuffixSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>> splitOnSuffixSeq_ "." ""
[]
>>> splitOnSuffixSeq_ "." "."
[""]
>>> splitOnSuffixSeq_ "." "a"
["a"]
>>> splitOnSuffixSeq_ "." ".a"
["","a"]
>>> splitOnSuffixSeq_ "." "a."
["a"]
>>> splitOnSuffixSeq_ "." "a.b"
["a","b"]
>>> splitOnSuffixSeq_ "." "a.b."
["a","b"]
>>> splitOnSuffixSeq_ "." "a..b.."
["a","","b",""]
lines = splitOnSuffixSeq "\n"

splitOnSuffixSeq is an inverse of intercalateSuffix. The following law always holds:

intercalateSuffix . splitOnSuffixSeq == id

The following law holds when the separator is non-empty and contains none of the elements present in the input lists:

splitOnSuffixSeq . intercalateSuffix == id
>>> splitOnSuffixSeq pat f = Stream.foldMany (Fold.takeEndBySeq_ pat f)

Pre-release

splitInnerBy :: (IsStream t, Monad m) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a) Source #

splitInnerBy splitter joiner stream splits the inner containers f a of an input stream t m (f a) using the splitter function. Container elements f a are collected until a split occurs, then all the elements before the split are joined using the joiner function.

For example, if we have a stream of Array Word8, we may want to split the stream into arrays representing lines separated by the '\n' byte such that the resulting stream after a split would be one array for each line.

CAUTION! This is not a true streaming function as the container size after the split and merge may not be bounded.

Pre-release

splitInnerBySuffix :: (IsStream t, Monad m, Eq (f a), Monoid (f a)) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a) Source #

Like splitInnerBy but splits assuming the separator joins the segment in a suffix style.

Pre-release

dropPrefix :: t m a -> t m a -> t m a Source #

Drop prefix from the input stream if present.

Space: O(1)

Unimplemented

dropInfix :: t m a -> t m a -> t m a Source #

Drop all matching infix from the input stream if present. Infix stream may be consumed multiple times.

Space: O(n) where n is the length of the infix.

Unimplemented

dropSuffix :: t m a -> t m a -> t m a Source #

Drop suffix from the input stream if present. Suffix stream may be consumed multiple times.

Space: O(n) where n is the length of the suffix.

Unimplemented

splitOnSuffix :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Split on a suffixed separator element, dropping the separator. The supplied Fold is applied on the split segments.

>>> splitOnSuffix' p xs = Stream.toList $ Stream.splitOnSuffix p Fold.toList (Stream.fromList xs)
>>> splitOnSuffix' (== '.') "a.b."
["a","b"]
>>> splitOnSuffix' (== '.') "a."
["a"]

An empty stream results in an empty output stream:

>>> splitOnSuffix' (== '.') ""
[]

An empty segment consisting of only a suffix is folded to the default output of the fold:

>>> splitOnSuffix' (== '.') "."
[""]
>>> splitOnSuffix' (== '.') "a..b.."
["a","","b",""]

A suffix is optional at the end of the stream:

>>> splitOnSuffix' (== '.') "a"
["a"]
>>> splitOnSuffix' (== '.') ".a"
["","a"]
>>> splitOnSuffix' (== '.') "a.b"
["a","b"]
lines = splitOnSuffix (== '\n')

splitOnSuffix is an inverse of intercalateSuffix with a single element:

Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnSuffix (== '.') Fold.toList === id

Assuming the input stream does not contain the separator:

Stream.splitOnSuffix (== '.') Fold.toList . Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList === id

Since: 0.7.0

intervalsOf :: forall t (m :: Type -> Type) a b. (IsStream t, MonadAsync m) => Double -> Fold m a b -> t m a -> t m b Source #

Group the input stream into windows of n seconds each and then fold each group using the provided fold function.

>>> Stream.toList $ Stream.take 5 $ Stream.intervalsOf 1 Fold.sum $ Stream.constRate 2 $ Stream.enumerateFrom 1
[...,...,...,...,...]

Since: 0.7.0

splitWithSuffix :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Like splitOnSuffix but keeps the suffix attached to the resulting splits.

>>> splitWithSuffix' p xs = Stream.toList $ splitWithSuffix p Fold.toList (Stream.fromList xs)
>>> splitWithSuffix' (== '.') ""
[]
>>> splitWithSuffix' (== '.') "."
["."]
>>> splitWithSuffix' (== '.') "a"
["a"]
>>> splitWithSuffix' (== '.') ".a"
[".","a"]
>>> splitWithSuffix' (== '.') "a."
["a."]
>>> splitWithSuffix' (== '.') "a.b"
["a.","b"]
>>> splitWithSuffix' (== '.') "a.b."
["a.","b."]
>>> splitWithSuffix' (== '.') "a..b.."
["a.",".","b.","."]

Since: 0.7.0

groups :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b Source #

groups = groupsBy (==)
groups = groupsByRolling (==)

Groups contiguous spans of equal elements together in individual groups.

>>> Stream.toList $ Stream.groups Fold.toList $ Stream.fromList [1,1,2,2]
[[1,1],[2,2]]

Since: 0.7.0

groupsByRolling :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Unlike groupsBy this function performs a rolling comparison of two successive elements in the input stream. groupsByRolling cmp f $ S.fromList [a,b,c,...] assigns the element a to the first group, if a `cmp` b is True then b is also assigned to the same group. If b `cmp` c is True then c is also assigned to the same group and so on. When the comparison fails a new group is started. Each group is folded using the fold f.

>>> Stream.toList $ Stream.groupsByRolling (\a b -> a + 1 == b) Fold.toList $ Stream.fromList [1,2,3,7,8,9]
[[1,2,3],[7,8,9]]

Since: 0.7.0

classifySessionsByGeneric Source #

Arguments

:: forall t m (f :: Type -> Type) a b. (IsStream t, MonadAsync m, IsMap f) 
=> Proxy f 
-> Double

timer tick in seconds

-> Bool

reset the timer when an event is received

-> (Int -> m Bool)

predicate to eject sessions based on session count

-> Double

session timeout in seconds

-> Fold m a b

Fold to be applied to session data

-> t m (AbsTime, (Key f, a))

timestamp, (session key, session data)

-> t m (Key f, b)

session key, fold result

classifySessionsBy Source #

Arguments

:: (IsStream t, MonadAsync m, Ord k) 
=> Double

timer tick in seconds

-> Bool

reset the timer when an event is received

-> (Int -> m Bool)

predicate to eject sessions based on session count

-> Double

session timeout in seconds

-> Fold m a b

Fold to be applied to session data

-> t m (AbsTime, (k, a))

timestamp, (session key, session data)

-> t m (k, b)

session key, fold result

classifySessionsBy tick keepalive predicate timeout fold stream classifies an input event stream consisting of (timestamp, (key, value)) into sessions based on the key, folding all the values corresponding to the same key into a session using the supplied fold.

When the fold terminates or a timeout occurs, a tuple consisting of the session key and the folded value is emitted in the output stream. The timeout is measured from the first event in the session. If the keepalive option is set to True the timeout is reset to 0 whenever an event is received.

The timestamp in the input stream is an absolute time from some epoch, characterizing the time when the input event was generated. The notion of current time is maintained by a monotonic event time clock using the timestamps seen in the input stream. The latest timestamp seen till now is used as the base for the current time. When no new events are seen, a timer is started with a clock resolution of tick seconds. This timer is used to detect session timeouts in the absence of new events.

To ensure an upper bound on the memory used the number of sessions can be limited to an upper bound. If the ejection predicate returns True, the oldest session is ejected before inserting a new session.

When the stream ends any buffered sessions are ejected immediately.

If a session key is received even after a session has finished, another session is created for that key.

>>> :{
Stream.mapM_ print
    $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)
    $ Stream.timestamped
    $ Stream.delay 0.1
    $ Stream.fromList ((,) <$> [1,2,3] <*> ['a','b','c'])
:}
(1,"abc")
(2,"abc")
(3,"abc")

Pre-release

classifySessionsOf Source #

Arguments

:: (IsStream t, MonadAsync m, Ord k) 
=> (Int -> m Bool)

predicate to eject sessions on session count

-> Double

time window size

-> Fold m a b

Fold to be applied to session data

-> t m (AbsTime, (k, a))

timestamp, (session key, session data)

-> t m (k, b) 

Same as classifySessionsBy with a timer tick of 1 second and keepalive option set to False.

classifySessionsOf = classifySessionsBy 1 False

Pre-release

classifyKeepAliveSessions Source #

Arguments

:: (IsStream t, MonadAsync m, Ord k) 
=> (Int -> m Bool)

predicate to eject sessions on session count

-> Double

session inactive timeout

-> Fold m a b

Fold to be applied to session payload data

-> t m (AbsTime, (k, a))

timestamp, (session key, session data)

-> t m (k, b) 

Same as classifySessionsBy with a timer tick of 1 second and keepalive option set to True.

classifyKeepAliveSessions = classifySessionsBy 1 True

Pre-release

arraysOf :: forall t (m :: Type -> Type) a. (IsStream t, MonadIO m, Unbox a) => Int -> t m a -> t m (Array a) Source #

arraysOf n stream groups the elements in the input stream into arrays of n elements each.

Same as the following but may be more efficient:

arraysOf n = Stream.foldMany (A.writeN n)

Pre-release

chunksOfTimeout :: forall t (m :: Type -> Type) a b. (IsStream t, MonadAsync m, Functor (t m)) => Int -> Double -> Fold m a b -> t m a -> t m b Source #

Like chunksOf but if the chunk is not completed within the specified time interval then emit whatever we have collected till now. The chunk timeout is reset whenever a chunk is emitted. The granularity of the clock is 100 ms.

>>> s = Stream.delayPost 0.3 $ Stream.fromList [1..1000]
>>> f = Stream.mapM_ print $ Stream.chunksOfTimeout 5 1 Fold.toList s

Pre-release

splitOnPrefix :: forall a (m :: Type -> Type) b t. (a -> Bool) -> Fold m a b -> t m a -> t m b Source #

Split on a prefixed separator element, dropping the separator. The supplied Fold is applied on the split segments.

> splitOnPrefix' p xs = Stream.toList $ Stream.splitOnPrefix p (Fold.toList) (Stream.fromList xs)
> splitOnPrefix' (== '.') ".a.b"
["a","b"]

An empty stream results in an empty output stream:

> splitOnPrefix' (== '.') ""
[]

An empty segment consisting of only a prefix is folded to the default output of the fold:

> splitOnPrefix' (== '.') "."
[""]

> splitOnPrefix' (== '.') ".a.b."
["a","b",""]

> splitOnPrefix' (== '.') ".a..b"
["a","","b"]

A prefix is optional at the beginning of the stream:

> splitOnPrefix' (== '.') "a"
["a"]

> splitOnPrefix' (== '.') "a.b"
["a","b"]

splitOnPrefix is an inverse of intercalatePrefix with a single element:

Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnPrefix (== '.') Fold.toList === id

Assuming the input stream does not contain the separator:

Stream.splitOnPrefix (== '.') Fold.toList . Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList === id

Unimplemented

splitOnAny :: forall a (m :: Type -> Type) b t. [Array a] -> Fold m a b -> t m a -> t m b Source #

Split on any one of the given patterns.

Unimplemented

splitBySeq :: forall t (m :: Type -> Type) a b. (IsStream t, MonadAsync m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like splitOnSeq but splits the separator as well, as an infix token.

>>> splitOn'_ pat xs = Stream.toList $ Stream.splitBySeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>> splitOn'_ "" "hello"
["h","","e","","l","","l","","o"]
>>> splitOn'_ "hello" ""
[""]
>>> splitOn'_ "hello" "hello"
["","hello",""]
>>> splitOn'_ "x" "hello"
["hello"]
>>> splitOn'_ "h" "hello"
["","h","ello"]
>>> splitOn'_ "o" "hello"
["hell","o",""]
>>> splitOn'_ "e" "hello"
["h","e","llo"]
>>> splitOn'_ "l" "hello"
["he","l","","l","o"]
>>> splitOn'_ "ll" "hello"
["he","ll","o"]

Pre-release

splitWithSuffixSeq :: forall t (m :: Type -> Type) a b. (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like splitOnSuffixSeq but keeps the suffix intact in the splits.

>>> splitWithSuffixSeq' pat xs = Stream.toList $ Stream.splitWithSuffixSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>> splitWithSuffixSeq' "." ""
[]
>>> splitWithSuffixSeq' "." "."
["."]
>>> splitWithSuffixSeq' "." "a"
["a"]
>>> splitWithSuffixSeq' "." ".a"
[".","a"]
>>> splitWithSuffixSeq' "." "a."
["a."]
>>> splitWithSuffixSeq' "." "a.b"
["a.","b"]
>>> splitWithSuffixSeq' "." "a.b."
["a.","b."]
>>> splitWithSuffixSeq' "." "a..b.."
["a.",".","b.","."]
>>> splitWithSuffixSeq pat f = Stream.foldMany (Fold.takeEndBySeq pat f)

Pre-release

splitOnSuffixSeqAny :: forall a (m :: Type -> Type) b t. [Array a] -> Fold m a b -> t m a -> t m b Source #

Split post any one of the given patterns.

Unimplemented

wordsOn :: forall a (m :: Type -> Type) b t. Array a -> Fold m a b -> t m a -> t m b Source #

Like splitOn but drops any empty splits.

Unimplemented

bracket :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a Source #

Run the alloc action m b with async exceptions disabled but keeping blocking operations interruptible (see mask). Use the output b as input to b -> t m a to generate an output stream.

b is usually a resource under the state of monad m, e.g. a file handle, that requires a cleanup after use. The cleanup action b -> m c, runs whenever the stream ends normally, due to a sync or async exception or if it gets garbage collected after a partial lazy evaluation.

bracket only guarantees that the cleanup action runs, and it runs with async exceptions enabled. The action must ensure that it can successfully cleanup the resource in the face of sync or async exceptions.

When the stream ends normally or on a sync exception, cleanup action runs immediately in the current thread context, whereas in other cases it runs in the GC context, therefore, cleanup may be delayed until the GC gets to run.

See also: bracket_

Inhibits stream fusion

Since: 0.7.0

onException :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a Source #

Run the action m b if the stream aborts due to an exception. The exception is not caught, simply rethrown.

Inhibits stream fusion

Since: 0.7.0

finally :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> t m a -> t m a Source #

Run the action m b whenever the stream t m a stops normally, aborts due to an exception or if it is garbage collected after a partial lazy evaluation.

The semantics of running the action m b are similar to the cleanup action semantics described in bracket.

See also finally_

Inhibits stream fusion

Since: 0.7.0

retry Source #

Arguments

:: forall t (m :: Type -> Type) e a. (IsStream t, MonadCatch m, Exception e, Ord e) 
=> Map e Int

map from exception to retry count

-> (e -> t m a)

default handler for those exceptions that are not in the map

-> t m a 
-> t m a 

retry takes 3 arguments

  1. A map m whose keys are exceptions and values are the number of times to retry the action given that the exception occurs.
  2. A handler han that decides how to handle an exception when the exception cannot be retried.
  3. The stream itself that we want to run this mechanism on.

When evaluating a stream if an exception occurs,

  1. The stream evaluation aborts
  2. The exception is looked up in m

a. If the exception exists and the mapped value is > 0 then,

i. The value is decreased by 1.

ii. The stream is resumed from where the exception was called, retrying the action.

b. If the exception exists and the mapped value is == 0 then the stream evaluation stops.

c. If the exception does not exist then we handle the exception using han.

Internal

handle :: forall t (m :: Type -> Type) e a. (IsStream t, MonadCatch m, Exception e) => (e -> t m a) -> t m a -> t m a Source #

When evaluating a stream if an exception occurs, stream evaluation aborts and the specified exception handler is run with the exception as argument.

Inhibits stream fusion

Since: 0.7.0

bracket_ :: (IsStream t, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a Source #

Like bracket but with the following differences:

  • alloc action m b runs with async exceptions enabled
  • cleanup action b -> m c won't run if the stream is garbage collected after partial evaluation.
  • does not require a MonadAsync constraint.
  • has slightly better performance than bracket.

Inhibits stream fusion

Pre-release

before :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Run the action m b before the stream yields its first element.

Same as the following but more efficient due to fusion:

>>> before action xs = Stream.nilM action <> xs
>>> before action xs = Stream.concatMap (const xs) (Stream.fromEffect action)

Since: 0.7.0

after_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Like after, with the following differences:

  • action m b won't run if the stream is garbage collected after partial evaluation.
  • Monad m does not require any other constraints.
  • has slightly better performance than after.

Same as the following, but with stream fusion:

after_ action xs = xs <> nilM action

Pre-release

finally_ :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a Source #

Like finally with the following differences:

  • action m b won't run if the stream is garbage collected after partial evaluation.
  • does not require a MonadAsync constraint.
  • has slightly better performance than finally.

Inhibits stream fusion

Pre-release

ghandle :: forall t (m :: Type -> Type) e a. (IsStream t, MonadCatch m, Exception e) => (e -> t m a -> t m a) -> t m a -> t m a Source #

Like handle but the exception handler is also provided with the stream that generated the exception as input. The exception handler can thus re-evaluate the stream to retry the action that failed. The exception handler can again call ghandle on it to retry the action multiple times.

This is highly experimental. In a stream of actions we can map the stream with a retry combinator to retry each action on failure.

Inhibits stream fusion

Pre-release

after :: (IsStream t, MonadRunInIO m) => m b -> t m a -> t m a Source #

Run the action m b whenever the stream t m a stops normally, or if it is garbage collected after a partial lazy evaluation.

The semantics of the action m b are similar to the semantics of cleanup action in bracket.

See also after_

Since: 0.7.0

bracket' :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> m d) -> (b -> m e) -> (b -> t m a) -> t m a Source #

Like bracket but can use separate cleanup actions depending on the mode of termination. bracket' before onStop onGC onException action runs action using the result of before. If the stream stops, onStop action is executed, if the stream is abandoned onGC is executed, if the stream encounters an exception onException is executed.

Pre-release

runStateT :: Monad m => m s -> SerialT (StateT s m) a -> SerialT m (s, a) Source #

Evaluate the inner monad of a stream as StateT and emit the resulting state and value pair after each step.

This is supported only for SerialT as concurrent state updates may not be safe.

Since: 0.8.0

runReaderT :: (IsStream t, Monad m) => m s -> t (ReaderT s m) a -> t m a Source #

Evaluate the inner monad of a stream as ReaderT.

Since: 0.8.0

evalStateT :: Monad m => m s -> SerialT (StateT s m) a -> SerialT m a Source #

Evaluate the inner monad of a stream as StateT.

This is supported only for SerialT as concurrent state updates may not be safe.

evalStateT s = Stream.map snd . Stream.runStateT s

Internal

liftInner :: forall (m :: Type -> Type) t (tr :: (Type -> Type) -> Type -> Type) a. (Monad m, IsStream t, MonadTrans tr, Monad (tr m)) => t m a -> t (tr m) a Source #

Lift the inner monad m of a stream t m a to tr m using the monad transformer tr.

Since: 0.8.0

usingReaderT :: (Monad m, IsStream t) => m r -> (t (ReaderT r m) a -> t (ReaderT r m) a) -> t m a -> t m a Source #

Run a stream transformation using a given environment.

See also: map

Internal

usingStateT :: Monad m => m s -> (SerialT (StateT s m) a -> SerialT (StateT s m) a) -> SerialT m a -> SerialT m a Source #

Run a stateful (StateT) stream transformation using a given state.

This is supported only for SerialT as concurrent state updates may not be safe.

usingStateT s f = evalStateT s . f . liftInner

See also: scanl'

Internal

hoist :: (Monad m, Monad n) => (forall x. m x -> n x) -> SerialT m a -> SerialT n a Source #

Transform the inner monad of a stream using a natural transformation.

Internal

generally :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => t Identity a -> t m a Source #

Generalize the inner monad of the stream from Identity to any monad.

Internal

sortBy :: forall (m :: Type -> Type) a. MonadCatch m => (a -> a -> Ordering) -> SerialT m a -> SerialT m a Source #

Sort the input stream using a supplied comparison function.

O(n) space

Note: this is not the fastest possible implementation as of now.

Pre-release

unionBy :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m, Semigroup (t m a)) => (a -> a -> Bool) -> t m a -> t m a -> t m a Source #

This is essentially an append operation that appends all the extra occurrences of elements from the second stream that are not already present in the first stream.

>>> Stream.toList $ Stream.unionBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [1,1,2,3])
[1,2,2,4,3]

Equivalent to the following except that s1 is evaluated only once:

unionBy eq s1 s2 = s1 `serial` (s2 `differenceBy eq` s1)

Similar to joinOuter but not the same.

Space: O(n)

Time: O(m x n)

Pre-release

intersectBy :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (a -> a -> Bool) -> t m a -> t m a -> t m a Source #

intersectBy is essentially a filtering operation that retains only those elements in the first stream that are present in the second stream.

>>> Stream.toList $ Stream.intersectBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
[1,2,2]
>>> Stream.toList $ Stream.intersectBy (==) (Stream.fromList [2,1,1,3]) (Stream.fromList [1,2,2,4])
[2,1,1]

intersectBy is similar to but not the same as joinInner:

>>> Stream.toList $ fmap fst $ Stream.joinInner (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
[1,1,2,2]

Space: O(n) where n is the number of elements in the second stream.

Time: O(m x n) where m is the number of elements in the first stream and n is the number of elements in the second stream.

Pre-release

sampleFromThen :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Functor (t m)) => Int -> Int -> t m a -> t m a Source #

sampleFromThen offset stride samples the element at offset index and then every element at strides of stride.

>>> Stream.toList $ Stream.sampleFromThen 2 3 $ Stream.enumerateFromTo 0 10
[2,5,8]

Pre-release

sampleIntervalEnd :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #

Continuously evaluate the input stream and sample the last event in time window of n seconds.

This is also known as throttle in some libraries.

sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.last

Pre-release

sampleIntervalStart :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #

Like sampleIntervalEnd but samples at the beginning of the time window.

sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.one

Pre-release

sampleBurstEnd :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #

Sample one event at the end of each burst of events. A burst is a group of events close together in time, it ends when an event is spaced by more than the specified time interval (in seconds) from the previous event.

This is known as debounce in some libraries.

The clock granularity is 10 ms.

Pre-release

sampleBurstStart :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #

Like sampleBurstEnd but samples the event at the beginning of the burst instead of at the end of it.

Pre-release

intersectBySorted :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Like intersectBy but works only on streams sorted in ascending order.

Space: O(1)

Time: O(m+n)

Pre-release

differenceBy :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (a -> a -> Bool) -> t m a -> t m a -> t m a Source #

Delete first occurrences of those elements from the first stream that are present in the second stream. If an element occurs multiple times in the second stream as many occurrences of it are deleted from the first stream.

>>> Stream.toList $ Stream.differenceBy (==) (Stream.fromList [1,2,2]) (Stream.fromList [1,2,3])
[2]

The following laws hold:

(s1 `serial` s2) `differenceBy eq` s1 === s2
(s1 `wSerial` s2) `differenceBy eq` s1 === s2

Same as the list \\ operation.

Space: O(m) where m is the number of elements in the first stream.

Time: O(m x n) where m is the number of elements in the first stream and n is the number of elements in the second stream.

Pre-release

mergeDifferenceBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Like differenceBy but works only on sorted streams.

Space: O(1)

Unimplemented

mergeUnionBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Like unionBy but works only on sorted streams.

Space: O(1)

Unimplemented

crossJoin :: Monad (t m) => t m a -> t m b -> t m (a, b) Source #

This is the same as outerProduct but less efficient.

The second stream is evaluated multiple times. If the second stream is a consume-once stream then it can be cached in an Array before calling this function. Caching may also improve performance if the stream is expensive to evaluate.

Time: O(m x n)

Pre-release

joinInner :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> t m (a, b) Source #

For all elements in t m a, for all elements in t m b if a and b are equal by the given equality predicate then return the tuple (a, b).

The second stream is evaluated multiple times. If the stream is a consume-once stream then the caller should cache it (e.g. in an Array) before calling this function. Caching may also improve performance if the stream is expensive to evaluate.

For space efficiency use the smaller stream as the second stream.

You should almost always use joinInnerMap instead of joinInner. joinInnerMap is an order of magnitude faster. joinInner may be used when the second stream is generated from a seed, therefore, need not be stored in memory and the amount of memory it takes is a concern.

Space: O(n) assuming the second stream is cached in memory.

Time: O(m x n)

Pre-release

joinInnerMap :: forall t (m :: Type -> Type) k a b. (IsStream t, Monad m, Ord k) => t m (k, a) -> t m (k, b) -> t m (k, a, b) Source #

Like joinInner but uses a Map for efficiency.

If the input streams have duplicate keys, the behavior is undefined.

For space efficiency use the smaller stream as the second stream.

Space: O(n)

Time: O(m + n)

Pre-release

joinInnerMerge :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b) Source #

Like joinInner but works only on sorted streams.

Space: O(1)

Time: O(m + n)

Unimplemented

mergeLeftJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b) Source #

Like joinLeft but works only on sorted streams.

Space: O(1)

Time: O(m + n)

Unimplemented

joinLeftMap :: forall t k (m :: Type -> Type) a b. (IsStream t, Ord k, Monad m) => t m (k, a) -> t m (k, b) -> t m (k, a, Maybe b) Source #

Like joinLeft but uses a Map for efficiency.

Space: O(n)

Time: O(m + n)

Pre-release

mergeOuterJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b) Source #

Like joinOuter but works only on sorted streams.

Space: O(1)

Time: O(m + n)

Unimplemented

joinOuterMap :: forall t k (m :: Type -> Type) a b. (IsStream t, Ord k, MonadIO m) => t m (k, a) -> t m (k, b) -> t m (k, Maybe a, Maybe b) Source #

Like joinOuter but uses a Map for efficiency.

Space: O(m + n)

Time: O(m + n)

Pre-release

maxThreads :: forall t (m :: Type -> Type) a. IsStream t => Int -> t m a -> t m a Source #

Specify the maximum number of threads that can be spawned concurrently for any concurrent combinator in a stream. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500. maxThreads does not affect ParallelT streams as they can use unbounded number of threads.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

Since: 0.4.0 (Streamly)

Since: 0.8.0

maxBuffer :: forall t (m :: Type -> Type) a. IsStream t => Int -> t m a -> t m a Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in presence of infinite streams, or very large streams. Especially, it must not be used when pure is used in ZipAsyncM streams as pure in applicative zip streams generates an infinite stream causing unbounded concurrent generation with no limit on the buffer or threads.

Since: 0.4.0 (Streamly)

Since: 0.8.0

rate :: forall t (m :: Type -> Type) a. IsStream t => Maybe Rate -> t m a -> t m a Source #

Specify the pull rate of a stream. A Nothing value resets the rate to default which is unlimited. When the rate is specified, concurrent production may be ramped up or down automatically to achieve the specified yield rate. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a stream is governed by:

  • The maxThreads limit
  • The maxBuffer limit
  • The maximum rate that the stream producer can achieve
  • The maximum rate that the stream consumer can achieve

Since: 0.5.0 (Streamly)

Since: 0.8.0

avgRate :: forall t (m :: Type -> Type) a. IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate (r/2) r (2*r) maxBound)

Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.

Since: 0.5.0 (Streamly)

Since: 0.8.0

minRate :: forall t (m :: Type -> Type) a. IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate r r (2*r) maxBound)

Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.

Since: 0.5.0 (Streamly)

Since: 0.8.0

maxRate :: forall t (m :: Type -> Type) a. IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate (r/2) r r maxBound)

Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

Since: 0.5.0 (Streamly)

Since: 0.8.0

constRate :: forall t (m :: Type -> Type) a. IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate r r r 0)

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.

Since: 0.5.0 (Streamly)

Since: 0.8.0

maxYields :: forall t (m :: Type -> Type) a. IsStream t => Maybe Int64 -> t m a -> t m a Source #

inspectMode :: forall t (m :: Type -> Type) a. IsStream t => t m a -> t m a Source #

Print debug information about an SVar when the stream ends

Pre-release

printState :: MonadIO m => State Stream m a -> m () Source #

map :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => (a -> b) -> t m a -> t m b Source #

map = fmap

Same as fmap.

> D.toList $ D.map (+1) $ D.fromList [1,2,3]
[2,3,4]

Since: 0.4.0

takeWhile :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

End the stream as soon as the predicate fails on an element.

Since: 0.1.0

take :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Take first n elements from the stream and discard the rest.

Since: 0.1.0

drop :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Discard first n elements from the stream and take the rest.

Since: 0.1.0

reverse :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => t m a -> t m a Source #

Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.

>>> reverse = Stream.foldlT (flip Stream.cons) Stream.nil

Since: 0.7.0 (Monad m constraint)

Since: 0.1.1

concatMap :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b Source #

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

>>> concatMap f = Stream.concatMapM (return . f)
>>> concatMap f = Stream.concatMapWith Stream.serial f
>>> concatMap f = Stream.concat . Stream.map f

Since: 0.6.0

zipWith :: forall t (m :: Type -> Type) a b c. (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Stream a is evaluated first, followed by stream b, the resulting elements a and b are then zipped using the supplied zip function and the result c is yielded to the consumer.

If stream a or stream b ends, the zipped stream ends. If stream b ends first, the element a from previous evaluation of stream a is discarded.

> D.toList $ D.zipWith (+) (D.fromList [1,2,3]) (D.fromList [4,5,6])
[5,7,9]

Since: 0.1.0

fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #

Fold a stream using the supplied left Fold and reducing the resulting expression strictly at each step. The behavior is similar to foldl'. A Fold can terminate early without consuming the full stream. See the documentation of individual Folds for termination behavior.

>>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050

Folds never fail, therefore, they produce a default value even when no input is provided. It means we can always fold an empty stream and get a valid result. For example:

>>> Stream.fold Fold.sum Stream.nil
0

However, foldMany on an empty stream results in an empty stream. Therefore, Stream.fold f is not the same as Stream.head . Stream.foldMany f.

fold f = Stream.parse (Parser.fromFold f)

Since: 0.7.0

findIndices :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #

Find all the indices where the element in the stream satisfies the given predicate.

findIndices = fold Fold.findIndices

Since: 0.5.0

yield :: forall t a (m :: Type -> Type). IsStream t => a -> t m a Source #

Same as fromPure

Since: 0.4.0

zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like zipWith but using a monadic zipping function.

Since: 0.4.0

fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #

fromEffect m = m `consM` nil

Create a singleton stream from a monadic action.

> Stream.toList $ Stream.fromEffect getLine
hello
["hello"]

Since: 0.8.0 (Renamed yieldM to fromEffect)

splitOnSeq :: forall t (m :: Type -> Type) a b. (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #

Like splitOn but the separator is a sequence of elements instead of a single element.

For illustration, let's define a function that operates on pure lists:

>>> splitOnSeq' pat xs = Stream.toList $ Stream.splitOnSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>> splitOnSeq' "" "hello"
["h","e","l","l","o"]
>>> splitOnSeq' "hello" ""
[]
>>> splitOnSeq' "hello" "hello"
["",""]
>>> splitOnSeq' "x" "hello"
["hello"]
>>> splitOnSeq' "h" "hello"
["","ello"]
>>> splitOnSeq' "o" "hello"
["hell",""]
>>> splitOnSeq' "e" "hello"
["h","llo"]
>>> splitOnSeq' "l" "hello"
["he","","o"]
>>> splitOnSeq' "ll" "hello"
["he","o"]

splitOnSeq is an inverse of intercalate. The following law always holds:

intercalate . splitOnSeq == id

The following law holds when the separator is non-empty and contains none of the elements present in the input lists:

splitOnSeq . intercalate == id
>>> splitOnSeq pat f = Stream.foldManyPost (Fold.takeEndBySeq_ pat f)

Pre-release

fromPure :: forall t a (m :: Type -> Type). IsStream t => a -> t m a Source #

fromPure a = a `cons` nil

Create a singleton stream from a pure value.

The following holds in monadic streams, but not in Zip streams:

fromPure = pure
fromPure = fromEffect . pure

In Zip applicative streams fromPure is not the same as pure because in that case pure is equivalent to repeat instead. fromPure and pure are equally efficient; in other cases fromPure may be slightly more efficient than the other equivalent definitions.

Since: 0.8.0 (Renamed yield to fromPure)

takeEndBy :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b Source #

Map a stream producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike concatMap, it can produce an effect at the beginning of each iteration of the inner loop.

Since: 0.6.0

foldManyPost :: forall t (m :: Type -> Type) a b. (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Like foldMany but appends an empty fold output if the fold and stream termination align:

>>> f = Fold.take 2 Fold.sum
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList []
[0]
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..9]
[3,7,11,15,9]
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..10]
[3,7,11,15,19,0]

Pre-release

postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like postscanl' but with a monadic step function and a monadic seed.

>>> postscanlM' f z xs = Stream.drop 1 $ Stream.scanlM' f z xs

Since: 0.7.0

Since: 0.8.0 (signature change)

repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #

>>> repeatM = fix . consM
>>> repeatM = cycle1 . fromEffect

Generate a stream by repeatedly executing a monadic action forever.

>>> :{
repeatAsync =
       Stream.repeatM (threadDelay 1000000 >> print 1)
     & Stream.take 10
     & Stream.fromAsync
     & Stream.drain
:}

Concurrent, infinite (do not use with fromParallel)

Since: 0.2.0

timesWith :: forall t (m :: Type -> Type). (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64) Source #

timesWith g returns a stream of time value tuples. The first component of the tuple is an absolute time reference (epoch) denoting the start of the stream and the second component is a time relative to the reference.

The argument g specifies the granularity of the relative time in seconds. A lower granularity clock gives higher precision but is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> import Control.Concurrent (threadDelay)
>>> import Streamly.Internal.Data.Stream.IsStream.Common as Stream (timesWith)
>>> Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.timesWith 0.01
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))

Note: This API is not safe on 32-bit machines.

Pre-release

absTimesWith :: forall t (m :: Type -> Type). (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #

absTimesWith g returns a stream of absolute timestamps using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimesWith 0.01
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})

Note: This API is not safe on 32-bit machines.

Pre-release

relTimesWith :: forall t (m :: Type -> Type). (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64 Source #

relTimesWith g returns a stream of relative time values starting from 0, using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)

Note: This API is not safe on 32-bit machines.

Pre-release

postscanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #

scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #

scanlMAfter' accumulate initial done stream is like scanlM' except that it provides an additional done function to be applied on the accumulator when the stream stops. The result of done is also emitted in the stream.

This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.

Pre-release

intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #

Insert an effect and its output before consuming an element of a stream except the first one.

>>> Stream.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o"h,e,l,l,o"

Be careful about the order of effects. In the above example we used trace after the intersperse; if we use it before the intersperse, the output would be he.l.l.o."h,e,l,l,o".

>>> Stream.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar $ Stream.fromList "hello"
he.l.l.o."h,e,l,l,o"

Since: 0.5.0

mkAsync :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills; it terminates if the buffer is full, and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.

Since: 0.2.0 (Streamly)

Since: 0.8.0

yieldM :: (Monad m, IsStream t) => m a -> t m a Source #

Same as fromEffect

Since: 0.4.0

concatM :: (IsStream t, Monad m) => m (t m a) -> t m a Source #

Given a stream value in the underlying monad, lift and join the underlying monad with the stream monad.

>>> concatM = Stream.concat . Stream.fromEffect
>>> concatM = Stream.concat . lift    -- requires (MonadTrans t)
>>> concatM = join . lift             -- requires (MonadTrans t, Monad (t m))

See also: concat, sequence

Internal

mkParallel :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills; if the buffer is full, it blocks until there is space in the buffer. The consumer consumes the stream lazily from the buffer.

mkParallel = IsStream.fromStreamD . mkParallelD . IsStream.toStreamD

Pre-release

smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b Source #

A stateful mapM, equivalent to a left scan, more like mapAccumL. Hopefully, this is a better alternative to scan. Separation of state from the output makes it easier to think in terms of a shared state, and also makes it easier to keep the state fully strict and the output lazy.

See also: scanlM'

Pre-release

interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a Source #

Intersperse a monadic action into the input stream after every n seconds.

> import Control.Concurrent (threadDelay)
> Stream.drain $ Stream.interjectSuffix 1 (putChar ',') $ Stream.mapM (\x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello"
h,e,l,l,o

Pre-release

reverse' :: forall t (m :: Type -> Type) a. (IsStream t, MonadIO m, Unbox a) => t m a -> t m a Source #

Like reverse but several times faster; requires an Unbox instance.

Pre-release

parallelFst :: forall t (m :: Type -> Type) a. (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like parallel but stops the output as soon as the first stream stops.

Pre-release

foldContinue :: forall (m :: Type -> Type) a b. Monad m => Fold m a b -> SerialT m a -> Fold m a b Source #

We can create higher order folds using foldContinue. We can fold a number of streams to a given fold efficiently with full stream fusion. For example, to fold a list of streams on the same sum fold:

concatFold = Prelude.foldl Stream.foldContinue Fold.sum
fold f = Fold.extractM . Stream.foldContinue f

Internal

class Enum a => Enumerable a where Source #

Types that can be enumerated as a stream. The operations in this type class are equivalent to those in the Enum type class, except that these generate a stream instead of a list. Use the functions in Streamly.Internal.Data.Stream.Enumeration module to define new instances.

Since: 0.6.0

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => a -> t m a Source #

enumerateFrom from generates a stream starting with the element from, enumerating up to maxBound when the type is Bounded or generating an infinite stream when the type is not Bounded.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int)
[0,1,2,3]

For Fractional types, enumeration is numerically stable. However, no overflow or underflow checks are performed.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1
[1.1,2.1,3.1,4.1]

Since: 0.6.0

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => a -> a -> t m a Source #

Generate a finite stream starting with the element from, enumerating the type up to the value to. If to is smaller than from then an empty stream is returned.

>>> Stream.toList $ Stream.enumerateFromTo 0 4
[0,1,2,3,4]

For Fractional types, the last element is equal to the specified to value after rounding to the nearest integral value.

>>> Stream.toList $ Stream.enumerateFromTo 1.1 4
[1.1,2.1,3.1,4.1]

>>> Stream.toList $ Stream.enumerateFromTo 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]

Since: 0.6.0

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => a -> a -> t m a Source #

enumerateFromThen from then generates a stream whose first element is from, the second element is then and the successive elements are in increments of then - from. Enumeration can occur downwards or upwards depending on whether then comes before or after from. For Bounded types the stream ends when maxBound is reached (or minBound when enumerating downwards); for unbounded types it keeps enumerating infinitely.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2
[0,2,4,6]

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2)
[0,-2,-4,-6]

Since: 0.6.0

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => a -> a -> a -> t m a Source #

enumerateFromThenTo from then to generates a finite stream whose first element is from, the second element is then and the successive elements are in increments of then - from up to to. Enumeration can occur downwards or upwards depending on whether then comes before or after from.

>>> Stream.toList $ Stream.enumerateFromThenTo 0 2 6
[0,2,4,6]

>>> Stream.toList $ Stream.enumerateFromThenTo 0 (-2) (-6)
[0,-2,-4,-6]

Since: 0.6.0

Instances

Instances details
Enumerable Int16 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> t m Int16 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> Int16 -> t m Int16 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> Int16 -> t m Int16 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int16 -> Int16 -> Int16 -> t m Int16 Source #

Enumerable Int32 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> t m Int32 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> Int32 -> t m Int32 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> Int32 -> t m Int32 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int32 -> Int32 -> Int32 -> t m Int32 Source #

Enumerable Int64 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> t m Int64 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> Int64 -> t m Int64 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> Int64 -> t m Int64 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int64 -> Int64 -> Int64 -> t m Int64 Source #

Enumerable Int8 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> t m Int8 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> Int8 -> t m Int8 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> Int8 -> t m Int8 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int8 -> Int8 -> Int8 -> t m Int8 Source #

Enumerable Word16 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> t m Word16 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> Word16 -> t m Word16 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> Word16 -> t m Word16 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word16 -> Word16 -> Word16 -> t m Word16 Source #

Enumerable Word32 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> t m Word32 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> Word32 -> t m Word32 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> Word32 -> t m Word32 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word32 -> Word32 -> Word32 -> t m Word32 Source #

Enumerable Word64 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> t m Word64 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> Word64 -> t m Word64 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> Word64 -> t m Word64 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word64 -> Word64 -> Word64 -> t m Word64 Source #

Enumerable Word8 Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> t m Word8 Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> Word8 -> t m Word8 Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> Word8 -> t m Word8 Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word8 -> Word8 -> Word8 -> t m Word8 Source #

Enumerable Ordering Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> t m Ordering Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> Ordering -> t m Ordering Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> Ordering -> t m Ordering Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ordering -> Ordering -> Ordering -> t m Ordering Source #

Enumerable Integer Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> t m Integer Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> Integer -> t m Integer Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> Integer -> t m Integer Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Integer -> Integer -> Integer -> t m Integer Source #

Enumerable Natural Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> t m Natural Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> Natural -> t m Natural Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> Natural -> t m Natural Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Natural -> Natural -> Natural -> t m Natural Source #

Enumerable () Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> t m () Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> () -> t m () Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> () -> t m () Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => () -> () -> () -> t m () Source #

Enumerable Bool Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> t m Bool Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> Bool -> t m Bool Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> Bool -> t m Bool Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Bool -> Bool -> Bool -> t m Bool Source #

Enumerable Char Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> t m Char Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> Char -> t m Char Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> Char -> t m Char Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Char -> Char -> Char -> t m Char Source #

Enumerable Double Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> t m Double Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> Double -> t m Double Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> Double -> t m Double Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Double -> Double -> Double -> t m Double Source #

Enumerable Float Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> t m Float Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> Float -> t m Float Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> Float -> t m Float Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Float -> Float -> Float -> t m Float Source #

Enumerable Int Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> t m Int Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> Int -> t m Int Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> Int -> t m Int Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Int -> Int -> Int -> t m Int Source #

Enumerable Word Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> t m Word Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> Word -> t m Word Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> Word -> t m Word Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Word -> Word -> Word -> t m Word Source #

Enumerable a => Enumerable (Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> t m (Identity a) Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> Identity a -> t m (Identity a) Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> Identity a -> t m (Identity a) Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Identity a -> Identity a -> Identity a -> t m (Identity a) Source #

Integral a => Enumerable (Ratio a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> t m (Ratio a) Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> Ratio a -> t m (Ratio a) Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> Ratio a -> t m (Ratio a) Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Ratio a -> Ratio a -> Ratio a -> t m (Ratio a) Source #

HasResolution a => Enumerable (Fixed a) Source # 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Enumeration

Methods

enumerateFrom :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> t m (Fixed a) Source #

enumerateFromTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> Fixed a -> t m (Fixed a) Source #

enumerateFromThen :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> Fixed a -> t m (Fixed a) Source #

enumerateFromThenTo :: forall t (m :: Type -> Type). (IsStream t, Monad m) => Fixed a -> Fixed a -> Fixed a -> t m (Fixed a) Source #

enumerateFromStepIntegral :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Integral a) => a -> a -> t m a Source #

enumerateFromStepIntegral from step generates an infinite stream whose first element is from and the successive elements are in increments of step.

CAUTION: This function is not safe for finite integral types. It does not check for overflow, underflow or bounds.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromStepIntegral 0 2
[0,2,4,6]

>>> Stream.toList $ Stream.take 3 $ Stream.enumerateFromStepIntegral 0 (-2)
[0,-2,-4]

Since: 0.6.0

enumerateFromIntegral :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Integral a, Bounded a) => a -> t m a Source #

Enumerate an Integral type. enumerateFromIntegral from generates a stream whose first element is from and the successive elements are in increments of 1. The stream is bounded by the size of the Integral type.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromIntegral (0 :: Int)
[0,1,2,3]

Since: 0.6.0

enumerateFromThenIntegral :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Integral a, Bounded a) => a -> a -> t m a Source #

Enumerate an Integral type in steps. enumerateFromThenIntegral from then generates a stream whose first element is from, the second element is then and the successive elements are in increments of then - from. The stream is bounded by the size of the Integral type.

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenIntegral (0 :: Int) 2
[0,2,4,6]

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenIntegral (0 :: Int) (-2)
[0,-2,-4,-6]

Since: 0.6.0

enumerateFromToIntegral :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Integral a) => a -> a -> t m a Source #

Enumerate an Integral type up to a given limit. enumerateFromToIntegral from to generates a finite stream whose first element is from and successive elements are in increments of 1 up to to.

>>> Stream.toList $ Stream.enumerateFromToIntegral 0 4
[0,1,2,3,4]

Since: 0.6.0

enumerateFromThenToIntegral :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Integral a) => a -> a -> a -> t m a Source #

Enumerate an Integral type in steps up to a given limit. enumerateFromThenToIntegral from then to generates a finite stream whose first element is from, the second element is then and the successive elements are in increments of then - from up to to.

>>> Stream.toList $ Stream.enumerateFromThenToIntegral 0 2 6
[0,2,4,6]

>>> Stream.toList $ Stream.enumerateFromThenToIntegral 0 (-2) (-6)
[0,-2,-4,-6]

Since: 0.6.0

enumerateFromFractional :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Fractional a) => a -> t m a Source #

Numerically stable enumeration from a Fractional number in steps of size 1. enumerateFromFractional from generates a stream whose first element is from and the successive elements are in increments of 1. No overflow or underflow checks are performed.

This is the equivalent of enumFrom for Fractional types. For example:

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromFractional 1.1
[1.1,2.1,3.1,4.1]

Since: 0.6.0

enumerateFromThenFractional :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Fractional a) => a -> a -> t m a Source #

Numerically stable enumeration from a Fractional number in steps. enumerateFromThenFractional from then generates a stream whose first element is from, the second element is then and the successive elements are in increments of then - from. No overflow or underflow checks are performed.

This is the equivalent of enumFromThen for Fractional types. For example:

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenFractional 1.1 2.1
[1.1,2.1,3.1,4.1]

>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenFractional 1.1 (-2.1)
[1.1,-2.1,-5.300000000000001,-8.500000000000002]

Since: 0.6.0

enumerateFromToFractional :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Fractional a, Ord a) => a -> a -> t m a Source #

Numerically stable enumeration from a Fractional number to a given limit. enumerateFromToFractional from to generates a finite stream whose first element is from and successive elements are in increments of 1 up to to.

This is the equivalent of enumFromTo for Fractional types. For example:

>>> Stream.toList $ Stream.enumerateFromToFractional 1.1 4
[1.1,2.1,3.1,4.1]

>>> Stream.toList $ Stream.enumerateFromToFractional 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]

Notice that the last element is equal to the specified to value after rounding to the nearest integer.

Since: 0.6.0

enumerateFromThenToFractional :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Fractional a, Ord a) => a -> a -> a -> t m a Source #

Numerically stable enumeration from a Fractional number in steps up to a given limit. enumerateFromThenToFractional from then to generates a finite stream whose first element is from, the second element is then and the successive elements are in increments of then - from up to to.

This is the equivalent of enumFromThenTo for Fractional types. For example:

>>> Stream.toList $ Stream.enumerateFromThenToFractional 0.1 2 6
[0.1,2.0,3.9,5.799999999999999]

>>> Stream.toList $ Stream.enumerateFromThenToFractional 0.1 (-2) (-6)
[0.1,-2.0,-4.1000000000000005,-6.200000000000001]

Since: 0.6.0

enumerateFromToSmall :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Enum a) => a -> a -> t m a Source #

enumerateFromTo for Enum types not larger than Int.

Since: 0.6.0

enumerateFromThenToSmall :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Enum a) => a -> a -> a -> t m a Source #

enumerateFromThenTo for Enum types not larger than Int.

Since: 0.6.0

enumerateFromThenSmallBounded :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Enumerable a, Bounded a) => a -> a -> t m a Source #

enumerateFromThen for Enum types not larger than Int.

Note: We convert the Enum to Int and enumerate the Int. If a type is bounded but does not have a Bounded instance then we can go on enumerating it beyond the legal values of the type, resulting in the failure of toEnum when converting back to Enum. Therefore we require a Bounded instance for this function to be safely used.

Since: 0.6.0

enumerate :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Bounded a, Enumerable a) => t m a Source #

enumerate = enumerateFrom minBound

Enumerate a Bounded type from its minBound to maxBound

Since: 0.6.0

enumerateTo :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a Source #

enumerateTo = enumerateFromTo minBound

Enumerate a Bounded type from its minBound to specified value.

Since: 0.6.0

enumerateFromBounded :: forall t (m :: Type -> Type) a. (IsStream t, Monad m, Enumerable a, Bounded a) => a -> t m a Source #

enumerateFromBounded from = enumerateFromTo from maxBound

enumerateFrom for Bounded Enum types.

Since: 0.6.0

fromStream :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => Stream m a -> t m a Source #

toStream :: forall t (m :: Type -> Type) a. (IsStream t, Monad m) => t m a -> Stream m a Source #