-- |
-- Module      : Streamly.Internal.Data.Stream.Channel.Type
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.Channel.Type
    (
    -- ** Type
      Channel(..)

    -- ** Configuration
    , Config

    -- *** Default config
    , defaultConfig

    -- *** Limits
    , maxThreads
    , maxBuffer
    , maxYields

    -- *** Rate Control
    , Rate(..)
    , newRateInfo
    , rate
    , avgRate
    , minRate
    , maxRate
    , constRate

    -- *** Stop behavior
    , StopWhen (..)
    , stopWhen

    -- *** Scheduling behavior
    , eager
    , ordered
    , interleaved
    , boundThreads

    -- *** Diagnostics
    , inspect

    -- *** Resource management
    , useAcquire
    , clearAcquire

    -- *** Get config
    , getMaxBuffer
    , getMaxThreads
    , getYieldLimit
    , getInspectMode
    , getStreamRate
    , getEagerDispatch
    , getOrdered
    , getStopWhen
    , getInterleaved
    , getCleanup

    -- ** Sending Worker Events
    , yieldWith
    , stopWith
    , exceptionWith
    , shutdown

    -- ** Cleanup
    , channelDone
    , cleanupChan

    -- ** Diagnostics
    , dumpChannel
    )
where

import Control.Concurrent (ThreadId, throwTo, takeMVar, putMVar)
import Control.Concurrent.MVar (MVar)
import Control.Exception (SomeException(..))
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Int (Int64)
import Data.IORef (IORef, newIORef, readIORef, atomicWriteIORef, writeIORef)
import Data.List (intersperse)
import Data.Set (Set)
import Streamly.Internal.Control.Concurrent (RunInIO)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS)
import Streamly.Internal.Data.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Channel.Worker
    (sendYield, sendStop, sendEvent, sendException)
import Streamly.Internal.Data.StreamK (StreamK)
import Streamly.Internal.Control.Exception
    (AcquireIO(..), Priority(..), registerWith)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..))
import System.Mem (performMajorGC)

import qualified Data.Set as Set

import Streamly.Internal.Data.Channel.Types

-- IMPORTANT NOTE: we cannot update the SVar after generating it as we have
-- references to the original SVar stored in several functions which will keep
-- pointing to the original data and the new updates won't reflect there.
-- Any updateable parts must be kept in mutable references (IORef).

-- XXX Since we have stream specific channels now, we can remove functions like
-- enqueue, readOuputQ, postProcess, workLoop etc from this.

-- XXX Add an option in channel for minthreads.
-- dispatch tail worker from the worker itself up to min threads or based on
-- pace data. min threads can be increased dynamically by the event loop.
-- for eager minthreads = maxthreads

