-- |
-- Module      : Streamly.Internal.Data.Channel.Types
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- A Channel is a place where streams join and new streams start. This module
-- defines low level data structures and functions to build channels. For
-- concrete Channels see the Channel modules of specific stream types.
--
-- A Channel is a conduit to the output from multiple streams running
-- concurrently and asynchronously. A channel can be thought of as an
-- asynchronous IO handle. We can write any number of streams to a channel in a
-- non-blocking manner and then read them back at any time at any pace.  The
-- channel would run the streams asynchronously and accumulate results. A
-- channel may not really execute the stream completely and accumulate all the
-- results. However, it ensures that the reader can read the results at
-- whatever pace it wants to read. The channel monitors and adapts to the
-- consumer's pace.
--
-- A channel is a mini scheduler, it has an associated workLoop that holds the
-- stream tasks to be picked and run by a pool of worker threads. It has an
-- associated output queue where the output stream elements are placed by the
-- worker threads. An outputDoorBell is used by the worker threads to intimate the
-- consumer thread about availability of new results in the output queue. More
-- workers are added to the channel by 'fromChannel' on demand if the output
-- produced is not keeping pace with the consumer. On bounded channels, workers
-- block on the output queue to provide throttling of the producer  when the
-- consumer is not pulling fast enough.  The number of workers may even get
-- reduced depending on the consuming pace.
--
module Streamly.Internal.Data.Channel.Types
    (
    -- ** Types
      Count (..)
    , Limit (..)
    , ThreadAbort (..)
    , ChildEvent (..)

    -- ** Stats
    , SVarStats (..)
    , newSVarStats

    -- ** Rate Control
    , WorkerInfo (..)
    , LatencyRange (..)
    , YieldRateInfo (..)

    -- ** Output queue
    , readOutputQRaw
    , readOutputQBasic
    , ringDoorBell

    -- ** Yield Limit
    , decrementYieldLimit
    , incrementYieldLimit

    -- ** Configuration
    , Rate (..)
    , StopWhen (..)
    , magicMaxBuffer

    -- ** Diagnostics
    , dumpCreator
    , dumpOutputQ
    , dumpDoorBell
    , dumpNeedDoorBell
    , dumpRunningThreads
    , dumpWorkerCount

    , withDiagMVar
    , printSVar
    )
where

import Control.Concurrent (ThreadId, MVar, tryReadMVar)
import Control.Concurrent.MVar (tryPutMVar)
import Control.Exception
    ( SomeException(..), Exception, catches, throwIO, Handler(..)
    , BlockedIndefinitelyOnMVar(..), BlockedIndefinitelyOnSTM(..))
import Control.Monad (void, when)
import Data.Int (Int64)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Streamly.Internal.Data.Atomics
    (atomicModifyIORefCAS, atomicModifyIORefCAS_, storeLoadBarrier)
import Streamly.Internal.Data.Time.Units (AbsTime, NanoSecond64(..))
import System.IO (hPutStrLn, stderr)

------------------------------------------------------------------------------
-- Common types
------------------------------------------------------------------------------

-- | A counter represented as a newtype wrapper over 'Int64'. It is used in
-- this module for yield counts and yield limits.
--
-- Deriving the numeric classes ('Enum', 'Num', 'Real', 'Integral') for a
-- newtype needs the @GeneralizedNewtypeDeriving@ extension to be enabled for
-- this module.
newtype Count = Count Int64
    deriving ( Eq
             , Read
             , Show
             , Enum
             , Bounded
             , Num
             , Real
             , Integral
             , Ord
             )

-- XXX We can use maxBound for unlimited?

-- | An optional upper bound: either no bound at all ('Unlimited') or a bound
-- of a specific size ('Limited'). This is essentially a 'Maybe Word' type.
data Limit = Unlimited | Limited Word deriving Show

-- | Two limits are equal when both are 'Unlimited' or both are 'Limited' with
-- the same bound.
instance Eq Limit where
    Unlimited == Unlimited = True
    Unlimited == Limited _ = False
    Limited _ == Unlimited = False
    Limited x == Limited y = x == y

-- | 'Unlimited' compares greater than every 'Limited' value; two 'Limited'
-- values are ordered by their bounds.
instance Ord Limit where
    Unlimited <= Unlimited = True
    Unlimited <= Limited _ = False
    Limited _ <= Unlimited = True
    Limited x <= Limited y = x <= y

