Copyright | (c) 2022 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Streamly.Internal.Data.Scanl.Prelude
Synopsis
- finalize :: MonadIO m => Channel m a b -> m ()
- cleanup :: MonadIO m => Channel m a b -> m ()
- data Channel (m :: Type -> Type) a b = Channel {
- inputQueue :: IORef ([ChildEvent a], Int)
- maxInputBuffer :: Limit
- inputItemDoorBell :: MVar ()
- closedForInput :: IORef Bool
- inputSpaceDoorBell :: MVar ()
- readInputQ :: m [ChildEvent a]
- outputQueue :: IORef ([OutEvent b], Int)
- outputDoorBell :: MVar ()
- svarRef :: Maybe (IORef ())
- svarStats :: SVarStats
- svarInspectMode :: Bool
- svarCreator :: ThreadId
- data Config
- maxBuffer :: Int -> Config -> Config
- inspect :: Bool -> Config -> Config
- boundThreads :: Bool -> Config -> Config
- defaultConfig :: Config
- dumpChannel :: forall (m :: Type -> Type) a b. Channel m a b -> IO String
- newChannel :: MonadRunInIO m => (Config -> Config) -> Fold m a b -> m (Channel m a b)
- data OutEvent b
- newChannelWith :: MonadRunInIO m => IORef ([OutEvent b], Int) -> MVar () -> (Config -> Config) -> Fold m a b -> m (Channel m a b, ThreadId)
- newChannelWithScan :: MonadRunInIO m => IORef ([OutEvent b], Int) -> MVar () -> (Config -> Config) -> Scanl m a b -> m (Channel m a b, ThreadId)
- newScanChannel :: MonadRunInIO m => (Config -> Config) -> Scanl m a b -> m (Channel m a b)
- sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b)
- sendToWorker_ :: MonadAsync m => Channel m a b -> a -> m ()
- checkFoldStatus :: MonadAsync m => Channel m a b -> m (Maybe b)
- parDistributeScan :: forall (m :: Type -> Type) a b. MonadAsync m => (Config -> Config) -> [Scanl m a b] -> Stream m a -> Stream m [b]
- parDemuxScan :: forall (m :: Type -> Type) k a b. (MonadAsync m, Ord k) => (Config -> Config) -> (a -> k) -> (k -> Scanl m a b) -> Stream m a -> Stream m [(k, b)]
- parTeeWith :: forall (m :: Type -> Type) a b c x. MonadAsync m => (Config -> Config) -> (a -> b -> c) -> Scanl m x a -> Scanl m x b -> Scanl m x c
- parDistributeScanM :: MonadAsync m => (Config -> Config) -> m [Scanl m a b] -> Stream m a -> Stream m [b]
- parDemuxScanM :: (MonadAsync m, Ord k) => (Config -> Config) -> (a -> k) -> (k -> m (Scanl m a b)) -> Stream m a -> Stream m [(k, b)]
Channel
data Channel (m :: Type -> Type) a b
The fold driver thread queues the input of the fold in the inputQueue.
The driver rings the doorbell (inputItemDoorBell) when the queue transitions
from the empty to the non-empty state.
The fold consumer thread dequeues the input items from the inputQueue and
supplies them to the fold. When the fold is done, the output of the fold is
placed in the outputQueue and the outputDoorBell is rung.
The fold driver thread keeps watching the outputQueue; if the fold has
terminated, it stops queueing input to the inputQueue.
If the fold driver runs out of input, it stops and waits for the fold to drain the buffered input.
Driver thread ------>------Input Queue and Doorbell ----->-----Fold thread
Driver thread ------<------Output Queue and Doorbell-----<-----Fold thread
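The queue-and-doorbell handshake described above can be sketched with plain base primitives. This is a simplified model, not the library's code: `Queue`, `pushInput` and `takeInputs` are hypothetical names, the queue keeps the `IORef ([x], Int)` shape of `inputQueue` but stores plain values instead of `ChildEvent`s, and it assumes a single driver and a single consumer. The doorbell is an `MVar ()` rung only on the empty to non-empty transition.

```haskell
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Data.IORef (IORef, atomicModifyIORef', newIORef)

-- A queue of items (newest first) paired with its length, plus a doorbell.
data Queue a = Queue (IORef ([a], Int)) (MVar ())

newQueue :: IO (Queue a)
newQueue = Queue <$> newIORef ([], 0) <*> newEmptyMVar

-- Driver side: enqueue one item and ring the doorbell only when the queue
-- goes from empty to non-empty.
pushInput :: Queue a -> a -> IO ()
pushInput (Queue ref bell) x = do
  wasEmpty <- atomicModifyIORef' ref $ \(xs, n) -> ((x : xs, n + 1), n == 0)
  if wasEmpty then () <$ tryPutMVar bell () else pure ()

-- Consumer side: wait for the doorbell, then take everything queued so far,
-- returned in arrival order.
takeInputs :: Queue a -> IO [a]
takeInputs q@(Queue ref bell) = do
  takeMVar bell
  xs <- atomicModifyIORef' ref $ \(ys, _) -> (([], 0), reverse ys)
  if null xs then takeInputs q else pure xs
```

Ringing only on the transition keeps the driver from touching the `MVar` for every item while the consumer is still busy with a batch.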
Constructors
Channel (record constructor; its fields are listed in the Synopsis above)
data Config
An abstract type for specifying the configuration parameters of a
Channel. Use Config -> Config modifier functions to modify the default
configuration. See the individual modifier documentation for default values.
maxBuffer :: Int -> Config -> Config
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full, we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to the default; a negative value means there is no limit. The default value is 1500.
CAUTION! Using an unbounded maxBuffer value (i.e. a negative value)
coupled with an unbounded maxThreads value is a recipe for disaster in
the presence of infinite or very large streams. In particular, it must
not be used when pure is used in ZipAsyncM streams, as pure in
applicative zip streams generates an infinite stream, causing unbounded
concurrent generation with no limit on the buffer or threads.
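The maxBuffer rules above (0 means the default of 1500, a negative value means no limit) can be modelled as a function from the user's `Int` to a limit. The `Limit` type below is a hypothetical stand-in for the type of the `maxInputBuffer` field, and `toLimit` is an illustrative name, not a library function.

```haskell
-- Hypothetical stand-in for the Limit type stored in maxInputBuffer.
data Limit = Unlimited | Limited Word
  deriving (Eq, Show)

-- Default buffer size documented for maxBuffer.
defaultMaxBuffer :: Word
defaultMaxBuffer = 1500

-- Interpret the Int passed to maxBuffer:
--   0        -> reset to the default size
--   negative -> no limit
--   positive -> at most that many items
toLimit :: Int -> Limit
toLimit n
  | n == 0    = Limited defaultMaxBuffer
  | n < 0     = Unlimited
  | otherwise = Limited (fromIntegral n)
```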
inspect :: Bool -> Config -> Config
Print debug information about the Channel when the stream ends. When the
stream does not end normally, the channel debug information is printed when
the channel is garbage collected. If you are expecting but not seeing the
debug info, try adding a performMajorGC before the program ends.
boundThreads :: Bool -> Config -> Config
Spawn bound threads (i.e., spawn threads using forkOS instead of
forkIO). The default value is False.
Currently, this takes effect only for concurrent folds.
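Selecting between forkOS and forkIO from a Bool can be sketched with a small helper. `forkWith` and `runAndCheckBound` are hypothetical names, not library functions. Since forkOS only works with the threaded runtime, the sketch assumes it should fail early with a clear message when `rtsSupportsBoundThreads` is False.

```haskell
import Control.Concurrent
  ( ThreadId, forkIO, forkOS, isCurrentThreadBound, newEmptyMVar, putMVar
  , rtsSupportsBoundThreads, takeMVar )

-- Fork the way a boundThreads setting would select:
-- True -> a bound OS thread (forkOS), False -> a lightweight thread (forkIO).
-- forkOS needs the threaded RTS, so fail early with a clear error otherwise.
forkWith :: Bool -> IO () -> IO ThreadId
forkWith bound action
  | bound && not rtsSupportsBoundThreads =
      ioError (userError "boundThreads requires the threaded RTS (-threaded)")
  | bound = forkOS action
  | otherwise = forkIO action

-- Run a probe in a thread forked by forkWith and report whether it was bound.
runAndCheckBound :: Bool -> IO Bool
runAndCheckBound bound = do
  result <- newEmptyMVar
  _ <- forkWith bound (isCurrentThreadBound >>= putMVar result)
  takeMVar result
```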
defaultConfig :: Config
The fields prefixed by an _ are not to be accessed or updated directly, but via smart accessor APIs. Use the get/set routines instead of directly accessing the Config fields.
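The Config -> Config modifier style and the underscore-prefixed fields can be sketched with a small record. The record and its fields are hypothetical and only mirror the documented defaults (a buffer of 1500 and boundThreads set to False); the inspect default of False is an assumption.

```haskell
-- Hypothetical configuration record; "_" fields are changed only via modifiers.
data Config = Config
  { _maxBuffer    :: Int
  , _inspect      :: Bool
  , _boundThreads :: Bool
  } deriving (Show)

-- 1500 and False for boundThreads are documented; the inspect default is assumed.
defaultConfig :: Config
defaultConfig = Config { _maxBuffer = 1500, _inspect = False, _boundThreads = False }

-- Modifiers store the raw value; 0 or a negative buffer size would be
-- interpreted when the channel is created.
maxBuffer :: Int -> Config -> Config
maxBuffer n cfg = cfg { _maxBuffer = n }

inspect :: Bool -> Config -> Config
inspect b cfg = cfg { _inspect = b }

boundThreads :: Bool -> Config -> Config
boundThreads b cfg = cfg { _boundThreads = b }

-- The caller supplies one composed modifier, which is applied to the default.
applyConfig :: (Config -> Config) -> Config
applyConfig modifier = modifier defaultConfig
```

A caller would write something like `maxBuffer 10 . boundThreads True`; passing `id`, as the examples in this module do, keeps the defaults.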
dumpChannel :: forall (m :: Type -> Type) a b. Channel m a b -> IO String
Dump the channel stats for diagnostics. Used when the inspect option is
enabled.
newChannel :: MonadRunInIO m => (Config -> Config) -> Fold m a b -> m (Channel m a b)
data OutEvent b
Constructors
FoldException ThreadId SomeException
FoldPartial b
FoldDone ThreadId b
FoldEOF ThreadId
newChannelWith :: MonadRunInIO m => IORef ([OutEvent b], Int) -> MVar () -> (Config -> Config) -> Fold m a b -> m (Channel m a b, ThreadId)
newChannelWithScan :: MonadRunInIO m => IORef ([OutEvent b], Int) -> MVar () -> (Config -> Config) -> Scanl m a b -> m (Channel m a b, ThreadId)
newScanChannel :: MonadRunInIO m => (Config -> Config) -> Scanl m a b -> m (Channel m a b)
sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b)
Push values from a driver to a fold worker via a Channel. Blocks if no
space is available in the buffer. Before pushing a value to the Channel, it
polls for events received from the fold worker. If a stop event has been
received, it returns Just the final result of the fold; otherwise it returns
Nothing. Propagates exceptions received from the fold worker.
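The poll-then-push behaviour can be modelled with a simplified event type and an output queue in an `IORef`. This is not the library's implementation: `OutEventS`, `newOutQueue`, `postEvent`, `sendModel` and the `push` callback are hypothetical, thread ids are dropped, and blocking on a full buffer is omitted.

```haskell
import Control.Exception (SomeException, throwIO)
import Data.IORef (IORef, atomicModifyIORef', newIORef)

-- Simplified stand-in for OutEvent (thread ids omitted).
data OutEventS b
  = Exc SomeException -- the worker failed
  | Partial b         -- an intermediate result
  | Done b            -- the worker finished with a final result

-- Output queue written by the worker, kept in arrival order.
newOutQueue :: IO (IORef [OutEventS b])
newOutQueue = newIORef []

-- Worker side: post an event for the driver.
postEvent :: IORef [OutEventS b] -> OutEventS b -> IO ()
postEvent q ev = atomicModifyIORef' q (\evs -> (evs ++ [ev], ()))

-- Driver side: drain pending worker events before sending. A Done event
-- short-circuits (the value is not sent), an exception is rethrown and
-- partial results are skipped; otherwise the value is pushed and Nothing
-- is returned.
sendModel :: IORef [OutEventS b] -> (a -> IO ()) -> a -> IO (Maybe b)
sendModel outQ push x = do
  events <- atomicModifyIORef' outQ (\evs -> ([], evs))
  go events
  where
    go (Exc e : _)        = throwIO e
    go (Done b : _)       = pure (Just b)
    go (Partial _ : rest) = go rest
    go []                 = push x >> pure Nothing
```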
sendToWorker_ :: MonadAsync m => Channel m a b -> a -> m ()
Like sendToWorker, but only sends; it does not receive any events from the fold.
checkFoldStatus :: MonadAsync m => Channel m a b -> m (Maybe b)
Concurrency
parDistributeScan :: forall (m :: Type -> Type) a b. MonadAsync m => (Config -> Config) -> [Scanl m a b] -> Stream m a -> Stream m [b]
A pure variant of parDistributeScanM that uses a fixed list of scans.
The provided scans are started once and run concurrently for the duration of the stream. Each input element is distributed to all active scans, and their outputs are collected and emitted together.
Example:
>>> xs = [Scanl.take 5 Scanl.sum, Scanl.take 5 Scanl.length :: Scanl.Scanl IO Int Int]
>>> Stream.toList $ Scanl.parDistributeScan id xs (Stream.enumerateFromTo 1 10)
...
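The distribution rule (every input goes to every active scan, and their current outputs are emitted together as one list) can be modelled purely, ignoring concurrency. The `Scan` type, `takeSum`, `takeLength` and `distribute` are hypothetical; a scan is a step function that returns Nothing once it has terminated, and finished scans are dropped. When no scan is active the model discards the input, following the rule documented for parDistributeScanM. The model makes no claim about the exact output of the example above.

```haskell
-- Hypothetical pure model of a scan: feed one input, get the next output and
-- the continuation, or Nothing once the scan has terminated.
newtype Scan a b = Scan (a -> Maybe (b, Scan a b))

-- Running sum that accepts n inputs (mimicking take n sum).
takeSum :: Int -> Int -> Scan Int Int
takeSum n acc = Scan step
  where
    step x
      | n <= 0    = Nothing
      | otherwise = let acc' = acc + x in Just (acc', takeSum (n - 1) acc')

-- Count of inputs that accepts n inputs (mimicking take n length).
takeLength :: Int -> Int -> Scan a Int
takeLength n count = Scan step
  where
    step _
      | n <= 0    = Nothing
      | otherwise = Just (count + 1, takeLength (n - 1) (count + 1))

-- Feed each input to all live scans, emit their outputs as one list and keep
-- only the scans that are still running.
distribute :: [Scan a b] -> [a] -> [[b]]
distribute _ [] = []
distribute scans (x : xs)
  | null results = distribute [] xs -- no live scan: the input is discarded
  | otherwise    = map fst results : distribute (map snd results) xs
  where
    results = [r | Scan f <- scans, Just r <- [f x]]
```

For instance, `distribute [takeSum 2 0, takeLength 3 0] [1, 2, 3, 4]` gives `[[1,1],[3,2],[3]]` in this model: the sum stops after two inputs, the count after three, and the fourth input is discarded.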
parDemuxScan :: forall (m :: Type -> Type) k a b. (MonadAsync m, Ord k) => (Config -> Config) -> (a -> k) -> (k -> Scanl m a b) -> Stream m a -> Stream m [(k, b)]
A pure variant of parDemuxScanM where the key-to-scan mapping is
static and does not require monadic effects.
Each distinct key is deterministically mapped to a scan using the provided
function. The behavior is otherwise the same as parDemuxScanM.
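Key-based routing with a pure key-to-scan function can be modelled with a `Map` of live scan states. `Scan`, `takeSum` and `demux` are hypothetical names, and each step emits a single `(key, output)` pair rather than the batch a concurrent implementation may collect. As an explicit modelling choice, a finished scan is kept and later inputs for its key are dropped; whether the library instead starts a fresh scan for that key is not stated here.

```haskell
import qualified Data.Map.Strict as Map

-- Hypothetical pure model of a scan: one step per input, Nothing once done.
newtype Scan a b = Scan (a -> Maybe (b, Scan a b))

-- Running sum that accepts n inputs (mimicking take n sum).
takeSum :: Int -> Int -> Scan Int Int
takeSum n acc = Scan $ \x ->
  if n <= 0 then Nothing else Just (acc + x, takeSum (n - 1) (acc + x))

-- Route each input by key. The first input for a key creates that key's scan
-- with the pure key-to-scan function; later inputs reuse the stored state.
demux :: Ord k => (a -> k) -> (k -> Scan a b) -> [a] -> [[(k, b)]]
demux getKey getScan = go Map.empty
  where
    go _ [] = []
    go active (x : xs) =
      let k = getKey x
          Scan f = Map.findWithDefault (getScan k) k active
      in case f x of
           Just (b, next) -> [(k, b)] : go (Map.insert k next active) xs
           Nothing        -> go active xs -- finished scan: input dropped
```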
parTeeWith :: forall (m :: Type -> Type) a b c x. MonadAsync m => (Config -> Config) -> (a -> b -> c) -> Scanl m x a -> Scanl m x b -> Scanl m x c
Execute both the scans in a tee concurrently.
Example:
>>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3)
>>> delay x = threadDelay 1000000 >> print x >> return x
>>> c1 = Scanl.lmapM delay Scanl.sum
>>> c2 = Scanl.lmapM delay Scanl.length
>>> dst = Scanl.parTeeWith id (,) c1 c2
>>> Stream.toList $ Stream.scanl dst src
...
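Running both sides of a tee concurrently can be sketched with two worker threads that receive every input and report their accumulators, which the driver combines with `f`. `startWorker` and `teeConcurrently` are hypothetical helpers built on `forkIO` and `MVar`, not the library's channel machinery. Both steps for an input run at the same time, so with slow steps like the `delay` in the example each input costs roughly the slower of the two steps rather than their sum.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM)

-- A worker thread that folds its inputs with an effectful step function and
-- reports its accumulator after each input. Workers are not shut down in this
-- sketch; they stay blocked waiting for input once the list ends.
startWorker :: (s -> x -> IO s) -> s -> IO (MVar x, MVar s)
startWorker step s0 = do
  inbox <- newEmptyMVar
  outbox <- newEmptyMVar
  let loop s = do
        x <- takeMVar inbox
        s' <- step s x
        putMVar outbox s'
        loop s'
  _ <- forkIO (loop s0)
  pure (inbox, outbox)

-- Hand every input to both workers so their steps overlap, then wait for
-- both results and combine them with f.
teeConcurrently
  :: (a -> b -> c) -> (a -> x -> IO a) -> a -> (b -> x -> IO b) -> b -> [x] -> IO [c]
teeConcurrently f stepA a0 stepB b0 xs = do
  (inA, outA) <- startWorker stepA a0
  (inB, outB) <- startWorker stepB b0
  forM xs $ \x -> do
    putMVar inA x
    putMVar inB x
    a <- takeMVar outA
    b <- takeMVar outB
    pure (f a b)
```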
parDistributeScanM :: MonadAsync m => (Config -> Config) -> m [Scanl m a b] -> Stream m a -> Stream m [b]
Evaluate a stream by distributing its inputs across zero or more
concurrently running scans. New scans can be generated dynamically. Use
parDistributeScan for an easier-to-use interface if you do not need the
power of parDistributeScanM.
Before processing each input element, the supplied action is executed to produce additional scans. These scans are appended to the set of currently active scans. If you do not want the same scan to be added repeatedly, ensure that the action only generates it once (see the example below).
If there are no scans currently active, the input element is discarded. The results from all active scans are collected and flattened into the output stream.
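The generator protocol (run the action before each input, append the scans it returns, discard the input when nothing is active) can be modelled with a pure step function per scan. `Scan`, `counter`, `distributeM` and `once` are hypothetical names; `once` mirrors the `IORef` trick used in the example below, handing out its scans exactly once.

```haskell
import Data.IORef (atomicModifyIORef', newIORef)

-- Hypothetical pure model of a scan: one step per input, Nothing once done.
newtype Scan a b = Scan (a -> Maybe (b, Scan a b))

-- Running count of inputs seen, starting from n (never terminates).
counter :: Int -> Scan a Int
counter n = Scan (\_ -> Just (n + 1, counter (n + 1)))

-- Before each input, run the generator and append the scans it returns to the
-- active set. With no live scan the input is discarded; otherwise the outputs
-- of all live scans are emitted together.
distributeM :: IO [Scan a b] -> [a] -> IO [[b]]
distributeM gen = go []
  where
    go _ [] = pure []
    go active (x : xs) = do
      new <- gen
      let results = [r | Scan f <- active ++ new, Just r <- [f x]]
      rest <- go (map snd results) xs
      pure (if null results then rest else map fst results : rest)

-- A generator that yields the given scans on its first call only.
once :: [Scan a b] -> IO (IO [Scan a b])
once scans = do
  ref <- newIORef scans
  pure (atomicModifyIORef' ref (\xs -> ([], xs)))
```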
Concurrency and buffering:
If the input buffer (see maxBuffer) is bounded, a scan may block until
space becomes available. If any scan is blocked on its buffer, all scans are
blocked. Processing continues only when all scans have buffer space
available.
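Why one full buffer stalls every scan follows from the driver handing each input to all scans in turn. The sketch assumes one bounded queue per scan, built here from base's `QSem` and `Chan`; `BoundedQ`, `writeQ`, `readQ` and `driveAll` are hypothetical names and not the library's mechanism.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.QSem (QSem, newQSem, signalQSem, waitQSem)
import Control.Monad (forM_)

-- A bounded queue: the semaphore counts free slots.
data BoundedQ a = BoundedQ QSem (Chan a)

newBoundedQ :: Int -> IO (BoundedQ a)
newBoundedQ capacity = BoundedQ <$> newQSem capacity <*> newChan

-- Blocks while the queue is full.
writeQ :: BoundedQ a -> a -> IO ()
writeQ (BoundedQ free ch) x = waitQSem free >> writeChan ch x

-- Removes one item and frees its slot.
readQ :: BoundedQ a -> IO a
readQ (BoundedQ free ch) = do
  x <- readChan ch
  signalQSem free
  pure x

-- The driver offers each input to every scan's queue in turn. If one queue
-- is full, writeQ blocks the driver, so no scan receives further input until
-- that queue has space again.
driveAll :: [BoundedQ a] -> [a] -> IO ()
driveAll queues xs = forM_ xs $ \x -> forM_ queues (`writeQ` x)
```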
Example:
>>> import Data.IORef
>>> ref <- newIORef [Scanl.take 5 Scanl.sum, Scanl.take 5 Scanl.length :: Scanl.Scanl IO Int Int]
>>> gen = atomicModifyIORef ref (\xs -> ([], xs))
>>> Stream.toList $ Scanl.parDistributeScanM id gen (Stream.enumerateFromTo 1 10)
...
parDemuxScanM :: (MonadAsync m, Ord k) => (Config -> Config) -> (a -> k) -> (k -> m (Scanl m a b)) -> Stream m a -> Stream m [(k, b)]
Evaluate a stream by routing each input to a scan determined by a key.
For each distinct key, the first input encountered triggers the creation of a new scan (via the supplied key-to-scan function). This scan is then added to the set of currently active scans. Subsequent inputs with the same key are directed to the same scan.
If no scan can be created for a key, the input element is discarded.
When a constituent scan completes, its final output is emitted as part of
the composed output stream. The output of parDemuxScanM is a stream of
key–value pairs, where each value is the output produced by the scan
corresponding to that key.
For a simpler interface, use parDemuxScan if you do not need the full
flexibility of parDemuxScanM.
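The per-key creation rule can be isolated in a small sketch: a `Map` from keys to scan states, filled by running the effectful key-to-scan action only the first time a key is seen. `ScanS` (an accumulator plus a step function), `demuxM` and `countCalls` are hypothetical; `countCalls` only exists to show that the action runs once per distinct key.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import qualified Data.Map.Strict as Map

-- Hypothetical scan: a current accumulator and a step function.
data ScanS a b = ScanS b (b -> a -> b)

-- Route each input by key. The effectful constructor runs only for the first
-- input of a key; afterwards the stored state for that key is updated. Each
-- step emits the (key, new accumulator) pair for the scan that got the input.
demuxM :: Ord k => (a -> k) -> (k -> IO (ScanS a b)) -> [a] -> IO [[(k, b)]]
demuxM getKey mkScan = go Map.empty
  where
    go _ [] = pure []
    go active (x : xs) = do
      let k = getKey x
      ScanS acc step <- maybe (mkScan k) pure (Map.lookup k active)
      let acc' = step acc x
      rest <- go (Map.insert k (ScanS acc' step) active) xs
      pure ([(k, acc')] : rest)

-- Wrap a key-to-scan action so it also counts how often it runs.
countCalls :: (k -> IO r) -> IO (IO Int, k -> IO r)
countCalls f = do
  ref <- newIORef 0
  pure (readIORef ref, \k -> modifyIORef' ref (+ 1) >> f k)
```

With the even/odd key of the example below and a running-sum scan, inputs 1 to 6 produce `("odd",1)`, `("even",2)`, `("odd",4)`, `("even",6)`, `("odd",9)`, `("even",12)` in this model, and the constructor runs exactly twice.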
Example:
>>> import qualified Data.Map.Strict as Map
>>> import Data.Maybe (fromJust)
>>> f1 = ("even", Scanl.take 5 Scanl.sum)
>>> f2 = ("odd", Scanl.take 5 Scanl.sum)
>>> kv = Map.fromList [f1, f2]
>>> getScan k = return (fromJust $ Map.lookup k kv)
>>> getKey x = if even x then "even" else "odd"
>>> input = Stream.enumerateFromTo 1 10
>>> Stream.toList $ Scanl.parDemuxScanM id getKey getScan input
...