Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Streamly.Internal.Data.Channel
Description
This module contains operations that are common for Stream and Fold channels.
Synopsis
- data Limit
- data SVarStats = SVarStats {}
- data Rate = Rate {}
- data YieldRateInfo = YieldRateInfo
    { svarLatencyTarget :: NanoSecond64
    , svarLatencyRange :: LatencyRange
    , svarRateBuffer :: Int
    , svarGainedLostYields :: IORef Count
    , svarAllTimeLatency :: IORef (Count, AbsTime)
    , workerBootstrapLatency :: Maybe NanoSecond64
    , workerPollingInterval :: IORef Count
    , workerPendingLatency :: IORef (Count, Count, NanoSecond64)
    , workerCollectedLatency :: IORef (Count, Count, NanoSecond64)
    , workerMeasuredLatency :: IORef NanoSecond64
    }
- data LatencyRange = LatencyRange {}
- data WorkerInfo = WorkerInfo {}
- data ChildEvent a
- data ThreadAbort = ThreadAbort
- newtype Count = Count Int64
- magicMaxBuffer :: Word
- data StopWhen
- newSVarStats :: IO SVarStats
- readOutputQRaw :: IORef ([ChildEvent a], Int) -> Maybe SVarStats -> IO ([ChildEvent a], Int)
- readOutputQBasic :: IORef ([a], Int) -> IO ([a], Int)
- ringDoorBell :: IORef Bool -> MVar () -> IO ()
- decrementYieldLimit :: Maybe (IORef Count) -> IO Bool
- incrementYieldLimit :: Maybe (IORef Count) -> IO ()
- dumpCreator :: Show a => a -> String
- dumpOutputQ :: (Foldable t, Show a1) => IORef (t a2, a1) -> IO String
- dumpDoorBell :: Show a => MVar a -> IO String
- dumpNeedDoorBell :: Show a => IORef a -> IO String
- dumpRunningThreads :: Show a => IORef a -> IO String
- dumpWorkerCount :: Show a => IORef a -> IO String
- withDiagMVar :: Bool -> IO String -> String -> IO () -> IO ()
- printSVar :: IO String -> String -> IO ()
- minThreadDelay :: NanoSecond64
- collectLatency :: Bool -> SVarStats -> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64)
- addThread :: MonadIO m => IORef (Set ThreadId) -> ThreadId -> m ()
- delThread :: MonadIO m => IORef (Set ThreadId) -> ThreadId -> m ()
- modifyThread :: MonadIO m => IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
- allThreadsDone :: MonadIO m => IORef (Set ThreadId) -> m Bool
- recordMaxWorkers :: MonadIO m => IORef Int -> SVarStats -> m ()
- dumpSVarStats :: Bool -> Maybe YieldRateInfo -> SVarStats -> IO String
- data Work
- estimateWorkers :: Limit -> Count -> Count -> NanoSecond64 -> NanoSecond64 -> NanoSecond64 -> LatencyRange -> Work
- isBeyondMaxRate :: Limit -> IORef Int -> YieldRateInfo -> IO Bool
- incrWorkerYieldCount :: Limit -> IORef Int -> YieldRateInfo -> WorkerInfo -> IO Bool
- sendEvent :: IORef ([a], Int) -> MVar () -> a -> IO Int
- sendYield :: Limit -> Limit -> IORef Int -> Maybe YieldRateInfo -> IORef ([ChildEvent a], Int) -> MVar () -> Maybe WorkerInfo -> a -> IO Bool
- sendStop :: IORef Int -> Maybe YieldRateInfo -> IORef ([ChildEvent a], Int) -> MVar () -> Maybe WorkerInfo -> IO ()
- sendException :: IORef Int -> IORef ([ChildEvent a], Int) -> MVar () -> SomeException -> IO ()
Channel Config & Stats
data SVarStats
Constructors
SVarStats
data Rate
Specifies the stream yield rate in yields per second (Hertz).
We keep accumulating yield credits at rateGoal. At any point in time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the yield count gap becomes more than rateBuffer (specified as a yield count), we try to recover only as much as rateBuffer.
rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. Since we reduce the latency by increasing concurrency, rateHigh effectively puts an upper bound on concurrency.
If rateGoal is 0 or negative, the stream never yields a value.
If rateBuffer is 0 or negative, we do not attempt to recover.
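The credit accounting described above can be sketched as follows. RateSketch, accumulatedCredits and gapToRecover are hypothetical names, not part of Streamly; the sketch models only rateGoal and rateBuffer, and carries the low/high bounds without enforcing them.

```haskell
-- Hypothetical stand-in for Rate; not Streamly's definition.
data RateSketch = RateSketch
  { rsLow    :: Double -- lower bound on the instantaneous rate (not enforced here)
  , rsGoal   :: Double -- target rate in yields per second
  , rsHigh   :: Double -- upper bound on the instantaneous rate (not enforced here)
  , rsBuffer :: Int    -- max yield-count gap we try to recover
  }

-- | Yield credits accumulated at rsGoal after the given elapsed seconds.
accumulatedCredits :: RateSketch -> Double -> Int
accumulatedCredits r elapsed
  | rsGoal r <= 0 = 0 -- a non-positive goal never earns credits
  | otherwise     = floor (rsGoal r * elapsed)

-- | Gap to recover: positive means we are behind the goal (pull faster),
-- negative means we are ahead (pull slower). Clamped to +/- rsBuffer.
gapToRecover :: RateSketch -> Double -> Int -> Int
gapToRecover r elapsed actualYields
  | rsBuffer r <= 0 = 0 -- recovery disabled
  | otherwise       = max (negate buf) (min buf gap)
  where
    buf = rsBuffer r
    gap = accumulatedCredits r elapsed - actualYields
```

For example, with a goal of 10 yields/s and a buffer of 50, a stream that has yielded nothing after 10 seconds is 100 yields behind, but the sketch only tries to recover 50 of them.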
data YieldRateInfo
Rate control.
Constructors
YieldRateInfo
data LatencyRange
Constructors
LatencyRange
Instances
Show LatencyRange (defined in Streamly.Internal.Data.Channel.Types; methods: showsPrec, show, showList)
data WorkerInfo
We measure the individual worker latencies to estimate the number of workers needed, or the amount of time we have to sleep between dispatches, to achieve a particular rate when controlled pace mode is used.
Constructors
WorkerInfo
data ChildEvent a
Events that a child thread may send to a parent thread.
Constructors
ChildYield a
ChildStopChannel
ChildStop ThreadId (Maybe SomeException)
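A sketch of how a consumer might react to each kind of event. ChildEventSketch, Action and classify are hypothetical names; in particular, treating a worker exception as a reason to shut down (and rethrow) is an assumption made for illustration, not a description of Streamly's code.

```haskell
import Control.Concurrent
import Control.Exception

-- Hypothetical mirror of ChildEvent; constructor names are illustrative.
data ChildEventSketch a
  = YieldS a                             -- a worker produced a value
  | StopChannelS                         -- the channel should stop
  | StopS ThreadId (Maybe SomeException) -- a worker stopped, possibly with an error

-- What the consumer (parent) does in response to one event.
data Action a
  = Emit a                       -- pass the value downstream
  | WorkerDone ThreadId          -- drop the worker from the running set
  | Finish (Maybe SomeException) -- shut down, rethrowing if an error is present

classify :: ChildEventSketch a -> Action a
classify (YieldS x)          = Emit x
classify StopChannelS        = Finish Nothing
classify (StopS tid Nothing) = WorkerDone tid
classify (StopS _ (Just e))  = Finish (Just e)
```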
data ThreadAbort
Channel driver throws this exception to all active workers to clean up the channel.
Constructors
ThreadAbort
Instances
Exception ThreadAbort (defined in Streamly.Internal.Data.Channel.Types; methods: toException, fromException, displayException)
Show ThreadAbort (defined in Streamly.Internal.Data.Channel.Types; methods: showsPrec, show, showList)
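The abort pattern can be sketched with throwTo from Control.Exception. AbortSketch, workerSketch and abortAll are hypothetical names; the sketch assumes a worker treats the abort as a request to clean up and exit quietly rather than as an error to report.

```haskell
import Control.Concurrent
import Control.Exception

-- Hypothetical stand-in for ThreadAbort.
data AbortSketch = AbortSketch deriving Show
instance Exception AbortSketch

-- Run a worker body; if the driver aborts it, run the cleanup action
-- instead of letting the exception escape the thread.
workerSketch :: IO () -> IO () -> IO ()
workerSketch work cleanup = work `catch` handler
  where
    handler :: AbortSketch -> IO ()
    handler _ = cleanup

-- The driver throws the abort exception to every active worker.
abortAll :: [ThreadId] -> IO ()
abortAll = mapM_ (\tid -> throwTo tid AbortSketch)
```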
newtype Count
Instances
Bounded Count
Enum Count (defined in Streamly.Internal.Data.Channel.Types)
Num Count
Read Count
Integral Count (defined in Streamly.Internal.Data.Channel.Types)
Real Count (defined in Streamly.Internal.Data.Channel.Types; method: toRational :: Count -> Rational)
Show Count
Eq Count
Ord Count
magicMaxBuffer :: Word
A magical value for the buffer size, arrived at by running the smallest possible task and measuring the optimal value of the buffer for that. This is obviously dependent on hardware; this figure is based on a 2.2GHz Intel Core i7 processor.
data StopWhen
Specify when the Channel should stop.
Constructors
FirstStops | Stop when the first stream ends. |
AllStop | Stop when all the streams end. |
AnyStops | Stop when any one stream ends. |
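The three policies reduce to a small decision function. StopWhenSketch and shouldStop are hypothetical names used only to illustrate the constructor descriptions above.

```haskell
-- Hypothetical mirror of StopWhen.
data StopWhenSketch = FirstStopsS | AllStopS | AnyStopsS

-- | Decide whether the channel stops after one of its streams ends.
shouldStop
  :: StopWhenSketch
  -> Bool -- True if the stream that just ended is the first stream
  -> Int  -- number of streams still running after this one ended
  -> Bool
shouldStop FirstStopsS endedIsFirst _ = endedIsFirst
shouldStop AllStopS    _ remaining    = remaining == 0
shouldStop AnyStopsS   _ _            = True
```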
readOutputQRaw
Arguments
:: IORef ([ChildEvent a], Int) | Channel output queue |
-> Maybe SVarStats | Channel stats |
-> IO ([ChildEvent a], Int) | (events, count) |
Same as readOutputQBasic, but additionally updates the max output queue size channel stat if the new size is more than the current max.
readOutputQBasic :: IORef ([a], Int) -> IO ([a], Int)
Read the output queue of the channel. After reading, reset it to an empty list and a count of 0.
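Both read operations can be sketched with atomicModifyIORef' from Data.IORef. readQueueSketch and readQueueMaxSketch are hypothetical names, and a plain Maybe (IORef Int) stands in for the max-queue-size field of SVarStats.

```haskell
import Data.IORef

-- Take everything queued so far and reset the queue, in one atomic step.
-- Producers prepend, so the returned list is newest-first.
readQueueSketch :: IORef ([a], Int) -> IO ([a], Int)
readQueueSketch q = atomicModifyIORef' q (\old -> (([], 0), old))

-- Same, but also record the largest queue size seen so far.
-- The Maybe (IORef Int) is a hypothetical stand-in for the channel stats.
readQueueMaxSketch :: IORef ([a], Int) -> Maybe (IORef Int) -> IO ([a], Int)
readQueueMaxSketch q mStat = do
  r@(_, n) <- readQueueSketch q
  case mStat of
    Nothing  -> pure ()
    Just ref -> atomicModifyIORef' ref (\m -> (max m n, ()))
  pure r
```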
decrementYieldLimit :: Maybe (IORef Count) -> IO Bool
A worker decrements the yield limit before it executes an action. However, the action may not result in an element being yielded; in that case we have to increment the yield limit again.
Note that the limit needs to be a signed integer type so that we have the ability to undo a decrement that takes it below zero.
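A minimal sketch of the decrement/undo protocol, using Int in place of Count and atomicModifyIORef' in place of whatever atomic primitive the library actually uses. decrementLimitSketch and incrementLimitSketch are hypothetical names. The test shows the counter going to -1 and back, which is why a signed type is needed.

```haskell
import Data.IORef

-- Returns True if the worker may run one more action.
-- Nothing means no yield limit is configured.
decrementLimitSketch :: Maybe (IORef Int) -> IO Bool
decrementLimitSketch Nothing = pure True
decrementLimitSketch (Just ref) = do
  -- Decrement unconditionally; the old value says whether a slot was free.
  old <- atomicModifyIORef' ref (\x -> (x - 1, x))
  pure (old >= 1)

-- Undo a decrement when the action did not yield an element.
incrementLimitSketch :: Maybe (IORef Int) -> IO ()
incrementLimitSketch Nothing = pure ()
incrementLimitSketch (Just ref) = atomicModifyIORef' ref (\x -> (x + 1, ()))
```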
dumpCreator :: Show a => a -> String
withDiagMVar :: Bool -> IO String -> String -> IO () -> IO ()
MVar diagnostics have some overhead, around 5% on the AsyncT null benchmark. We can keep them on in production to debug problems quickly if and when they happen, but they may produce unexpected output when threads are left hanging until they are garbage collected because the consumer went away.
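One way such a diagnostic wrapper can work is to catch BlockedIndefinitelyOnMVar, print the channel state, and rethrow. This is a sketch under that assumption; withDiagSketch is a hypothetical name and the argument roles (inspect flag, state dump, label, action) are inferred from the signature above.

```haskell
import Control.Exception (BlockedIndefinitelyOnMVar (..), catch, throwIO)
import System.IO (hPutStrLn, stderr)

withDiagSketch :: Bool -> IO String -> String -> IO () -> IO ()
withDiagSketch inspecting dumpState label action
  | inspecting = action `catch` handler
  | otherwise  = action -- no overhead when diagnostics are off
  where
    handler :: BlockedIndefinitelyOnMVar -> IO ()
    handler e = do
      -- Dump the state that led to the deadlock, then propagate the error.
      st <- dumpState
      hPutStrLn stderr (label ++ ": " ++ st)
      throwIO e
```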
Worker Dispatcher
Operations used by the consumer of the channel.
minThreadDelay :: NanoSecond64
This is a magic number and it is overloaded; it is used at several places to achieve batching:
- If we have to sleep to slow down, this is the minimum period that we accumulate before we sleep. Also, workers do not stop until this much sleep time is accumulated.
- Collected latencies are computed and transferred to measured latency after a minimum of this period.
collectLatency
Arguments
:: Bool | stat inspection mode |
-> SVarStats | Channel stats |
-> YieldRateInfo | Channel rate control info |
-> Bool | Force batch collection |
-> IO (Count, AbsTime, NanoSecond64) | (channel yield count since beginning, beginning timestamp, |
Always moves workerPendingLatency to workerCollectedLatency:
- workerCollectedLatency always incremented by workerPendingLatency
- workerPendingLatency always reset to 0
Moves workerCollectedLatency to svarAllTimeLatency periodically, when the collected batch size hits a limit, or the time limit is over, or the latency changes beyond a limit. Updates done when the batch is collected:
- svarAllTimeLatency yield count updated
- workerMeasuredLatency set to (new+prev)/2
- workerPollingInterval set using max of new/prev worker latency
- workerCollectedLatency reset to 0
See also getWorkerLatency.
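The pending/collected/measured flow can be sketched with three IORefs. LatencySketch and collectSketch are hypothetical; only the batch-size trigger and the (new+prev)/2 smoothing are modelled, while the time and latency-change triggers, svarAllTimeLatency and the polling interval are left out.

```haskell
import Data.IORef

-- Hypothetical: each pair is (yield count, total latency in ns).
data LatencySketch = LatencySketch
  { pendingRef   :: IORef (Int, Int) -- updated concurrently by workers
  , collectedRef :: IORef (Int, Int) -- accumulated by the dispatcher
  , measuredRef  :: IORef Int        -- smoothed ns per yield
  }

-- | Move pending into collected; once at least batchSize yields have been
-- collected, fold the batch into the measured latency and reset it.
collectSketch :: Int -> LatencySketch -> IO ()
collectSketch batchSize ls = do
  -- Always: take pending atomically and reset it to zero.
  (pc, pt) <- atomicModifyIORef' (pendingRef ls) (\p -> ((0, 0), p))
  modifyIORef' (collectedRef ls) (\(c, t) -> (c + pc, t + pt))
  (cc, ct) <- readIORef (collectedRef ls)
  -- Periodically: the batch is large enough, so update and reset.
  if cc > 0 && cc >= batchSize
    then do
      let new = ct `div` cc
      modifyIORef' (measuredRef ls) (\prev -> (new + prev) `div` 2)
      writeIORef (collectedRef ls) (0, 0)
    else pure ()
```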
allThreadsDone :: MonadIO m => IORef (Set ThreadId) -> m Bool
This is safe even if we are adding more threads concurrently, because if a child thread is adding another thread then workerThreads will not be empty anyway.
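The running-thread bookkeeping can be sketched with a Set ThreadId in an IORef. addThreadSketch, delThreadSketch and allThreadsDoneSketch are hypothetical names modelled on the signatures listed in the synopsis.

```haskell
import Control.Concurrent
import Data.IORef
import Data.Set (Set)
import qualified Data.Set as Set

addThreadSketch :: IORef (Set ThreadId) -> ThreadId -> IO ()
addThreadSketch ref tid = atomicModifyIORef' ref (\s -> (Set.insert tid s, ()))

delThreadSketch :: IORef (Set ThreadId) -> ThreadId -> IO ()
delThreadSketch ref tid = atomicModifyIORef' ref (\s -> (Set.delete tid s, ()))

-- | True when no worker is registered. A worker that registers a child is
-- itself still in the set at that moment, so the set is not seen empty.
allThreadsDoneSketch :: IORef (Set ThreadId) -> IO Bool
allThreadsDoneSketch ref = Set.null <$> readIORef ref
```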
dumpSVarStats :: Bool -> Maybe YieldRateInfo -> SVarStats -> IO String
Channel Workers
Operations used by the workers (producers) of the channel. These operations are thread-safe: they can be called concurrently by workers running in independent Haskell threads, and the shared channel data structures are read or updated atomically.
data Work
Describes how to pace the work based on current measurement estimates. If the rate is higher than expected we may have to sleep for some time (BlockWait); otherwise we either send just one worker with a limited yield count (PartialWorker), or send more than one worker (ManyWorkers) with the max yield count of each limited to the total maximum target count.
Constructors
BlockWait NanoSecond64 | Sleep required before next dispatch |
PartialWorker Count | One worker is enough, total yields needed |
ManyWorkers Int Count | Worker count, total yields needed overall |
estimateWorkers
Arguments
:: Limit | Channel's max worker limit |
-> Count | Channel's yield count since start |
-> Count | |
-> NanoSecond64 | The up time of the channel |
-> NanoSecond64 | Current |
-> NanoSecond64 | |
-> LatencyRange | |
-> Work |
Estimate how many workers and what yield count (Work) are required to maintain the target yield rate of the channel.
This is used by the worker dispatcher to estimate how many workers to dispatch. It is also used periodically by the workers to decide whether to stop or continue working.
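The shape of the decision can be illustrated with a deliberately simplified estimator. This is not Streamly's formula: WorkSketch and estimateSketch are hypothetical, times are Double seconds rather than NanoSecond64, and the rate gap is computed directly from the target rate and elapsed time, assuming a positive rate.

```haskell
-- Hypothetical mirror of Work, with times in seconds.
data WorkSketch
  = BlockWaitS Double    -- sleep this long before the next dispatch
  | PartialWorkerS Int   -- one worker is enough; total yields needed
  | ManyWorkersS Int Int -- worker count, total yields needed overall
  deriving (Show, Eq)

-- | Simplified estimate; assumes rate > 0.
estimateSketch
  :: Int    -- max workers
  -> Double -- target rate (yields/sec)
  -> Int    -- yields so far
  -> Double -- elapsed time (sec)
  -> Double -- measured latency of one yield on one worker (sec)
  -> Double -- dispatch interval (sec)
  -> WorkSketch
estimateSketch maxWorkers rate yields elapsed latency interval
  -- Ahead of schedule: sleep until the next yield is due.
  | gap <= 0     = BlockWaitS (fromIntegral (yields + 1) / rate - elapsed)
  | workers <= 1 = PartialWorkerS gap
  | otherwise    = ManyWorkersS (min maxWorkers workers) gap
  where
    expected = floor (rate * elapsed) :: Int
    gap = expected - yields
    -- Yields one worker can produce within one dispatch interval.
    perWorker :: Int
    perWorker
      | latency <= 0 = max 1 gap
      | otherwise    = max 1 (floor (interval / latency))
    workers = (gap + perWorker - 1) `div` perWorker -- ceiling division
```

Here a backlog of 10 yields with one yield per worker per interval asks for 10 workers, capped at the worker limit.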
isBeyondMaxRate
Arguments
:: Limit | Channel's max worker limit |
-> IORef Int | Current worker count |
-> YieldRateInfo | Channel's rate control info |
-> IO Bool | True if we are exceeding the specified rate |
Using the channel worker latency and channel yield count stats from the current measurement interval, estimate how many workers are needed to maintain the target rate and compare that with the current number of workers. Returns True if we have more workers than required.
incrWorkerYieldCount
Arguments
:: Limit | Channel's max worker limit |
-> IORef Int | Current worker count |
-> YieldRateInfo | Channel's rate control info |
-> WorkerInfo | Worker's yield count info |
-> IO Bool | True means limits are ok and worker can continue |
Update the local yield count of the worker and check if:
- the channel yield rate is beyond max limit
- worker's yield count is beyond max limit
sendEvent
Arguments
:: IORef ([a], Int) | Queue where the event is added |
-> MVar () | Door bell to ring |
-> a | The event to be added |
-> IO Int | Length of the queue before adding this event |
Low-level API to add an event to the channel's output queue. Atomically adds the event to the queue and rings the doorbell, if needed, to wake up the consumer thread.
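A sketch of the enqueue-and-ring step. sendEventSketch is a hypothetical name, and ringing only on the empty-to-non-empty transition is an assumption about "if needed"; tryPutMVar never blocks, so ringing an already-rung bell is harmless.

```haskell
import Control.Concurrent.MVar
import Data.IORef

-- Prepend the event and return the queue length before adding it.
sendEventSketch :: IORef ([a], Int) -> MVar () -> a -> IO Int
sendEventSketch q bell ev = do
  oldLen <- atomicModifyIORef' q (\(es, n) -> ((ev : es, n + 1), n))
  -- Wake the consumer only if the queue was empty; it drains everything.
  if oldLen <= 0 then () <$ tryPutMVar bell () else pure ()
  pure oldLen
```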
sendYield
Arguments
:: Limit | Channel's max buffer limit |
-> Limit | Channel's max worker limit |
-> IORef Int | Current worker count |
-> Maybe YieldRateInfo | Channel's rate control info |
-> IORef ([ChildEvent a], Int) | Queue where the output is added |
-> MVar () | Door bell to ring |
-> Maybe WorkerInfo | Worker's yield count info |
-> a | The output to be sent |
-> IO Bool | True means worker is allowed to continue working |
Add a ChildYield event to the channel's output queue.
This is a wrapper over sendEvent that does a few more things:
- performs a buffer limit check, returns False if exceeded
When rate control is enabled and WorkerInfo is supplied:
- increments the worker yield count
- periodically pushes the worker latency stats to the channel
- performs a rate limit check, returns False if exceeded
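The buffer-limit part of this check can be sketched as below. sendYieldSketch is hypothetical: Maybe Int stands in for Limit (Nothing meaning unlimited), the doorbell and rate-control steps are omitted, and the exact boundary (stop once the queue reaches the limit) is an assumption.

```haskell
import Data.IORef

-- The value is always enqueued; the result only tells the worker whether
-- it should keep producing.
sendYieldSketch :: Maybe Int -> IORef ([a], Int) -> a -> IO Bool
sendYieldSketch limit q x = do
  oldLen <- atomicModifyIORef' q (\(xs, n) -> ((x : xs, n + 1), n))
  pure $ case limit of
    Nothing  -> True              -- unlimited buffer
    Just lim -> oldLen + 1 < lim  -- room left after this element?
```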
sendStop
Arguments
:: IORef Int | Channel's current worker count |
-> Maybe YieldRateInfo | Channel's rate control info |
-> IORef ([ChildEvent a], Int) | Queue where the stop event is added |
-> MVar () | Door bell to ring |
-> Maybe WorkerInfo | Worker's yield count info |
-> IO () |
Add a ChildStop event to the channel's output queue. When rate control is enabled, it pushes the worker latency stats to the channel.
sendException
Arguments
:: IORef Int | Channel's current worker count |
-> IORef ([ChildEvent a], Int) | Queue where the event is added |
-> MVar () | Door bell to ring |
-> SomeException | The exception to send |
-> IO () |
Add a ChildStop event with an exception to the channel's output queue.
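How a worker might end with either a plain stop or a stop carrying an exception can be sketched with try. StopEvent and runWorkerSketch are hypothetical; the sketch assumes the worker count is decremented when a worker stops, and it omits the doorbell and latency stats.

```haskell
import Control.Concurrent
import Control.Exception
import Data.IORef

-- Hypothetical stand-in for ChildStop.
data StopEvent = StopEvent ThreadId (Maybe SomeException)

-- Run a worker body, then report how it ended.
runWorkerSketch :: IORef Int -> IORef [StopEvent] -> IO () -> IO ()
runWorkerSketch workerCount q body = do
  r <- try body :: IO (Either SomeException ())
  tid <- myThreadId
  -- One fewer active worker, then enqueue the stop event.
  atomicModifyIORef' workerCount (\n -> (n - 1, ()))
  let ev = StopEvent tid (either Just (const Nothing) r)
  atomicModifyIORef' q (\evs -> (ev : evs, ()))
```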