------------------------------------------------------------------------------
-- Parent child thread communication type
------------------------------------------------------------------------------

-- | Channel driver throws this exception to all active workers to clean up
-- the channel.
data ThreadAbort = ThreadAbort deriving Show

-- | Uses the default 'Exception' methods so that 'ThreadAbort' can be thrown
-- and caught like any other exception.
instance Exception ThreadAbort

-- XXX Use a ChildSingle event to speed up mapM?
-- | Messages that a child thread may post for its parent thread.
data ChildEvent a
    = ChildYield a
    -- ^ Carries one value of the element type @a@.
    | ChildStopChannel
    -- ^ Carries no payload.
    | ChildStop ThreadId (Maybe SomeException)
    -- ^ Carries a 'ThreadId' and an optional exception. NOTE(review):
    -- presumably the id of the thread that stopped and the exception, if
    -- any, that terminated it - confirm against the senders.

-- | We measure the individual worker latencies to estimate the number of
-- workers needed or the amount of time we have to sleep between dispatches to
-- achieve a particular rate when controlled pace mode is used.
data WorkerInfo = WorkerInfo
    {
    -- | Yields allowed for this worker. 0 means unlimited.
      workerYieldMax   :: Count
    -- | Total number of yields by the worker till now.
    , workerYieldCount    :: IORef Count
    -- | (yield count at start of collection interval, collection start
    -- timestamp)
    , workerLatencyStart  :: IORef (Count, AbsTime)
    }

-- | A pair of latency bounds, each a 'NanoSecond64' value.
data LatencyRange = LatencyRange
    { minLatency :: NanoSecond64 -- ^ Lower bound of the range
    , maxLatency :: NanoSecond64 -- ^ Upper bound of the range
    } deriving Show

-- | Rate control. Holds the latency targets and the shared, mutable latency
-- measurements of a channel. The locking discipline of each mutable field is
-- noted on the field.
data YieldRateInfo = YieldRateInfo
    { svarLatencyTarget    :: NanoSecond64
    , svarLatencyRange     :: LatencyRange

    -- | Number of yields beyond which we will not try to recover the rate.
    , svarRateBuffer :: Int

    -- | Yields that we have permanently gained or lost since the start of the
    -- channel i.e. we do not want to adjust the rate to make up for this
    -- deficit or gain.
    --
    -- [LOCKING] Unlocked access. Modified by the consumer thread and snapshot
    -- read by the worker threads
    , svarGainedLostYields :: IORef Count

    -- XXX interval latency is enough, we can move this under diagnostics build

    -- | (channel yields from start till now, channel start timestamp) as
    -- recorded by the consumer side of the channel.
    --
    -- [LOCKING] Unlocked access. Modified by the consumer thread, snapshot
    -- read by the worker threads.
    , svarAllTimeLatency :: IORef (Count, AbsTime)

    -- | TODO. Not yet implemented. Worker latency specified by the user to be
    -- used as a guide before the first actual measurement arrives.
    , workerBootstrapLatency :: Maybe NanoSecond64

    -- XXX If the latency suddenly becomes too high this count may remain too
    -- high for long time, in such cases the consumer can change it. 0 means no
    -- latency computation
    -- XXX this is derivable from workerMeasuredLatency, can be removed.

    -- | After how many yields the worker should update the latency
    -- information. If the 'workerMeasuredLatency' is high, this count is kept
    -- lower and vice-versa.
    --
    -- [LOCKING] Unlocked access. Modified by the consumer thread and snapshot
    -- read by the worker threads
    , workerPollingInterval :: IORef Count

    -- | (total yields, measured yields, time taken by measured yields).
    -- This is first level collection bucket which is continuously updated by
    -- workers and periodically emptied and collected into
    -- 'workerCollectedLatency' by the consumer thread.
    --
    -- "Measured yields" are only those yields for which the latency was
    -- measured to be non-zero (note that if the timer resolution is low the
    -- measured latency may be zero e.g. on JS platform).
    --
    -- [LOCKING] Locked access. Atomically modified by the consumer thread as
    -- well as worker threads. Workers modify it periodically based on
    -- workerPollingInterval and not on every yield to reduce the locking
    -- overhead.
    , workerPendingLatency   :: IORef (Count, Count, NanoSecond64)

    -- | 'workerPendingLatency' is periodically reset and aggregated into this
    -- by the consumer thread. This itself is reset periodically and
    -- 'svarAllTimeLatency', 'workerMeasuredLatency' are updated using it.
    --
    -- [LOCKING] Unlocked access. Modified by the consumer thread and snapshot
    -- read by the worker threads
    , workerCollectedLatency :: IORef (Count, Count, NanoSecond64)

    -- | Weighted average of worker latencies in previous measurement periods.
    --
    -- [LOCKING] Unlocked access. Modified by the consumer thread and snapshot
    -- read by the worker threads
    , workerMeasuredLatency :: IORef NanoSecond64
    }

