streamly

Copyright: (c) 2017 Composewell Technologies
License: BSD-3-Clause
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Data.Channel

Description

This module contains operations that are common for Stream and Fold channels.


Channel Config & Stats

data Limit Source #

Constructors

Unlimited 
Limited Word 

Instances

Show Limit, Eq Limit, Ord Limit (all defined in Streamly.Internal.Data.Channel.Types).

data Rate Source #

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point in time we allow only as many yields as we have accumulated as per rateGoal since the start. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the yield count gap becomes more than rateBuffer (specified as a yield count) we try to recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap; in other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap; in other words, it determines the minimum yield latency. We reduce latency by increasing concurrency, so rateHigh effectively puts an upper bound on concurrency.

If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.

Constructors

Rate 

Fields
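
For illustration, here is a sketch of a rate specification targeting 1000 yields per second. The field names are the ones described above; the field types (Double for the rate fields, Int for rateBuffer) are assumed from streamly's public Rate API.

-- Target 1000 yields/sec, allow the instantaneous rate to vary
-- between 500 and 2000 Hz while recovering, and recover at most a
-- 100-yield gap.
myRate :: Rate
myRate =
    Rate
        { rateLow = 500       -- lower bound on instantaneous rate (Hz)
        , rateGoal = 1000     -- target yield rate (Hz)
        , rateHigh = 2000     -- upper bound on instantaneous rate (Hz)
        , rateBuffer = 100    -- max yield-count gap we try to recover
        }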

data YieldRateInfo Source #

Rate control.

Constructors

YieldRateInfo 

Fields

  • svarLatencyTarget :: NanoSecond64

  • svarLatencyRange :: LatencyRange

  • svarRateBuffer :: Int

    Number of yields beyond which we will not try to recover the rate.

  • svarGainedLostYields :: IORef Count

    Yields that we have permanently gained or lost since the start of the channel i.e. we do not want to adjust the rate to make up for this deficit or gain.

    LOCKING
    Unlocked access. Modified by the consumer thread and snapshot read by the worker threads.
  • svarAllTimeLatency :: IORef (Count, AbsTime)

    (channel yields from start till now, channel start timestamp) as recorded by the consumer side of the channel.

    LOCKING
    Unlocked access. Modified by the consumer thread, snapshot read by the worker threads.
  • workerBootstrapLatency :: Maybe NanoSecond64

    TODO. Not yet implemented. Worker latency specified by the user to be used as a guide before the first actual measurement arrives.

  • workerPollingInterval :: IORef Count

    After how many yields the worker should update the latency information. If the workerMeasuredLatency is high, this count is kept lower and vice-versa.

    LOCKING
    Unlocked access. Modified by the consumer thread and snapshot read by the worker threads.
  • workerPendingLatency :: IORef (Count, Count, NanoSecond64)

    (total yields, measured yields, time taken by measured yields). This is the first-level collection bucket, continuously updated by workers and periodically emptied and collected into workerCollectedLatency by the consumer thread.

    "Measured yields" are only those yields for which the latency was measured to be non-zero (note that if the timer resolution is low the measured latency may be zero e.g. on JS platform).

    LOCKING
    Locked access. Atomically modified by the consumer thread as well as worker threads. Workers modify it periodically based on workerPollingInterval and not on every yield to reduce the locking overhead.
  • workerCollectedLatency :: IORef (Count, Count, NanoSecond64)

    workerPendingLatency is periodically reset and aggregated into this by the consumer thread. This itself is reset periodically, and svarAllTimeLatency and workerMeasuredLatency are updated using it.

    LOCKING
    Unlocked access. Modified by the consumer thread and snapshot read by the worker threads.
  • workerMeasuredLatency :: IORef NanoSecond64

    Weighted average of worker latencies in previous measurement periods.

    LOCKING
    Unlocked access. Modified by the consumer thread and snapshot read by the worker threads.

data WorkerInfo Source #

We measure the individual worker latencies to estimate the number of workers needed, or the amount of time we have to sleep between dispatches, to achieve a particular rate when controlled pace mode is used.

Constructors

WorkerInfo 

Fields

data ChildEvent a Source #

Events that a child thread may send to a parent thread.

data ThreadAbort Source #

Channel driver throws this exception to all active workers to clean up the channel.

Constructors

ThreadAbort 

newtype Count Source #

Constructors

Count Int64 

Instances

Bounded Count, Enum Count, Num Count, Read Count, Integral Count, Real Count, Show Count, Eq Count, Ord Count (all defined in Streamly.Internal.Data.Channel.Types).

magicMaxBuffer :: Word Source #

A magical value for the buffer size, arrived at by running the smallest possible task and measuring the optimal buffer size for it. This is obviously hardware dependent; this figure is based on a 2.2 GHz Intel Core i7 processor.

data StopWhen Source #

Specify when the Channel should stop.

Constructors

FirstStops

Stop when the first stream ends.

AllStop

Stop when all the streams end.

AnyStops

Stop when any one stream ends.
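
As a usage sketch, the stop behavior is normally selected through the concurrent stream Config; the stopWhen and parZipWith combinators below are assumed from streamly's public Streamly.Data.Stream.Prelude API.

import Streamly.Data.Stream.Prelude (Stream)
import qualified Streamly.Data.Stream.Prelude as Stream

-- Zip two concurrent streams, stopping as soon as either one ends.
zipped :: Stream IO (Int, Int)
zipped =
    Stream.parZipWith (Stream.stopWhen Stream.AnyStops) (,)
        (Stream.fromList [1, 2, 3])
        (Stream.fromList [10, 20])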

readOutputQRaw Source #

Arguments

:: IORef ([ChildEvent a], Int)

Channel output queue

-> Maybe SVarStats

Channel stats

-> IO ([ChildEvent a], Int)

(events, count)

Same as readOutputQBasic but additionally updates the max output queue size channel stat if the new size exceeds the current max.

readOutputQBasic Source #

Arguments

:: IORef ([a], Int)

The channel output queue

-> IO ([a], Int)

(events, count)

Read the output queue of the channel. After reading, reset it to an empty list and a count of 0.
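
A minimal sketch of this read-and-reset, assuming a single atomic swap is sufficient (the helper name is ours):

import Data.IORef (IORef, atomicModifyIORef')

readOutputQBasicSketch :: IORef ([a], Int) -> IO ([a], Int)
readOutputQBasicSketch q =
    -- Atomically take the current (events, count), leaving ([], 0).
    atomicModifyIORef' q $ \old -> (([], 0), old)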

ringDoorBell Source #

Arguments

:: IORef Bool

If True only then ring the door bell

-> MVar ()

Door bell, put () to ring

-> IO () 

Ring the door bell. The IORef is read after adding a store-load barrier. If the IORef was set to True it is atomically reset to False.
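
A sketch of the doorbell protocol described above, using only base primitives; the store-load barrier mentioned above is omitted here, and the helper name is ours.

import Control.Concurrent.MVar (MVar, tryPutMVar)
import Control.Monad (void, when)
import Data.IORef (IORef, atomicModifyIORef', readIORef)

ringDoorBellSketch :: IORef Bool -> MVar () -> IO ()
ringDoorBellSketch needBell bell = do
    -- The real code inserts a store-load barrier before this read.
    w <- readIORef needBell
    when w $ do
        -- Reset the flag atomically so the bell is rung only once.
        void $ atomicModifyIORef' needBell (\_ -> (False, ()))
        void $ tryPutMVar bell ()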

decrementYieldLimit :: Maybe (IORef Count) -> IO Bool Source #

A worker decrements the yield limit before it executes an action. However, the action may not result in an element being yielded; in that case we have to increment the yield limit back.

Note that we need it to be a signed integer type so that we can undo a decrement that takes it below zero.
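
A minimal sketch of the decrement-and-check, relying on Count's Num and Ord instances; Nothing means there is no yield limit, and the helper name is ours.

import Data.IORef (IORef, atomicModifyIORef')

decrementYieldLimitSketch :: Maybe (IORef Count) -> IO Bool
decrementYieldLimitSketch Nothing = return True
decrementYieldLimitSketch (Just ref) =
    -- Decrement first; the caller increments it back if the action
    -- did not actually yield an element.
    atomicModifyIORef' ref $ \x -> (x - 1, x > 0)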

dumpOutputQ :: (Foldable t, Show a1) => IORef (t a2, a1) -> IO String Source #

withDiagMVar :: Bool -> IO String -> String -> IO () -> IO () Source #

MVar diagnostics has some overhead, around 5% on the AsyncT null benchmark. We can keep it on in production to debug problems quickly if and when they happen, but it may result in unexpected output when threads are left hanging until they are GCed because the consumer went away.

Worker Dispatcher

Operations used by the consumer of the channel.

minThreadDelay :: NanoSecond64 Source #

This is a magic number; it is overloaded and used at several places to achieve batching:

  1. If we have to sleep to slow down, this is the minimum period that we accumulate before we sleep. Also, workers do not stop until this much sleep time is accumulated.
  2. Collected latencies are computed and transferred to measured latency after a minimum of this period.

collectLatency Source #

Arguments

:: Bool

stat inspection mode

-> SVarStats

Channel stats

-> YieldRateInfo

Channel rate control info

-> Bool

Force batch collection

-> IO (Count, AbsTime, NanoSecond64)

(channel yield count since beginning, beginning timestamp, workerMeasuredLatency)

Always moves workerPendingLatency to workerCollectedLatency.

Periodically moves workerCollectedLatency to svarAllTimeLatency: when the collected batch size hits a limit, the time limit is over, or the latency changes beyond a limit. When a batch is collected, svarAllTimeLatency and workerMeasuredLatency are updated from it.

See also getWorkerLatency.

allThreadsDone :: MonadIO m => IORef (Set ThreadId) -> m Bool Source #

This is safe even if we are adding more threads concurrently, because if a child thread is adding another thread then workerThreads will not be empty anyway.
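
A minimal sketch of this emptiness check (the helper name is ours):

import Control.Concurrent (ThreadId)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.IORef (IORef, readIORef)
import Data.Set (Set)
import qualified Data.Set as Set

allThreadsDoneSketch :: MonadIO m => IORef (Set ThreadId) -> m Bool
allThreadsDoneSketch ref = liftIO $ Set.null <$> readIORef ref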

Channel Workers

Operations used by the workers (producers) of the channel. These operations are thread-safe: they can be called concurrently by workers running in independent Haskell threads, and the shared channel data structures are read and updated atomically.

data Work Source #

Describes how to pace the work based on current measurement estimates. If the rate is higher than expected we may have to sleep for some time (BlockWait), send just one worker with a limited yield count (PartialWorker), or send multiple workers (ManyWorkers) with the max yield count of each limited to the total maximum target count. A dispatcher sketch follows the constructor list below.

Constructors

BlockWait NanoSecond64

Sleep required before next dispatch

PartialWorker Count

One worker is enough, total yields needed

ManyWorkers Int Count

Worker count, total yields needed overall
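
The sketch below shows how a dispatcher might act on a Work value; the sleepFor and dispatchWorker helpers are hypothetical stand-ins for the channel's real dispatch machinery.

import Control.Monad (replicateM_)

sleepFor :: NanoSecond64 -> IO ()
sleepFor _ = return ()  -- hypothetical: sleep for the given duration

dispatchWorker :: Count -> IO ()
dispatchWorker _ = return ()  -- hypothetical: fork one worker with a yield limit

handleWork :: Work -> IO ()
handleWork work =
    case work of
        BlockWait t -> sleepFor t                      -- rate too high, wait
        PartialWorker yields -> dispatchWorker yields  -- one worker suffices
        ManyWorkers n yields -> replicateM_ n (dispatchWorker yields)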

Instances

Show Work (defined in Streamly.Internal.Data.Channel.Worker).

estimateWorkers Source #

Arguments

:: Limit

Channel's max worker limit

-> Count

Channel's yield count since start

-> Count

svarGainedLostYields

-> NanoSecond64

The up time of the channel

-> NanoSecond64

Current workerMeasuredLatency

-> NanoSecond64

svarLatencyTarget

-> LatencyRange

svarLatencyRange

-> Work 

Estimate the number of workers and the yield count (Work) required to maintain the target yield rate of the channel.

This is used by the worker dispatcher to estimate how many workers to dispatch. It is also used periodically by the workers to decide whether to stop or continue working.
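
For intuition, ignoring the gained/lost yields and the accumulated deficit, the effective rate scales with concurrency, so a back-of-the-envelope worker estimate is the measured worker latency divided by the target latency. The helper below is ours and assumes NanoSecond64 has an Integral instance.

workersNeededRoughly :: NanoSecond64 -> NanoSecond64 -> Int
workersNeededRoughly measuredLatency targetLatency =
    -- e.g. 5 ms measured latency against a 1 ms target needs ~5 workers
    ceiling (fromIntegral measuredLatency / fromIntegral targetLatency :: Double)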

isBeyondMaxRate Source #

Arguments

:: Limit

Channel's max worker limit

-> IORef Int

Current worker count

-> YieldRateInfo

Channel's rate control info

-> IO Bool

True if we are exceeding the specified rate

Using the channel worker latency and channel yield count stats from the current measurement interval, estimate how many workers are needed to maintain the target rate and compare that with the current number of workers. Returns True if we have more workers than required.

incrWorkerYieldCount Source #

Arguments

:: Limit

Channel's max worker limit

-> IORef Int

Current worker count

-> YieldRateInfo

Channel's rate control info

-> WorkerInfo

Worker's yield count info

-> IO Bool

True means limits are ok and worker can continue

Update the worker's local yield count and check whether:

  • the channel yield rate is beyond the max limit
  • the worker's yield count is beyond the max limit

sendEvent Source #

Arguments

:: IORef ([a], Int)

Queue where the event is added

-> MVar ()

Door bell to ring

-> a

The event to be added

-> IO Int

Length of the queue before adding this event

Low level API to add an event on the channel's output queue. Atomically adds the event to the queue and rings the doorbell if needed to wake up the consumer thread.
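
A sketch of the enqueue-and-wake protocol; the assumption that the bell is rung only on the empty-to-non-empty transition is ours, and the helper name is hypothetical.

import Control.Concurrent.MVar (MVar, tryPutMVar)
import Control.Monad (void, when)
import Data.IORef (IORef, atomicModifyIORef')

sendEventSketch :: IORef ([a], Int) -> MVar () -> a -> IO Int
sendEventSketch q bell ev = do
    -- Atomically prepend the event and bump the count.
    oldLen <- atomicModifyIORef' q $ \(es, n) -> ((ev : es, n + 1), n)
    -- Wake the consumer only when the queue was previously empty.
    when (oldLen <= 0) $ void $ tryPutMVar bell ()
    return oldLen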

sendYield Source #

Arguments

:: Limit

Channel's max buffer limit

-> Limit

Channel's max worker limit

-> IORef Int

Current worker count

-> Maybe YieldRateInfo

Channel's rate control info

-> IORef ([ChildEvent a], Int)

Queue where the output is added

-> MVar ()

Door bell to ring

-> Maybe WorkerInfo

Worker's yield count info

-> a

The output to be sent

-> IO Bool

True means worker is allowed to continue working

Add a ChildYield event to the channel's output queue.

This is a wrapper over sendEvent; it does a few more things:

  • performs a buffer limit check, returns False if exceeded

When rate control is enabled and WorkerInfo is supplied:

  • increments the worker yield count
  • periodically pushes the worker latency stats to the channel
  • performs a rate limit check, returns False if exceeded

sendStop Source #

Arguments

:: IORef Int

Channel's current worker count

-> Maybe YieldRateInfo

Channel's rate control info

-> IORef ([ChildEvent a], Int)

Queue where the stop event is added

-> MVar ()

Door bell to ring

-> Maybe WorkerInfo

Worker's yield count info

-> IO () 

Add a ChildStop event to the channel's output queue. When rate control is enabled, it pushes the worker latency stats to the channel.

sendException Source #

Arguments

:: IORef Int

Channel's current worker count

-> IORef ([ChildEvent a], Int)

Queue where the event is added

-> MVar ()

Door bell to ring

-> SomeException

The exception to send

-> IO () 

Add a ChildStop event with exception to the channel's output queue.