-- | A mutable channel to evaluate multiple streams concurrently and provide
-- the combined results as output stream.
--
-- There are only two actors working on the channel data structure, the event
-- processing loop (single thread), and the workers (multiple threads). Locking
-- notes are provided below for concurrent access.
data Channel m a = Channel
    {
     -- XXX Do we need this? We store the runner in the work q, is that enough?
     -- This seems to be used only by the 'ordered' stream as of now.

     -- | Runner for the monadic actions in the stream. Captures the monad
     -- state at the point where the channel was created and uses the same
     -- state to run all actions.
      forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun :: RunInIO m

    ---------------------------------------------------------------------------
    -- Output queue related
    ---------------------------------------------------------------------------

    -- | Maximum size of the 'outputQueue'. The actual worst case buffer could
    -- be double of this as the event loop may read the queue and the workers
    -- may fill it up even before the event loop has started consuming.
    , forall (m :: * -> *) a. Channel m a -> Limit
maxBufferLimit :: Limit

    -- XXX For better efficiency we can try a preallocated array type (perhaps
    -- something like a vector) that allows an O(1) append. That way we will
    -- avoid constructing and reversing the list. Possibly we can also avoid
    -- the GC copying overhead. When the size increases we should be able to
    -- allocate the array in chunks.
    --
    -- XXX We can use a per-CPU data structure to reduce the locking overhead.
    -- However, a per-cpu structure cannot guarantee the exact sequence in
    -- which the elements were added, though that may not be important.
    --
    -- XXX We can send a bundle of events of one type coaleseced together in an
    -- unboxed structure.

    -- | (events, count): worker event queue of the channel. This is where the
    -- workers queue the results and other events.
    --
    -- [LOCKING] Frequently locked. This is locked and updated by workers on
    -- each yield, and locked, updated by the event loop thread once in a while
    -- for reading. Workers' locking contention may be high if there are a
    -- large number of workers.
    , forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue :: IORef ([ChildEvent a], Int)

    -- | Door bell for workers to wakeup the event loop.
    --
    -- [LOCKING] Infrequently locked. Used only when the 'outputQueue'
    -- transitions from empty to non-empty, or a work item is queued by a
    -- worker to the work queue and 'doorBellOnWorkQ' is set by the event loop.
    --
    -- We also use this for workerCount decrement, we wait on this during
    -- cleanup. So any workerCount decrement must send a doorBell.
    , forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell :: MVar ()

    -- XXX Can we use IO instead of m here?

    -- | Function to read the output queue of the channel, depends on the rate
    -- control option.
    , forall (m :: * -> *) a. Channel m a -> m [ChildEvent a]
readOutputQ :: m [ChildEvent a]

    -- | Function to invoke after all the events read in a batch are processed
    -- i.e. before we go on to read the next batch, depends on the rate control
    -- option.
    , forall (m :: * -> *) a. Channel m a -> m Bool
postProcess :: m Bool

    ---------------------------------------------------------------------------
    -- Work and rate control
    ---------------------------------------------------------------------------

    -- | Tracks how many yields are remaining before the channel stops, used
    -- when 'maxYields' option is enabled.
    --
    -- [LOCKING] Read only access by event loop when dispatching a worker.
    -- Decremented by workers when picking work and undo decrement if the
    -- worker does not yield a value.
    , forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork :: Maybe (IORef Count)

    -- XXX We make this isChannelDone which should not include isQueueDone.
    --
    -- | Determine if there is no more work to do. When 'maxYields' is set for
    -- the channel we may be done even if the work queue still has work.
    , forall (m :: * -> *) a. Channel m a -> IO Bool
isWorkDone :: IO Bool

    -- | Rate control information for the channel used when 'rate' control is
    -- enabled,
    , forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo :: Maybe YieldRateInfo

    ---------------------------------------------------------------------------
    -- Work queue related
    ---------------------------------------------------------------------------

    -- | When set to True, ring 'outputDoorBell' when a work item is queued on
    -- the work queue. This is set by the dispatcher before going to sleep. It
    -- wants to be woken up whenever the work queue got more work to do so that
    -- it can dispatch a worker.
    , forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ :: IORef Bool

    -- XXX instead of this we should use a dispatcher setting.

    -- | This is a hook which is invoked whenever the tail of the stream is
    -- re-enqueued on the work queue. Normally, this is set to a noop. When
    -- 'eager' option is enabled this is set to an unconditional worker
    -- dispatch function. This ensures that we eagerly send a worker as long
    -- as there is work to do.
    --
    -- NOTE that this is called from a worker context, therefore we should
    -- consider appropriate locking semantics.
    , forall (m :: * -> *) a. Channel m a -> m ()
eagerDispatch :: m ()

    -- | Enqueue a stream for evaluation on the channel. The first element of
    -- the tuple is the runner function which is used to run the stream actions
    -- in a specific monadic context.
    , forall (m :: * -> *) a.
Channel m a -> (RunInIO m, StreamK m a) -> IO ()
enqueue :: (RunInIO m, StreamK m a) -> IO ()

    -- | Determine if the work queue is empty, therefore, there is no more work
    -- to do.
    , forall (m :: * -> *) a. Channel m a -> IO Bool
isQueueDone :: IO Bool

    -- | Worker function. It is implicitly aware of the work queue. It dequeues
    -- a work item from the queue and runs it. It keeps on doing this in a loop
    -- until it determines that it needs to stop.
    --
    -- Normally, the worker stops when the work queue becomes empty or the work
    -- rate is higher than the target rate when rate control is enabled. It
    -- stops by sending a 'ChildStop' event to the channel
    --
    -- When rate control is enabled, the worker is dispatched with a
    -- 'WorkerInfo' record which is used by the worker to maintain rate control
    -- information and communicate it to the channel.
    , forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> m ()
workLoop :: Maybe WorkerInfo -> m ()

    , forall (m :: * -> *) a. Channel m a -> IORef Bool
channelStopping :: IORef Bool
    , forall (m :: * -> *) a. Channel m a -> MVar Bool
channelStopped :: MVar Bool

    ---------------------------------------------------------------------------
    -- Worker thread accounting
    ---------------------------------------------------------------------------
    --
    -- | This is capped to 'maxBufferLimit' if set to more than that. Otherwise
    -- potentially each worker may yield one value to the buffer in the worst
    -- case exceeding the requested buffer size.
    , forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit :: Limit

    -- | Tracks all active worker threads. An entry is added by the dispatcher
    -- when a worker is dispatched, and removed whenever the event processing
    -- loop receives a 'ChildStop' event.
    --
    -- [LOCKING] Normally, this is updated only by the event loop thread, but
    -- in case of eager dispatch (done in worker context) it is updated by a
    -- worker. So reads from the event loop should be mindful of that.
    -- Updates to this must be async signal safe because we rely on it for
    -- cleanup and cleanup may leave unfinished threads if a thread is forked
    -- but this is not updated.
    , forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads :: IORef (Set ThreadId)

    -- | Total number of active worker threads.
    --
    -- [LOCKING] Updated locked, by the event loop thread when dispatching a
    -- worker and by a worker thread when the thread stops. This is read
    -- without lock at several places where we want to rely on an approximate
    -- value. Updates to this must be async signal safe because we rely on it
    -- for cleanup and cleanup may hang and leave unfinished threads if this is
    -- not correct.
    , forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount :: IORef Int

    -- XXX Can we use IO instead of m here?
    , forall (m :: * -> *) a. Channel m a -> ThreadId -> m ()
accountThread  :: ThreadId -> m ()

    -- | Used when 'ordered' is enabled. This is a lock to stop the workers one
    -- at a time. Stopping one might affect whether the other should stop.
    , forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar :: MVar ()

    ---------------------------------------------------------------------------
    -- Channel cleanup --
    ---------------------------------------------------------------------------
    -- | A weak IORef to call a cleanup function when the channel is garbage
    -- collected.
    , forall (m :: * -> *) a. Channel m a -> Maybe (IORef ())
svarRef :: Maybe (IORef ())

    ---------------------------------------------------------------------------
    -- Channel Stats --
    ---------------------------------------------------------------------------
    -- | Stats collection.
    , forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats :: SVarStats

    ---------------------------------------------------------------------------
    -- Diagnostics --
    ---------------------------------------------------------------------------
    -- | When 'inspect' mode is enabled we report diagnostic data about the
    -- channel at certain points.
    , forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode :: Bool
    -- | threadId of the thread that created the channel
    , forall (m :: * -> *) a. Channel m a -> ThreadId
svarCreator :: ThreadId

    -- XXX Add a map of dispatcher thread to worker thread.
    }

