streamly
Copyright(c) 2025 Composewell Technologies
LicenseBSD-3-Clause
Maintainerstreamly@composewell.com
Stabilityreleased
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.Data.Scanl.Prelude

Description

All Scan related combinators including the streamly-core Streamly.Data.Scanl module, concurrency, unordered container operations.

Synopsis

Setup

To execute the code examples provided in this module in ghci, please run the following commands first.

>>> :m
>>> :set -XFlexibleContexts
>>> import Control.Concurrent (threadDelay)
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Data.Scanl.Prelude as Scanl

Streamly.Data.Scanl

All Streamly.Data.Scanl combinators are re-exported via this module. For more pre-release combinators also see Streamly.Internal.Data.Scanl module.

Concurrent Operations

Configuration

data Config Source #

An abstract type for specifying the configuration parameters of a Channel. Use Config -> Config modifier functions to modify the default configuration. See the individual modifier documentation for default values.

maxBuffer :: Int -> Config -> Config Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in presence of infinite streams, or very large streams. Especially, it must not be used when pure is used in ZipAsyncM streams as pure in applicative zip streams generates an infinite stream causing unbounded concurrent generation with no limit on the buffer or threads.

boundThreads :: Bool -> Config -> Config Source #

Spawn bound threads (i.e., spawn threads using forkOS instead of forkIO). The default value is False.

Currently, this only takes effect only for concurrent folds.

inspect :: Bool -> Config -> Config Source #

Print debug information about the Channel when the stream ends. When the stream does not end normally, the channel debug information is printed when the channel is garbage collected. If you are expecting but not seeing the debug info try adding a performMajorGC before the program ends.

Combinators

parTeeWith :: forall (m :: Type -> Type) a b c x. MonadAsync m => (Config -> Config) -> (a -> b -> c) -> Scanl m x a -> Scanl m x b -> Scanl m x c Source #

Execute both the scans in a tee concurrently.

Example:

>>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3)
>>> delay x = threadDelay 1000000 >> print x >> return x
>>> c1 = Scanl.lmapM delay Scanl.sum
>>> c2 = Scanl.lmapM delay Scanl.length
>>> dst = Scanl.parTeeWith id (,) c1 c2
>>> Stream.toList $ Stream.scanl dst src
...

parDistributeScanM :: MonadAsync m => (Config -> Config) -> m [Scanl m a b] -> Stream m a -> Stream m [b] Source #

Evaluate a stream by distributing its inputs across zero or more concurrently running scans. New scans can be generated dynamically. Use parDistributeScan for an eaiser to use interface, if you do not need the power of parDistributeScanM.

Before processing each input element, the supplied action is executed to produce additional scans. These scans are appended to the set of currently active scans. If you do not want the same scan to be added repeatedly, ensure that the action only generates it once (see the example below).

If there are no scans currently active, the input element is discarded. The results from all active scans are collected and lattened into the the output stream.

Concurrency and buffering:

If the input buffer (see maxBuffer) is bounded, a scan may block until space becomes available. If any scan is blocked on buffer, all scans are blocked. Processing continues only when all scans have buffer space available.

Example:

>>> import Data.IORef
>>> ref <- newIORef [Scanl.take 5 Scanl.sum, Scanl.take 5 Scanl.length :: Scanl.Scanl IO Int Int]
>>> gen = atomicModifyIORef ref (\xs -> ([], xs))
>>> Stream.toList $ Scanl.parDistributeScanM id gen (Stream.enumerateFromTo 1 10)
...

parDistributeScan :: forall (m :: Type -> Type) a b. MonadAsync m => (Config -> Config) -> [Scanl m a b] -> Stream m a -> Stream m [b] Source #

A pure variant of parDistributeScanM that uses a fixed list of scans.

The provided scans are started once and run concurrently for the duration of the stream. Each input element is distributed to all active scans, and their outputs are collected and emitted together.

Example:

>>> xs = [Scanl.take 5 Scanl.sum, Scanl.take 5 Scanl.length :: Scanl.Scanl IO Int Int]
>>> Stream.toList $ Scanl.parDistributeScan id xs (Stream.enumerateFromTo 1 10)
...

parDemuxScanM :: (MonadAsync m, Ord k) => (Config -> Config) -> (a -> k) -> (k -> m (Scanl m a b)) -> Stream m a -> Stream m [(k, b)] Source #

Evaluate a stream by routing each input to a scan determined by a key.

For each distinct key, the first input encountered triggers the creation of a new scan (via the supplied key-to-scan function). This scan is then added to the set of currently active scans. Subsequent inputs with the same key are directed to the same scan.

If no scan can be created for a key, the input element is discarded.

When a constituent scan completes, its final output is emitted as part of the composed output stream. The output of parDemuxScanM is a stream of key–value pairs, where each value is the output produced by the scan corresponding to that key.

For a simpler interface, use parDemuxScan if you do not need the full flexibility of parDemuxScanM.

Example:

>>> import qualified Data.Map.Strict as Map
>>> import Data.Maybe (fromJust)
>>> f1 = ("even", Scanl.take 5 Scanl.sum)
>>> f2 = ("odd",  Scanl.take 5 Scanl.sum)
>>> kv = Map.fromList [f1, f2]
>>> getScan k = return (fromJust $ Map.lookup k kv)
>>> getKey x = if even x then "even" else "odd"
>>> input = Stream.enumerateFromTo 1 10
>>> Stream.toList $ Scanl.parDemuxScanM id getKey getScan input
...

parDemuxScan :: forall (m :: Type -> Type) k a b. (MonadAsync m, Ord k) => (Config -> Config) -> (a -> k) -> (k -> Scanl m a b) -> Stream m a -> Stream m [(k, b)] Source #

A pure variant of parDemuxScanM where the key-to-scan mapping is static and does not require monadic effects.

Each distinct key is deterministically mapped to a scan using the provided function. The behavior is otherwise the same as parDemuxScanM.