streamly
Copyright: (c) 2021 Composewell Technologies
License: BSD-3-Clause
Maintainer: streamly@composewell.com
Stability: released
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Data.Fold.Prelude

Description

All Fold-related combinators, including those from the streamly-core Streamly.Data.Fold module, along with concurrency and unordered-container operations.

Synopsis

Setup

To execute the code examples provided in this module in ghci, please run the following commands first.

>>> :m
>>> :set -XFlexibleContexts
>>> import Control.Concurrent (threadDelay)
>>> import Data.List (sortOn)
>>> import Data.HashMap.Strict (HashMap)
>>> import Streamly.Data.Fold (Fold)
>>> import Streamly.Data.Stream (Stream)
>>> import qualified Data.HashMap.Strict as HM
>>> import qualified Streamly.Data.Fold.Prelude as Fold
>>> import qualified Streamly.Data.Stream.Prelude as Stream

For APIs that have not been released yet:

>>> import qualified Streamly.Internal.Data.Fold as Fold
>>> import qualified Streamly.Internal.Data.Fold.Prelude as Fold

Streamly.Data.Fold

All Streamly.Data.Fold combinators are re-exported via this module. For more pre-release combinators, also see the Streamly.Internal.Data.Fold module.

Concurrent Operations

Configuration

data Config Source #

An abstract type for specifying the configuration parameters of a Channel. Use Config -> Config modifier functions to modify the default configuration. See the individual modifier documentation for default values.
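For example, modifiers compose with ordinary function composition. A minimal sketch using the maxBuffer and boundThreads modifiers documented below:

>>> cfg = Fold.maxBuffer 100 . Fold.boundThreads True
>>> dst = Fold.parBuffered cfg Fold.sum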

maxBuffer :: Int -> Config -> Config Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full, we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to the default; a negative value means there is no limit. The default value is 1500.

CAUTION! Using an unbounded maxBuffer value (i.e., a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in the presence of infinite or very large streams. In particular, it must not be used when pure is used in ZipAsyncM streams, as pure in applicative zip streams generates an infinite stream, causing unbounded concurrent generation with no limit on the buffer or threads.
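For example, a minimal sketch that caps the buffer at 16 elements while the fold consumes its input slowly:

>>> slowSum = Fold.lmapM (\x -> threadDelay 1000 >> return x) Fold.sum
>>> Stream.fold (Fold.parBuffered (Fold.maxBuffer 16) slowSum) (Stream.fromList [1..100 :: Int])
5050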

boundThreads :: Bool -> Config -> Config Source #

Spawn bound threads (i.e., spawn threads using forkOS instead of forkIO). The default value is False.

Currently, this takes effect only for concurrent folds.
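For instance, a minimal sketch that runs a concurrent fold on bound threads, which can be useful when the fold calls into code that is sensitive to OS thread identity (e.g., certain FFI libraries):

>>> dst = Fold.parBuffered (Fold.boundThreads True) Fold.sum
>>> Stream.fold dst (Stream.fromList [1..10 :: Int])
55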

inspect :: Bool -> Config -> Config Source #

Print debug information about the Channel when the stream ends. When the stream does not end normally, the channel debug information is printed when the channel is garbage collected. If you are expecting but not seeing the debug info, try adding a performMajorGC call before the program ends.
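For example, a sketch that enables debug output; the exact diagnostics printed are implementation-dependent, hence elided:

>>> dst = Fold.parBuffered (Fold.inspect True) Fold.sum
>>> Stream.fold dst (Stream.fromList [1..3 :: Int])
...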

Combinators

parBuffered :: forall (m :: Type -> Type) a b. MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b Source #

parBuffered introduces a concurrent stage at the input of the fold. The inputs are asynchronously queued in a buffer and evaluated concurrently with the evaluation of the source stream. On finalization, parBuffered waits for the asynchronous fold to complete before it returns.

In the following example both the stream and the fold have a 1 second delay, but the delay is not compounded because both run concurrently.

>>> delay x = threadDelay 1000000 >> print x >> return x
>>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3)
>>> dst = Fold.parBuffered id (Fold.lmapM delay Fold.sum)
>>> Stream.fold dst src
...

Another example:

>>> Stream.toList $ Stream.groupsOf 4 dst src
...

parTee :: forall (m :: Type -> Type) x a b. MonadAsync m => (Config -> Config) -> Fold m x a -> Fold m x b -> Fold m x (a, b) Source #

Execute both the folds in a tee concurrently.

Definition:

>>> parTee cfg c1 c2 = Fold.teeWith (,) (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2)

Example:

>>> delay x = threadDelay 1000000 >> print x >> return x
>>> c1 = Fold.lmapM delay Fold.sum
>>> c2 = Fold.lmapM delay Fold.length
>>> dst = Fold.parTee id c1 c2
>>> Stream.fold dst src
...

parDistribute :: forall (m :: Type -> Type) a b. MonadAsync m => (Config -> Config) -> [Fold m a b] -> Fold m a [b] Source #

Distribute the input to all the folds in the supplied list concurrently.

Definition:

>>> parDistribute cfg = Fold.distribute . fmap (Fold.parBuffered cfg)

Example:

>>> delay x = threadDelay 1000000 >> print x >> return x
>>> c = Fold.lmapM delay Fold.sum
>>> dst = Fold.parDistribute id [c,c,c]
>>> Stream.fold dst src
...

parDistributeScan :: MonadAsync m => (Config -> Config) -> m [Fold m a b] -> Stream m a -> Stream m [b] Source #

Evaluate a stream and send its outputs to zero or more dynamically generated folds. It checks for new folds at each input generation step; any new fold is added to the list of currently running folds. If no folds are available, the input is discarded. If a fold completes, its output is emitted in the output of the scan.

>>> import Data.IORef
>>> ref <- newIORef [Fold.take 2 Fold.sum, Fold.take 2 Fold.length :: Fold.Fold IO Int Int]
>>> gen = atomicModifyIORef ref (\xs -> ([], xs))
>>> Stream.toList $ Fold.parDistributeScan id gen (Stream.enumerateFromTo 1 10)
...

parPartition :: forall (m :: Type -> Type) b x c y. MonadAsync m => (Config -> Config) -> Fold m b x -> Fold m c y -> Fold m (Either b c) (x, y) Source #

Select the first fold for Left input and the second fold for Right input. Both folds run concurrently.

Definition:

>>> parPartition cfg c1 c2 = Fold.partition (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2)

Example:

>>> delay x = threadDelay 1000000 >> print x >> return x
>>> c1 = Fold.lmapM delay Fold.sum
>>> c2 = Fold.lmapM delay Fold.sum
>>> dst = Fold.parPartition id c1 c2
>>> Stream.fold dst $ fmap (\x -> if even x then Left x else Right x) src
...

parDemuxScan :: (MonadAsync m, Ord k) => (Config -> Config) -> (a -> k) -> (k -> m (Fold m a b)) -> Stream m a -> Stream m [(k, b)] Source #

Evaluate a stream and send its outputs to the selected fold. The fold is selected dynamically, using a key, when the first input for that key is seen. Any new fold is added to the list of currently running folds. If no fold is available for a given key, the input is discarded. If a fold completes, its output is emitted in the output of the scan.

>>> import qualified Data.Map.Strict as Map
>>> import Data.Maybe (fromJust)
>>> f1 = ("even", Fold.take 2 Fold.sum)
>>> f2 = ("odd", Fold.take 2 Fold.sum)
>>> kv = Map.fromList [f1, f2]
>>> getFold k = return (fromJust $ Map.lookup k kv)
>>> getKey x = if even x then "even" else "odd"
>>> input = Stream.enumerateFromTo 1 10
>>> Stream.toList $ Fold.parDemuxScan id getKey getFold input
...

parUnzip :: forall (m :: Type -> Type) b x c y. MonadAsync m => (Config -> Config) -> Fold m b x -> Fold m c y -> Fold m (b, c) (x, y) Source #

Split each input pair, distribute the components to two different folds, and then zip the results. Both consumer folds run concurrently.

Definition:

>>> parUnzip cfg c1 c2 = Fold.unzip (Fold.parBuffered cfg c1) (Fold.parBuffered cfg c2)

Example:

>>> delay x = threadDelay 1000000 >> print x >> return x
>>> c1 = Fold.lmapM delay Fold.sum
>>> c2 = Fold.lmapM delay Fold.sum
>>> dst = Fold.parUnzip id c1 c2
>>> Stream.fold dst $ fmap (\x -> (x, x * x)) src
...

Container Related

toHashMapIO :: forall (m :: Type -> Type) k a b. (MonadIO m, Hashable k) => (a -> k) -> Fold m a b -> Fold m a (HashMap k b) Source #

Split the input stream based on a hashable component of the key field and fold each split using the given fold. Useful for map/reduce, for bucketizing the input into different bins, or for generating histograms.

Consider a stream of key value pairs:

>>> input = Stream.fromList [("k1",1),("k1",1.1),("k2",2), ("k2",2.2)]

Classify each key to a different hash bin and fold the bins:

>>> classify = Fold.toHashMapIO fst (Fold.lmap snd Fold.toList)
>>> sortOn fst . HM.toList <$> Stream.fold classify input :: IO [(String, [Double])]
[("k1",[1.0,1.1]),("k2",[2.0,2.2])]

Pre-release

Deprecated

parEval :: forall (m :: Type -> Type) a b. MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b Source #

Deprecated: Please use parBuffered instead.

parBuffered introduces a concurrent stage at the input of the fold. The inputs are asynchronously queued in a buffer and evaluated concurrently with the evaluation of the source stream. On finalization, parBuffered waits for the asynchronous fold to complete before it returns.

In the following example both the stream and the fold have a 1 second delay, but the delay is not compounded because both run concurrently.

>>> delay x = threadDelay 1000000 >> print x >> return x
>>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3)
>>> dst = Fold.parBuffered id (Fold.lmapM delay Fold.sum)
>>> Stream.fold dst src
...

Another example:

>>> Stream.toList $ Stream.groupsOf 4 dst src
...