-------------------------------------------------------------------------------
-- Channel Config
-------------------------------------------------------------------------------

-- XXX we can put the resettable fields in a oneShotConfig field and others in
-- a persistentConfig field. That way reset would be fast and scalable
-- irrespective of the number of fields.
--
-- XXX make all these Limited types and use phantom types to distinguish them

-- | An abstract type for specifying the configuration parameters of a
-- 'Channel'. Use @Config -> Config@ modifier functions to modify the default
-- configuration. See the individual modifier documentation for default values.
--
data Config = Config
    { -- one shot configuration, automatically reset for each API call
      -- streamVar   :: Maybe (SVar t m a)
      Config -> Maybe Count
_yieldLimit  :: Maybe Count

    -- persistent configuration, state that remains valid until changed by
    -- an explicit setting via a combinator.
    , Config -> Limit
_threadsHigh    :: Limit
    , Config -> Limit
_bufferHigh     :: Limit

    -- XXX these two can be collapsed into a single type
    , Config -> Maybe NanoSecond64
_streamLatency  :: Maybe NanoSecond64 -- bootstrap latency
    , Config -> Maybe Rate
_maxStreamRate  :: Maybe Rate
    , Config -> Bool
_inspect    :: Bool
    , Config -> Bool
_eagerDispatch  :: Bool
    , Config -> StopWhen
_stopWhen :: StopWhen
    , Config -> Bool
_ordered :: Bool
    , Config -> Bool
_interleaved :: Bool
    , Config -> Bool
_bound :: Bool

    -- XXX We can also use resource-t to release the channel. But that will
    -- require a MonadResource constraint. It is a bigger change, we can plan
    -- in future. With MonadResource, runResourceT will have to be called to
    -- create a scope. Here we have an option to use prompt release or GC
    -- release, but there are chances of missing a prompt release when the
    -- option is provided to the programmer instead of always enforcing it.
    --
    -- We could store Channel m a, here instead of a deallocation function, if
    -- we make the Config type as "Config m a". That way we can also share
    -- channels across multiple computations.
    , Config -> Maybe (IO () -> IO ())
_release :: Maybe (IO () -> IO ())
    }

-------------------------------------------------------------------------------
-- State defaults and reset
-------------------------------------------------------------------------------

defaultMaxThreads, defaultMaxBuffer :: Limit
defaultMaxThreads :: Limit
defaultMaxThreads = Word -> Limit
Limited Word
magicMaxBuffer
defaultMaxBuffer :: Limit
defaultMaxBuffer = Word -> Limit
Limited Word
magicMaxBuffer

-- | The fields prefixed by an _ are not to be accessed or updated directly but
-- via smart accessor APIs. Use get/set routines instead of directly accessing
-- the Config fields
defaultConfig :: Config
defaultConfig :: Config
defaultConfig = Config
    { -- streamVar = Nothing
      _yieldLimit :: Maybe Count
_yieldLimit = Maybe Count
forall a. Maybe a
Nothing
    , _threadsHigh :: Limit
_threadsHigh = Limit
defaultMaxThreads
    , _bufferHigh :: Limit
_bufferHigh = Limit
defaultMaxBuffer
    , _maxStreamRate :: Maybe Rate
_maxStreamRate = Maybe Rate
forall a. Maybe a
Nothing
    , _streamLatency :: Maybe NanoSecond64
_streamLatency = Maybe NanoSecond64
forall a. Maybe a
Nothing
    , _inspect :: Bool
_inspect = Bool
False
    -- XXX Set it to True when Rate is not set?
    , _eagerDispatch :: Bool
_eagerDispatch = Bool
False
    , _stopWhen :: StopWhen
_stopWhen = StopWhen
AllStop
    , _ordered :: Bool
_ordered = Bool
False
    , _interleaved :: Bool
_interleaved = Bool
False
    , _bound :: Bool
_bound = Bool
False
    , _release :: Maybe (IO () -> IO ())
_release = Maybe (IO () -> IO ())
forall a. Maybe a
Nothing
    }

-------------------------------------------------------------------------------
-- Smart get/set routines for State
-------------------------------------------------------------------------------

-- | The maximum number of yields that this channel would produce. The Channel
-- automatically stops after that. This could be used to limit the speculative
-- execution beyond the limit.
--
-- 'Nothing' means there is no limit.
--
-- Keep in mind that checking this limit all the time has a performance
-- overhead.
--
-- Known Bugs: currently this works only when rate is specified.
-- Known Bugs: for ordered streams sometimes the actual count is less than
-- expected.
maxYields :: Maybe Int64 -> Config -> Config
maxYields :: Maybe Int64 -> Config -> Config
maxYields Maybe Int64
lim Config
st =
    Config
st { _yieldLimit =
            case lim of
                Maybe Int64
Nothing -> Maybe Count
forall a. Maybe a
Nothing
                Just Int64
n  ->
                    if Int64
n Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
<= Int64
0
                    then Count -> Maybe Count
forall a. a -> Maybe a
Just Count
0
                    else Count -> Maybe Count
forall a. a -> Maybe a
Just (Int64 -> Count
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
n)
       }

getYieldLimit :: Config -> Maybe Count
getYieldLimit :: Config -> Maybe Count
getYieldLimit = Config -> Maybe Count
_yieldLimit