-- | Mutable statistics collected for a channel. Every field is an 'IORef' so
-- that it can be updated in place.
--
-- NOTE(review): within this module only 'maxOutQSize' is updated (by
-- 'readOutputQRaw', which stores the largest output queue length it has
-- seen). The meaning of the other fields is inferred from their names and
-- should be confirmed against the code that updates them.
data SVarStats = SVarStats {
      totalDispatches  :: IORef Int
    , maxWorkers       :: IORef Int
    -- | Largest output queue length observed when reading the output queue.
    , maxOutQSize      :: IORef Int
    , maxHeapSize      :: IORef Int
    , maxWorkQSize     :: IORef Int
    , avgWorkerLatency :: IORef (Count, NanoSecond64)
    , minWorkerLatency :: IORef NanoSecond64
    , maxWorkerLatency :: IORef NanoSecond64
    , svarStopTime     :: IORef (Maybe AbsTime)
}

-------------------------------------------------------------------------------
-- Config
-------------------------------------------------------------------------------

-- | Specifies the stream yield rate in yields per second (@Hertz@).
-- We keep accumulating yield credits at 'rateGoal'. At any point of time we
-- allow only as many yields as we have accumulated as per 'rateGoal' since the
-- start of time. If the consumer or the producer is slower or faster, the
-- actual rate may fall behind or exceed 'rateGoal'.  We try to recover the gap
-- between the two by increasing or decreasing the pull rate from the producer.
-- However, if the yield count gap becomes more than 'rateBuffer' (specified as
-- a yield count) we try to recover only as much as 'rateBuffer'.
--
-- 'rateLow' puts a bound on how low the instantaneous rate can go when
-- recovering the rate gap.  In other words, it determines the maximum yield
-- latency.  Similarly, 'rateHigh' puts a bound on how high the instantaneous
-- rate can go when recovering the rate gap.  In other words, it determines the
-- minimum yield latency. We reduce the latency by increasing concurrency,
-- therefore we can say that it puts an upper bound on concurrency.
--
-- If the 'rateGoal' is 0 or negative the stream never yields a value.
-- If the 'rateBuffer' is 0 or negative we do not attempt to recover.
--
data Rate = Rate
    { rateLow    :: Double -- ^ The lower rate limit (yields per sec)
    , rateGoal   :: Double -- ^ The target rate we want to achieve
    , rateHigh   :: Double -- ^ The upper rate limit
    , rateBuffer :: Int    -- ^ Maximum yield count slack from the goal
    }

-- | The condition under which a 'Channel' should stop.
data StopWhen
    = FirstStops
    -- ^ Stop when the first stream ends.
    | AllStop
    -- ^ Stop when all the streams end.
    | AnyStops
    -- ^ Stop when any one stream ends.

-- | A magical value for the buffer size arrived at by running the smallest
-- possible task and measuring the optimal value of the buffer for that.  This
-- is obviously dependent on hardware, this figure is based on a 2.2GHz intel
-- core-i7 processor.
magicMaxBuffer :: Word
magicMaxBuffer = 1500

-- | Allocate a fresh 'SVarStats' record. All the counters start at 0, all the
-- latencies start at @NanoSecond64 0@ and no stop time is recorded.
newSVarStats :: IO SVarStats
newSVarStats = do
    disp   <- newIORef 0
    maxWrk <- newIORef 0
    maxOq  <- newIORef 0
    maxHs  <- newIORef 0
    maxWq  <- newIORef 0
    avgLat <- newIORef (0, NanoSecond64 0)
    maxLat <- newIORef (NanoSecond64 0)
    minLat <- newIORef (NanoSecond64 0)
    stpTime <- newIORef Nothing

    return SVarStats
        { totalDispatches  = disp
        , maxWorkers       = maxWrk
        , maxOutQSize      = maxOq
        , maxHeapSize      = maxHs
        , maxWorkQSize     = maxWq
        , avgWorkerLatency = avgLat
        , minWorkerLatency = minLat
        , maxWorkerLatency = maxLat
        , svarStopTime     = stpTime
        }