-- | Specify the maximum number of threads that can be spawned by the channel.
-- A value of 0 resets the thread limit to default, a negative value means
-- there is no limit. The default value is 1500.
--
-- When the actions in a stream are IO bound, having blocking IO calls, this
-- option can be used to control the maximum number of in-flight IO requests.
-- When the actions are CPU bound this option can be used to control the amount
-- of CPU used by the stream.
--
maxThreads :: Int -> Config -> Config
maxThreads :: Int -> Config -> Config
maxThreads Int
n Config
st =
    Config
st { _threadsHigh =
            if n < 0
            then Unlimited
            else if n == 0
                 then defaultMaxThreads
                 else Limited (fromIntegral n)
       }

getMaxThreads :: Config -> Limit
getMaxThreads :: Config -> Limit
getMaxThreads = Config -> Limit
_threadsHigh

-- | Specify the maximum size of the buffer for storing the results from
-- concurrent computations. If the buffer becomes full we stop spawning more
-- concurrent tasks until there is space in the buffer.
-- A value of 0 resets the buffer size to default, a negative value means
-- there is no limit. The default value is 1500.
--
-- CAUTION! using an unbounded 'maxBuffer' value (i.e. a negative value)
-- coupled with an unbounded 'maxThreads' value is a recipe for disaster in
-- presence of infinite streams, or very large streams.  Especially, it must
-- not be used when 'pure' is used in 'ZipAsyncM' streams as 'pure' in
-- applicative zip streams generates an infinite stream causing unbounded
-- concurrent generation with no limit on the buffer or threads.
--
maxBuffer :: Int -> Config -> Config
maxBuffer :: Int -> Config -> Config
maxBuffer Int
n Config
st =
    Config
st { _bufferHigh =
            if n < 0
            then Unlimited
            else if n == 0
                 then defaultMaxBuffer
                 else Limited (fromIntegral n)
       }

getMaxBuffer :: Config -> Limit
getMaxBuffer :: Config -> Limit
getMaxBuffer = Config -> Limit
_bufferHigh

-- | Specify the stream evaluation rate of a channel.
--
-- A 'Nothing' value means there is no smart rate control, concurrent execution
-- blocks only if 'maxThreads' or 'maxBuffer' is reached, or there are no more
-- concurrent tasks to execute. This is the default.
--
-- When rate (throughput) is specified, concurrent production may be ramped
-- up or down automatically to achieve the specified stream throughput. The
-- specific behavior for different styles of 'Rate' specifications is
-- documented under 'Rate'.  The effective maximum production rate achieved by
-- a channel is governed by:
--
-- * The 'maxThreads' limit
-- * The 'maxBuffer' limit
-- * The maximum rate that the stream producer can achieve
-- * The maximum rate that the stream consumer can achieve
--
-- Maximum production rate is given by:
--
-- \(rate = \frac{maxThreads}{latency}\)
--
-- If we know the average latency of the tasks we can set 'maxThreads'
-- accordingly.
--
rate :: Maybe Rate -> Config -> Config
rate :: Maybe Rate -> Config -> Config
rate Maybe Rate
r Config
st = Config
st { _maxStreamRate = r }

getStreamRate :: Config -> Maybe Rate
getStreamRate :: Config -> Maybe Rate
getStreamRate = Config -> Maybe Rate
_maxStreamRate

_setStreamLatency :: Int -> Config -> Config
_setStreamLatency :: Int -> Config -> Config
_setStreamLatency Int
n Config
st =
    Config
st { _streamLatency =
            if n <= 0
            then Nothing
            else Just (fromIntegral n)
       }

getStreamLatency :: Config -> Maybe NanoSecond64
getStreamLatency :: Config -> Maybe NanoSecond64
getStreamLatency = Config -> Maybe NanoSecond64
_streamLatency

-- XXX Rename to "inspect"

-- | Print debug information about the 'Channel' when the stream ends. When the
-- stream does not end normally, the channel debug information is printed when
-- the channel is garbage collected. If you are expecting but not seeing the
-- debug info try adding a 'performMajorGC' before the program ends.
--
inspect :: Bool -> Config -> Config
inspect :: Bool -> Config -> Config
inspect Bool
flag Config
st = Config
st { _inspect = flag }

getInspectMode :: Config -> Bool
getInspectMode :: Config -> Bool
getInspectMode = Config -> Bool
_inspect

-- | By default, processing of output from the worker threads is given priority
-- over dispatching new workers. More workers are dispatched only when there is
-- no output to process. When 'eager' is set to 'True', workers are dispatched
-- aggresively as long as there is more work to do irrespective of whether
-- there is output pending to be processed by the stream consumer. However,
-- dispatching may stop if 'maxThreads' or 'maxBuffer' is reached.
--
-- /Note:/ This option has no effect when rate has been specified.
--
-- /Note:/ Not supported with 'interleaved'.
--
eager :: Bool -> Config -> Config
eager :: Bool -> Config -> Config
eager Bool
flag Config
st = Config
st { _eagerDispatch = flag }

getEagerDispatch :: Config -> Bool
getEagerDispatch :: Config -> Bool
getEagerDispatch = Config -> Bool
_eagerDispatch

-- | Specify when the 'Channel' should stop.
stopWhen :: StopWhen -> Config -> Config
stopWhen :: StopWhen -> Config -> Config
stopWhen StopWhen
cond Config
st = Config
st { _stopWhen = cond }

getStopWhen :: Config -> StopWhen
getStopWhen :: Config -> StopWhen
getStopWhen = Config -> StopWhen
_stopWhen

-- | When enabled the streams may be evaluated cocnurrently but the results are
-- produced in the same sequence as a serial evaluation would produce.
--
-- /Note:/ Not supported with 'interleaved'.
--
ordered :: Bool -> Config -> Config
ordered :: Bool -> Config -> Config
ordered Bool
flag Config
st = Config
st { _ordered = flag }

getOrdered :: Config -> Bool
getOrdered :: Config -> Bool
getOrdered = Config -> Bool
_ordered

-- | Interleave the streams fairly instead of prioritizing the left stream.
-- This schedules all streams in a round robin fashion over limited number of
-- threads.
--
-- /Note:/ Can only be used on finite number of streams.
--
-- /Note:/ Not supported with 'ordered'.
--
interleaved :: Bool -> Config -> Config
interleaved :: Bool -> Config -> Config
interleaved Bool
flag Config
st = Config
st { _interleaved = flag }

getInterleaved :: Config -> Bool
getInterleaved :: Config -> Bool
getInterleaved = Config -> Bool
_interleaved

-- | Spawn bound threads (i.e., spawn threads using 'forkOS' instead of
-- 'forkIO'). The default value is 'False'.
--
-- /Unimplemented/
boundThreads :: Bool -> Config -> Config
boundThreads :: Bool -> Config -> Config
boundThreads Bool
flag Config
st = Config
st { _bound = flag }

_getBound :: Config -> Bool
_getBound :: Config -> Bool
_getBound = Config -> Bool
_bound

-- | A concurrent stream allocates worker threads to evaluates actions in the
-- stream concurrently. When an exception (sync or async) occurs in the code
-- outside the scope of the stream generation code, these workers need to be
-- stopped promptly. To enable that we can use an 'AcquireIO' bracket from the
-- surrounding scope. When 'AcquireIO' scope ends the channel is automatically
-- cleaned up.
--
-- Here is an example:
--
-- >>> import Control.Monad (when)
-- >>> import Control.Concurrent (threadDelay)
-- >>> import Data.Function ((&))
-- >>> import System.IO (hClose, IOMode(..), openFile)
--
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Data.Stream.Prelude as Stream
-- >>> import qualified Streamly.Control.Exception as Exception
--
-- >>> :{
-- close x h = do
--  putStrLn $ "closing: " ++ x
--  hClose h
-- :}
--
-- >>> :{
-- action ref =
--      Stream.fromList ["file1", "file2"]
--    & Stream.parMapM (Stream.useAcquire ref)
--        (\x -> do
--            (h, release) <- Exception.acquire ref (openFile x ReadMode) (close x)
--            -- use h here
--            threadDelay 1000000
--            when (x == "file1") $ do
--                putStrLn $ "Manually releasing: " ++ x
--                release
--            return x
--        )
--    & Stream.trace print
--    & Stream.fold Fold.drain
-- :}
--
-- >>> run = Exception.withAcquireIO action
--
useAcquire :: AcquireIO -> Config -> Config
useAcquire :: AcquireIO -> Config -> Config
useAcquire AcquireIO
f Config
cfg = Config
cfg { _release = Just (registerWith Priority1 f) }

-- | Clear the resource release registration function.
clearAcquire :: Config -> Config
clearAcquire :: Config -> Config
clearAcquire Config
cfg = Config
cfg { _release = Nothing }

getCleanup :: Config -> Maybe (IO () -> IO ())
getCleanup :: Config -> Maybe (IO () -> IO ())
getCleanup = Config -> Maybe (IO () -> IO ())
_release

-------------------------------------------------------------------------------
-- Initialization
-------------------------------------------------------------------------------

newRateInfo :: Config -> IO (Maybe YieldRateInfo)
newRateInfo :: Config -> IO (Maybe YieldRateInfo)
newRateInfo Config
st = do
    -- convert rate in Hertz to latency in Nanoseconds
    let rateToLatency :: a -> a
rateToLatency a
r = if a
r a -> a -> Bool
forall a. Ord a => a -> a -> Bool
<= a
0 then a
forall a. Bounded a => a
maxBound else a -> a
forall b. Integral b => a -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (a -> a) -> a -> a
forall a b. (a -> b) -> a -> b
$ a
1.0e9 a -> a -> a
forall a. Fractional a => a -> a -> a
/ a
r
    case Config -> Maybe Rate
getStreamRate Config
st of
        Just (Rate Double
low Double
goal Double
high Int
buf) ->
            let l :: NanoSecond64
l    = Double -> NanoSecond64
forall {a} {a}. (Bounded a, RealFrac a, Integral a) => a -> a
rateToLatency Double
goal
                minl :: NanoSecond64
minl = Double -> NanoSecond64
forall {a} {a}. (Bounded a, RealFrac a, Integral a) => a -> a
rateToLatency Double
high
                maxl :: NanoSecond64
maxl = Double -> NanoSecond64
forall {a} {a}. (Bounded a, RealFrac a, Integral a) => a -> a
rateToLatency Double
low
            in NanoSecond64 -> LatencyRange -> Int -> IO (Maybe YieldRateInfo)
mkYieldRateInfo NanoSecond64
l (NanoSecond64 -> NanoSecond64 -> LatencyRange
LatencyRange NanoSecond64
minl NanoSecond64
maxl) Int
buf
        Maybe Rate
Nothing -> Maybe YieldRateInfo -> IO (Maybe YieldRateInfo)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe YieldRateInfo
forall a. Maybe a
Nothing

    where

    mkYieldRateInfo :: NanoSecond64 -> LatencyRange -> Int -> IO (Maybe YieldRateInfo)
mkYieldRateInfo NanoSecond64
latency LatencyRange
latRange Int
buf = do
        IORef NanoSecond64
measured <- NanoSecond64 -> IO (IORef NanoSecond64)
forall a. a -> IO (IORef a)
newIORef NanoSecond64
0
        IORef (Count, Count, NanoSecond64)