-------------------------------------------------------------------------------
-- Channel yield count
-------------------------------------------------------------------------------

-- XXX Can we make access to remainingWork and yieldRateInfo fields in sv
-- faster, along with the fields in sv required by send?
-- XXX make it noinline
--
-- XXX we may want to employ increment and decrement in batches when the
-- throughput is high or when the cost of synchronization is high. For example
-- if the application is distributed then inc/dec of a shared variable may be
-- very costly.

-- | A worker decrements the yield limit before it executes an action. However,
-- the action may not result in an element being yielded, in that case we have
-- to increment the yield limit.
--
-- When there is no limit ('Nothing') this always returns 'True'. Otherwise the
-- counter is atomically decremented by one, unconditionally, and the result
-- is 'True' only if the value before the decrement was at least 1.
--
-- Note that the counter needs to be a signed type so that we have the ability
-- to undo a decrement that takes it below zero.
{-# INLINE decrementYieldLimit #-}
decrementYieldLimit :: Maybe (IORef Count) -> IO Bool
decrementYieldLimit remaining =
    case remaining of
        Nothing -> return True
        Just ref -> do
            -- Store the decremented value, get back the value before it.
            r <- atomicModifyIORefCAS ref $ \x -> (x - 1, x)
            return $ r >= 1

-- | Atomically add one to the yield limit counter, if there is one. Does
-- nothing when there is no limit ('Nothing').
{-# INLINE incrementYieldLimit #-}
incrementYieldLimit :: Maybe (IORef Count) -> IO ()
incrementYieldLimit remaining =
    case remaining of
        Nothing -> return ()
        Just ref -> atomicModifyIORefCAS_ ref (+ 1)

-------------------------------------------------------------------------------
-- Output queue
-------------------------------------------------------------------------------

-- | Read the output queue of the channel. The queue is atomically replaced by
-- an empty list with a 0 count, and the contents it had before the
-- replacement are returned.
{-# INLINE readOutputQBasic #-}
readOutputQBasic ::
       IORef ([a], Int) -- ^ The channel output queue
    -> IO ([a], Int) -- ^ (events, count)
readOutputQBasic q = atomicModifyIORefCAS q $ \x -> (([],0), x)

-- | Same as 'readOutputQBasic' but additionally update the max output queue
-- size channel stat if the size just read is more than the current max. The
-- stat is left untouched when no stats are supplied ('Nothing').
{-# INLINE readOutputQRaw #-}
readOutputQRaw ::
       IORef ([ChildEvent a], Int) -- ^ Channel output queue
    -> Maybe SVarStats -- ^ Channel stats
    -> IO ([ChildEvent a], Int) -- ^ (events, count)
readOutputQRaw q stats = do
    (list, len) <- readOutputQBasic q
    case stats of
        Just ss -> do
            let ref = maxOutQSize ss
            oqLen <- readIORef ref
            -- Plain read followed by a plain write, not an atomic update.
            when (len > oqLen) $ writeIORef ref len
        Nothing -> return ()
    return (list, len)

-- | Ring the door bell if it is needed. The IORef is read after adding a
-- store-load barrier. If the IORef was set to 'True' it is atomically reset to
-- 'False' and then @()@ is put into the door bell 'MVar' without blocking; if
-- the 'MVar' is already full the put is skipped.
{-# INLINE ringDoorBell #-}
ringDoorBell ::
       IORef Bool -- ^ If 'True' only then ring the door bell
    -> MVar () -- ^ Door bell, put () to ring
    -> IO ()
ringDoorBell needBell bell = do
    storeLoadBarrier
    w <- readIORef needBell
    when w $ do
        -- Note: the sequence of operations is important for correctness here.
        -- We need to set the flag to False strictly before sending the
        -- outputDoorBell, otherwise the outputDoorBell may get processed too
        -- early and our later reset of the flag to False would make the
        -- consumer lose the flag, even without receiving an outputDoorBell.
        atomicModifyIORefCAS_ needBell (const False)
        void $ tryPutMVar bell ()

-------------------------------------------------------------------------------
-- Diagnostics
-------------------------------------------------------------------------------

-- | Render the given value, using its 'Show' instance, as a
-- @"Creator tid = ..."@ diagnostic line (without a trailing newline).
dumpCreator :: Show a => a -> String
dumpCreator tid = "Creator tid = " <> show tid

-- | Describe an output queue for diagnostics. The result has two lines, each
-- terminated by a newline: the length computed from the queue contents and
-- the length stored alongside the contents.
dumpOutputQ :: (Foldable t, Show a1) => IORef (t a2, a1) -> IO String
dumpOutputQ q = do
    (oqList, oqLen) <- readIORef q
    return $ unlines
        [ "outputQueue length computed  = " <> show (length oqList)
        , "outputQueue length maintained = " <> show oqLen
        ]

-- | Describe the state of a door bell 'MVar' for diagnostics. The 'MVar' is
-- inspected with a non-blocking read which leaves its contents in place; an
-- empty 'MVar' shows as @Nothing@.
dumpDoorBell :: Show a => MVar a -> IO String
dumpDoorBell mvar = do
    db <- tryReadMVar mvar
    return $ "outputDoorBell = " <> show db

-- | Read the given 'IORef' and render its current value as a
-- @"needDoorBell = ..."@ diagnostic line (without a trailing newline).
dumpNeedDoorBell :: Show a => IORef a -> IO String
dumpNeedDoorBell ref = do
    waiting <- readIORef ref
    return $ "needDoorBell = " <> show waiting

-- | Read the given 'IORef' and render its current value as a
-- @"running threads = ..."@ diagnostic line (without a trailing newline).
dumpRunningThreads :: Show a => IORef a -> IO String
dumpRunningThreads ref = do
    rthread <- readIORef ref
    return $ "running threads = " <> show rthread

-- | Read the given 'IORef' and render its current value as a
-- @"running thread count = ..."@ diagnostic line (without a trailing
-- newline).
dumpWorkerCount :: Show a => IORef a -> IO String
dumpWorkerCount ref = do
    workers <- readIORef ref
    return $ "running thread count = " <> show workers

-- | Handler for 'BlockedIndefinitelyOnMVar'. Runs the supplied dump action,
-- prints the label, the exception name and the dump output on 'stderr', and
-- then rethrows the exception.
{-# NOINLINE mvarExcHandler #-}
mvarExcHandler :: IO String -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler dump label e@BlockedIndefinitelyOnMVar = do
    svInfo <- dump
    hPutStrLn stderr $ label <> " " <> "BlockedIndefinitelyOnMVar\n" <> svInfo
    -- The handler only reports; the exception still propagates to the caller.
    throwIO e

-- | Handler for 'BlockedIndefinitelyOnSTM'. Runs the supplied dump action,
-- prints the label, the exception name and the dump output on 'stderr', and
-- then rethrows the exception.
{-# NOINLINE stmExcHandler #-}
stmExcHandler :: IO String -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler dump label e@BlockedIndefinitelyOnSTM = do
    svInfo <- dump
    hPutStrLn stderr $ label <> " " <> "BlockedIndefinitelyOnSTM\n" <> svInfo
    -- The handler only reports; the exception still propagates to the caller.
    throwIO e

-- | Run an action, optionally with diagnostics for indefinitely blocked
-- threads. When the first argument is 'True', a 'BlockedIndefinitelyOnMVar'
-- or 'BlockedIndefinitelyOnSTM' exception raised by the action is reported on
-- 'stderr' together with the label and the output of the dump action, and is
-- then rethrown. When it is 'False' the action is run as is.
--
-- MVar diagnostics has some overhead - around 5% on AsyncT null benchmark, we
-- can keep it on in production to debug problems quickly if and when they
-- happen, but it may result in unexpected output when threads are left hanging
-- until they are GCed because the consumer went away.
withDiagMVar :: Bool -> IO String -> String -> IO () -> IO ()
withDiagMVar inspecting dump label action =
    if inspecting
    then
        action `catches` [ Handler (mvarExcHandler dump label)
                         , Handler (stmExcHandler dump label)
                         ]
    else action

-- | Run the dump action and print its output on 'stderr', preceded by an
-- empty line and the given heading.
printSVar :: IO String -> String -> IO ()
printSVar dump how = do
    svInfo <- dump
    hPutStrLn stderr $ "\n" <> how <> "\n" <> svInfo