wcur     <- (Count, Count, NanoSecond64)
-> IO (IORef (Count, Count, NanoSecond64))
forall a. a -> IO (IORef a)
newIORef (Count
0,Count
0,NanoSecond64
0)
        IORef (Count, Count, NanoSecond64)
wcol     <- (Count, Count, NanoSecond64)
-> IO (IORef (Count, Count, NanoSecond64))
forall a. a -> IO (IORef a)
newIORef (Count
0,Count
0,NanoSecond64
0)
        AbsTime
now      <- Clock -> IO AbsTime
getTime Clock
Monotonic
        IORef (Count, AbsTime)
wlong    <- (Count, AbsTime) -> IO (IORef (Count, AbsTime))
forall a. a -> IO (IORef a)
newIORef (Count
0,AbsTime
now)
        IORef Count
period   <- Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
1
        IORef Count
gainLoss <- Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef (Int64 -> Count
Count Int64
0)

        Maybe YieldRateInfo -> IO (Maybe YieldRateInfo)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe YieldRateInfo -> IO (Maybe YieldRateInfo))
-> Maybe YieldRateInfo -> IO (Maybe YieldRateInfo)
forall a b. (a -> b) -> a -> b
$ YieldRateInfo -> Maybe YieldRateInfo
forall a. a -> Maybe a
Just YieldRateInfo
            { svarLatencyTarget :: NanoSecond64
svarLatencyTarget      = NanoSecond64
latency
            , svarLatencyRange :: LatencyRange
svarLatencyRange       = LatencyRange
latRange
            , svarRateBuffer :: Int
svarRateBuffer         = Int
buf
            , svarGainedLostYields :: IORef Count
svarGainedLostYields   = IORef Count
gainLoss
            , workerBootstrapLatency :: Maybe NanoSecond64
workerBootstrapLatency = Config -> Maybe NanoSecond64
getStreamLatency Config
st
            , workerPollingInterval :: IORef Count
workerPollingInterval  = IORef Count
period
            , workerMeasuredLatency :: IORef NanoSecond64
workerMeasuredLatency  = IORef NanoSecond64
measured
            , workerPendingLatency :: IORef (Count, Count, NanoSecond64)
workerPendingLatency   = IORef (Count, Count, NanoSecond64)
wcur
            , workerCollectedLatency :: IORef (Count, Count, NanoSecond64)
workerCollectedLatency = IORef (Count, Count, NanoSecond64)
wcol
            , svarAllTimeLatency :: IORef (Count, AbsTime)
svarAllTimeLatency     = IORef (Count, AbsTime)
wlong
            }

-------------------------------------------------------------------------------
-- Rate
-------------------------------------------------------------------------------

-- | Same as @rate (Just $ Rate (r/2) r (2*r) maxBound)@
--
-- Specifies the average production rate of a stream in number of yields
-- per second (i.e.  @Hertz@).  Concurrent production is ramped up or down
-- automatically to achieve the specified average yield rate. The rate can
-- go down to half of the specified rate on the lower side and double of
-- the specified rate on the higher side.
--
avgRate :: Double -> Config -> Config
avgRate :: Double -> Config -> Config
avgRate Double
r = Maybe Rate -> Config -> Config
rate (Rate -> Maybe Rate
forall a. a -> Maybe a
Just (Rate -> Maybe Rate) -> Rate -> Maybe Rate
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate (Double
rDouble -> Double -> Double
forall a. Fractional a => a -> a -> a
/Double
2) Double
r (Double
2Double -> Double -> Double
forall a. Num a => a -> a -> a
*Double
r) Int
forall a. Bounded a => a
maxBound)

-- | Same as @rate (Just $ Rate r r (2*r) maxBound)@
--
-- Specifies the minimum rate at which the stream should yield values. As
-- far as possible the yield rate would never be allowed to go below the
-- specified rate, even though it may possibly go above it at times, the
-- upper limit is double of the specified rate.
--
minRate :: Double -> Config -> Config
minRate :: Double -> Config -> Config
minRate Double
r = Maybe Rate -> Config -> Config
rate (Rate -> Maybe Rate
forall a. a -> Maybe a
Just (Rate -> Maybe Rate) -> Rate -> Maybe Rate
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate Double
r Double
r (Double
2Double -> Double -> Double
forall a. Num a => a -> a -> a
*Double
r) Int
forall a. Bounded a => a
maxBound)

-- | Same as @rate (Just $ Rate (r/2) r r maxBound)@
--
-- Specifies the maximum rate at which the stream should yield values. As
-- far as possible the yield rate would never be allowed to go above the
-- specified rate, even though it may possibly go below it at times, the
-- lower limit is half of the specified rate. This can be useful in
-- applications where certain resource usage must not be allowed to go
-- beyond certain limits.
--
maxRate :: Double -> Config -> Config
maxRate :: Double -> Config -> Config
maxRate Double
r = Maybe Rate -> Config -> Config
rate (Rate -> Maybe Rate
forall a. a -> Maybe a
Just (Rate -> Maybe Rate) -> Rate -> Maybe Rate
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate (Double
rDouble -> Double -> Double
forall a. Fractional a => a -> a -> a
/Double
2) Double
r Double
r Int
forall a. Bounded a => a
maxBound)

-- | Same as @rate (Just $ Rate r r r 0)@
--
-- Specifies a constant yield rate. If for some reason the actual rate
-- goes above or below the specified rate we do not try to recover it by
-- increasing or decreasing the rate in future.  This can be useful in
-- applications like graphics frame refresh where we need to maintain a
-- constant refresh rate.
--
constRate :: Double -> Config -> Config
constRate :: Double -> Config -> Config
constRate Double
r = Maybe Rate -> Config -> Config
rate (Rate -> Maybe Rate
forall a. a -> Maybe a
Just (Rate -> Maybe Rate) -> Rate -> Maybe Rate
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate Double
r Double
r Double
r Int
0)

-------------------------------------------------------------------------------
-- Operations
-------------------------------------------------------------------------------

-- | Used by workers to send a value to the channel's output stream.
--
-- When a worker is dispatched, a 'WorkerInfo' record is supplied to it by the
-- dispatcher. This record contains the timestamp at the time of dispatch.
-- Whenever the worker yields a value, the yield count in the 'WorkerInfo' is
-- incremented. If the channel has rate control enabled, the yield count and
-- time duration is periodically (based on 'workerPollingInterval') pushed to
-- the channel's 'workerPendingLatency' stat. It is done only if the
-- 'workerPollingInterval' is non-zero.
--
-- Queues the event but returns 'False' if:
--
-- * the buffer limit is exceeding
-- * channel yield rate is exceeding (when rate control is enabled and
-- 'WorkerInfo' is available)
--
-- This is a thread-safe API and can be called by anyone from anywhere. Even a
-- thread that is not registered as a worker with the channel can use it but
-- when rate control is enabled, it might confuse the rate control mechanism if
-- we use workers beyond the knowledge of dispatcher.
--
{-# INLINE yieldWith #-}
yieldWith ::
       Maybe WorkerInfo -- ^ Rate control info for the worker
    -> Channel m a
    -> a
    -> IO Bool -- ^ True means the worker can continue otherwise stop.
yieldWith :: forall (m :: * -> *) a.
Maybe WorkerInfo -> Channel m a -> a -> IO Bool
yieldWith Maybe WorkerInfo
winfo Channel m a
chan =
    Limit
-> Limit
-> IORef Int
-> Maybe YieldRateInfo
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Maybe WorkerInfo
-> a
-> IO Bool
forall a.
Limit
-> Limit
-> IORef Int
-> Maybe YieldRateInfo
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Maybe WorkerInfo
-> a
-> IO Bool
sendYield
        (Channel m a -> Limit
forall (m :: * -> *) a. Channel m a -> Limit
maxBufferLimit Channel m a
chan)
        (Channel m a -> Limit
forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
chan)
        (Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
chan)
        (Channel m a -> Maybe YieldRateInfo
forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
chan)
        (Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
chan)
        (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan)
        Maybe WorkerInfo
winfo

-- | Send a 'ChildStop' event to the channel, used when the worker stops
-- yielding and exits. The final update of the collected latency stats in
-- 'WorkerInfo' is pushed to the channel. Upon receiving the 'ChildStop' event
-- the channel would remove the worker from its set of registered workers.
--
-- A worker that uses this API must have been registered on the Channel prior
-- to invoking this API. This is usually done by the dispatcher  when the
-- worker is dispatched.
{-# INLINE stopWith #-}
stopWith :: Maybe WorkerInfo -> Channel m a -> IO ()
stopWith :: forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
chan =
    IORef Int
-> Maybe YieldRateInfo
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Maybe WorkerInfo
-> IO ()
forall a.
IORef Int
-> Maybe YieldRateInfo
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Maybe WorkerInfo
-> IO ()
sendStop
        (Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
chan)
        (Channel m a -> Maybe YieldRateInfo
forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
chan)
        (Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
chan)
        (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan)
        Maybe WorkerInfo
winfo

-- | Like 'stopWith' but marks the stop event with the specified exception.
{-# INLINE exceptionWith #-}
exceptionWith :: Maybe WorkerInfo -> Channel m a -> SomeException -> IO ()
exceptionWith :: forall (m :: * -> *) a.
Maybe WorkerInfo -> Channel m a -> SomeException -> IO ()
exceptionWith Maybe WorkerInfo
_winfo Channel m a
chan =
    IORef Int
-> IORef ([ChildEvent a], Int) -> MVar () -> SomeException -> IO ()
forall a.
IORef Int
-> IORef ([ChildEvent a], Int) -> MVar () -> SomeException -> IO ()
sendException
        (Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
chan)
        (Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
chan)
        (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan)

-- | Send a 'ChildStopChannel' event to shutdown the channel. Upon receiving
-- the event the event processing loop kills all the registered worker threads
-- and stops the channel.
{-# INLINABLE shutdown #-}
shutdown :: MonadIO m => Channel m a -> m ()
shutdown :: forall (m :: * -> *) a. MonadIO m => Channel m a -> m ()
shutdown Channel m a
chan = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
        (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a. IORef ([a], Int) -> MVar () -> a -> IO Int
sendEvent
            (Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
chan)
            (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan)
            ChildEvent a
forall a. ChildEvent a
ChildStopChannel

-- | Dump the channel stats for diagnostics. Used when 'inspect' option is
-- enabled.
{-# NOINLINE dumpChannel #-}
dumpChannel :: Channel m a -> IO String
dumpChannel :: forall (m :: * -> *) a. Channel m a -> IO String
dumpChannel Channel m a
sv = do
    [String]
xs <- [IO String] -> IO [String]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
forall (m :: * -> *) a. Monad m => [m a] -> m [a]
sequence ([IO String] -> IO [String]) -> [IO String] -> IO [String]
forall a b. (a -> b) -> a -> b
$ IO String -> [IO String] -> [IO String]
forall a. a -> [a] -> [a]
intersperse (String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"\n")
        [ String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (ThreadId -> String
forall a. Show a => a -> String
dumpCreator (Channel m a -> ThreadId
forall (m :: * -> *) a. Channel m a -> ThreadId
svarCreator Channel m a
sv))
        , String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"---------CURRENT STATE-----------"
        , IORef ([ChildEvent a], Int) -> IO String
forall (t :: * -> *) a1 a2.
(Foldable t, Show a1) =>
IORef (t a2, a1) -> IO String
dumpOutputQ (Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv)
        -- XXX print the types of events in the outputQueue, first 5
        , MVar () -> IO String
forall a. Show a => MVar a -> IO String
dumpDoorBell (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
        , IORef Bool -> IO String
forall a. Show a => IORef a -> IO String
dumpNeedDoorBell (Channel m a -> IORef Bool
forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv)
        , IORef (Set ThreadId) -> IO String
forall a. Show a => IORef a -> IO String
dumpRunningThreads (Channel m a -> IORef (Set ThreadId)
forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv)
        -- XXX print the status of first 5 threads
        , IORef Int -> IO String
forall a. Show a => IORef a -> IO String
dumpWorkerCount (Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
        , String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"---------STATS-----------\n"
        , Bool -> Maybe YieldRateInfo -> SVarStats -> IO String
dumpSVarStats (Channel m a -> Bool
forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv) (Channel m a -> Maybe YieldRateInfo
forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv) (Channel m a -> SVarStats
forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv)
        ]
    String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> IO String) -> String -> IO String
forall a b. (a -> b) -> a -> b
$ [String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String]
xs

-------------------------------------------------------------------------------
-- Cleanup
-------------------------------------------------------------------------------

channelDone :: Channel m a -> String -> IO ()
channelDone :: forall (m :: * -> *) a. Channel m a -> String -> IO ()
channelDone Channel m a
chan String
reason = do
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Channel m a -> Bool
forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
chan) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        AbsTime
t <- Clock -> IO AbsTime
getTime Clock
Monotonic
        IORef (Maybe AbsTime) -> Maybe AbsTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (Channel m a -> SVarStats
forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
chan)) (AbsTime -> Maybe AbsTime
forall a. a -> Maybe a
Just AbsTime
t)
        IO String -> String -> IO ()
printSVar (Channel m a -> IO String
forall (m :: * -> *) a. Channel m a -> IO String
dumpChannel Channel m a
chan) String
reason
        -- If there are any other channels referenced by this channel a GC will
        -- prompt them to be cleaned up quickly.
        IO ()
performMajorGC

-- XXX Handle ThreadAbort while this is running?

-- | Never called from a worker thread. This can be called multiple times and
-- only one will succeed others will block until it finishes.
--
cleanupChan :: Channel m a -> String -> IO ()
cleanupChan :: forall (m :: * -> *) a. Channel m a -> String -> IO ()
cleanupChan Channel m a
chan String
reason = do
    Bool
stopped <- MVar Bool -> IO Bool
forall a. MVar a -> IO a
takeMVar (Channel m a -> MVar Bool
forall (m :: * -> *) a. Channel m a -> MVar Bool
channelStopped Channel m a
chan)
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
stopped) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        -- putStrLn "cleanupChan: START"
        IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
atomicWriteIORef (Channel m a -> IORef Bool
forall (m :: * -> *) a. Channel m a -> IORef Bool
channelStopping Channel m a
chan) Bool
True
        IO ()
go
        Channel m a -> String -> IO ()
forall (m :: * -> *) a. Channel m a -> String -> IO ()
channelDone Channel m a
chan String
reason
        MVar Bool -> Bool -> IO ()
forall a. MVar a -> a -> IO ()
putMVar (Channel m a -> MVar Bool
forall (m :: * -> *) a. Channel m a -> MVar Bool
channelStopped Channel m a
chan) Bool
True
        -- putStrLn "cleanupChan: DONE"

    where

    go :: IO ()
go = do
        -- Empty the queue so that any new results put the doorbell MVar
        ([ChildEvent a], Int)
_ <- IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef ([a], Int) -> IO ([a], Int)
readOutputQBasic (Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
chan)
        Int
cnt <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
chan)
        -- Note that the workerSet may not have the threadIds of those workers
        -- which are in the process of dispatch but not yet dispatched. To
        -- ensure that we have aborted all the workers we need to wait until
        -- the worker count drops down to zero, until then we need to watch the
        -- workerset and send ThreadAbort if we find any ThreadId in it. As of
        -- now, this can only happen in the eagerDispatch case as we can
        -- dispatch workers from workers in that case.
        Set ThreadId
workers <-
            IORef (Set ThreadId)
-> (Set ThreadId -> (Set ThreadId, Set ThreadId))
-> IO (Set ThreadId)
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS (Channel m a -> IORef (Set ThreadId)
forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
chan) (\Set ThreadId
x -> (Set ThreadId
forall a. Set a
Set.empty,Set ThreadId
x))
        -- self <- myThreadId
        (ThreadId -> IO ()) -> [ThreadId] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
Prelude.mapM_ (ThreadId -> ThreadAbort -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
`throwTo` ThreadAbort
ThreadAbort)
              -- (Prelude.filter (/= self) $ Set.toList workers)
              (Set ThreadId -> [ThreadId]
forall a. Set a -> [a]
Set.toList Set ThreadId
workers)
        {-
        let setSize = Set.size workers
        putStrLn $ "cleanupChan: thread count: " ++ show cnt
                ++ " workerSet size: " ++ show setSize
        -}
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
cnt Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            {-
                withDiagMVar
                    (svarInspectMode chan)
                    (dumpChannel chan)
                    "cleanupChan"
                $ -}
            MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan)
            -- threadDelay 10000
            IO ()
go