module Streamly.Internal.Data.Stream.Channel.Type
(
Channel(..)
, Config
, defaultConfig
, maxThreads
, maxBuffer
, maxYields
, Rate(..)
, newRateInfo
, rate
, avgRate
, minRate
, maxRate
, constRate
, StopWhen (..)
, stopWhen
, eager
, ordered
, interleaved
, boundThreads
, inspect
, useAcquire
, clearAcquire
, getMaxBuffer
, getMaxThreads
, getYieldLimit
, getInspectMode
, getStreamRate
, getEagerDispatch
, getOrdered
, getStopWhen
, getInterleaved
, getCleanup
, yieldWith
, stopWith
, exceptionWith
, shutdown
, channelDone
, cleanupChan
, dumpChannel
)
where
import Control.Concurrent (ThreadId, throwTo, takeMVar, putMVar)
import Control.Concurrent.MVar (MVar)
import Control.Exception (SomeException(..))
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Int (Int64)
import Data.IORef (IORef, newIORef, readIORef, atomicWriteIORef, writeIORef)
import Data.List (intersperse)
import Data.Set (Set)
import Streamly.Internal.Control.Concurrent (RunInIO)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS)
import Streamly.Internal.Data.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Channel.Worker
(sendYield, sendStop, sendEvent, sendException)
import Streamly.Internal.Data.StreamK (StreamK)
import Streamly.Internal.Control.Exception
(AcquireIO(..), Priority(..), registerWith)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..))
import System.Mem (performMajorGC)
import qualified Data.Set as Set
import Streamly.Internal.Data.Channel.Types
data Channel m a = Channel
{
forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun :: RunInIO m
, forall (m :: * -> *) a. Channel m a -> Limit
maxBufferLimit :: Limit
, forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue :: IORef ([ChildEvent a], Int)
, forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell :: MVar ()
, forall (m :: * -> *) a. Channel m a -> m [ChildEvent a]
readOutputQ :: m [ChildEvent a]
, forall (m :: * -> *) a. Channel m a -> m Bool
postProcess :: m Bool
, forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork :: Maybe (IORef Count)
, forall (m :: * -> *) a. Channel m a -> IO Bool
isWorkDone :: IO Bool
, forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo :: Maybe YieldRateInfo
, forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ :: IORef Bool
, forall (m :: * -> *) a. Channel m a -> m ()
eagerDispatch :: m ()
, forall (m :: * -> *) a.
Channel m a -> (RunInIO m, StreamK m a) -> IO ()
enqueue :: (RunInIO m, StreamK m a) -> IO ()
, forall (m :: * -> *) a. Channel m a -> IO Bool
isQueueDone :: IO Bool
, forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> m ()
workLoop :: Maybe WorkerInfo -> m ()
, forall (m :: * -> *) a. Channel m a -> IORef Bool
channelStopping :: IORef Bool
, forall (m :: * -> *) a. Channel m a -> MVar Bool
channelStopped :: MVar Bool
, forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit :: Limit
, forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads :: IORef (Set ThreadId)
, forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount :: IORef Int
, forall (m :: * -> *) a. Channel m a -> ThreadId -> m ()
accountThread :: ThreadId -> m ()
, forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar :: MVar ()
, forall (m :: * -> *) a. Channel m a -> Maybe (IORef ())
svarRef :: Maybe (IORef ())
, forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats :: SVarStats
, forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode :: Bool
, forall (m :: * -> *) a. Channel m a -> ThreadId
svarCreator :: ThreadId
}
data Config = Config
{
Config -> Maybe Count
_yieldLimit :: Maybe Count
, Config -> Limit
_threadsHigh :: Limit
, Config -> Limit
_bufferHigh :: Limit
, Config -> Maybe NanoSecond64
_streamLatency :: Maybe NanoSecond64
, Config -> Maybe Rate
_maxStreamRate :: Maybe Rate
, Config -> Bool
_inspect :: Bool
, Config -> Bool
_eagerDispatch :: Bool
, Config -> StopWhen
_stopWhen :: StopWhen
, Config -> Bool
_ordered :: Bool
, Config -> Bool
_interleaved :: Bool
, Config -> Bool
_bound :: Bool
, Config -> Maybe (IO () -> IO ())
_release :: Maybe (IO () -> IO ())
}
defaultMaxThreads, defaultMaxBuffer :: Limit
defaultMaxThreads :: Limit
defaultMaxThreads = Word -> Limit
Limited Word
magicMaxBuffer
defaultMaxBuffer :: Limit
defaultMaxBuffer = Word -> Limit
Limited Word
magicMaxBuffer
defaultConfig :: Config
defaultConfig :: Config
defaultConfig = Config
{
_yieldLimit :: Maybe Count
_yieldLimit = Maybe Count
forall a. Maybe a
Nothing
, _threadsHigh :: Limit
_threadsHigh = Limit
defaultMaxThreads
, _bufferHigh :: Limit
_bufferHigh = Limit
defaultMaxBuffer
, _maxStreamRate :: Maybe Rate
_maxStreamRate = Maybe Rate
forall a. Maybe a
Nothing
, _streamLatency :: Maybe NanoSecond64
_streamLatency = Maybe NanoSecond64
forall a. Maybe a
Nothing
, _inspect :: Bool
_inspect = Bool
False
, _eagerDispatch :: Bool
_eagerDispatch = Bool
False
, _stopWhen :: StopWhen
_stopWhen = StopWhen
AllStop
, _ordered :: Bool
_ordered = Bool
False
, _interleaved :: Bool
_interleaved = Bool
False
, _bound :: Bool
_bound = Bool
False
, _release :: Maybe (IO () -> IO ())
_release = Maybe (IO () -> IO ())
forall a. Maybe a
Nothing
}
maxYields :: Maybe Int64 -> Config -> Config
maxYields :: Maybe Int64 -> Config -> Config
maxYields Maybe Int64
lim Config
st =
Config
st { _yieldLimit =
case lim of
Maybe Int64
Nothing -> Maybe Count
forall a. Maybe a
Nothing
Just Int64
n ->
if Int64
n Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
<= Int64
0
then Count -> Maybe Count
forall a. a -> Maybe a
Just Count
0
else Count -> Maybe Count
forall a. a -> Maybe a
Just (Int64 -> Count
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
n)
}
getYieldLimit :: Config -> Maybe Count
getYieldLimit :: Config -> Maybe Count
getYieldLimit = Config -> Maybe Count
_yieldLimit
maxThreads :: Int -> Config -> Config
maxThreads :: Int -> Config -> Config
maxThreads Int
n Config
st =
Config
st { _threadsHigh =
if n < 0
then Unlimited
else if n == 0
then defaultMaxThreads
else Limited (fromIntegral n)
}
getMaxThreads :: Config -> Limit
getMaxThreads :: Config -> Limit
getMaxThreads = Config -> Limit
_threadsHigh
maxBuffer :: Int -> Config -> Config
maxBuffer :: Int -> Config -> Config
maxBuffer Int
n Config
st =
Config
st { _bufferHigh =
if n < 0
then Unlimited
else if n == 0
then defaultMaxBuffer
else Limited (fromIntegral n)
}
getMaxBuffer :: Config -> Limit
getMaxBuffer :: Config -> Limit
getMaxBuffer = Config -> Limit
_bufferHigh
rate :: Maybe Rate -> Config -> Config
rate :: Maybe Rate -> Config -> Config
rate Maybe Rate
r Config
st = Config
st { _maxStreamRate = r }
getStreamRate :: Config -> Maybe Rate
getStreamRate :: Config -> Maybe Rate
getStreamRate = Config -> Maybe Rate
_maxStreamRate
_setStreamLatency :: Int -> Config -> Config
_setStreamLatency :: Int -> Config -> Config
_setStreamLatency Int
n Config
st =
Config
st { _streamLatency =
if n <= 0
then Nothing
else Just (fromIntegral n)
}
getStreamLatency :: Config -> Maybe NanoSecond64
getStreamLatency :: Config -> Maybe NanoSecond64
getStreamLatency = Config -> Maybe NanoSecond64
_streamLatency
inspect :: Bool -> Config -> Config
inspect :: Bool -> Config -> Config
inspect Bool
flag Config
st = Config
st { _inspect = flag }
getInspectMode :: Config -> Bool
getInspectMode :: Config -> Bool
getInspectMode = Config -> Bool
_inspect
eager :: Bool -> Config -> Config
eager :: Bool -> Config -> Config
eager Bool
flag Config
st = Config
st { _eagerDispatch = flag }
getEagerDispatch :: Config -> Bool
getEagerDispatch :: Config -> Bool
getEagerDispatch = Config -> Bool
_eagerDispatch
stopWhen :: StopWhen -> Config -> Config
stopWhen :: StopWhen -> Config -> Config
stopWhen StopWhen
cond Config
st = Config
st { _stopWhen = cond }
getStopWhen :: Config -> StopWhen
getStopWhen :: Config -> StopWhen
getStopWhen = Config -> StopWhen
_stopWhen
ordered :: Bool -> Config -> Config
ordered :: Bool -> Config -> Config
ordered Bool
flag Config
st = Config
st { _ordered = flag }
getOrdered :: Config -> Bool
getOrdered :: Config -> Bool
getOrdered = Config -> Bool
_ordered
interleaved :: Bool -> Config -> Config
interleaved :: Bool -> Config -> Config
interleaved Bool
flag Config
st = Config
st { _interleaved = flag }
getInterleaved :: Config -> Bool
getInterleaved :: Config -> Bool
getInterleaved = Config -> Bool
_interleaved
boundThreads :: Bool -> Config -> Config
boundThreads :: Bool -> Config -> Config
boundThreads Bool
flag Config
st = Config
st { _bound = flag }
_getBound :: Config -> Bool
_getBound :: Config -> Bool
_getBound = Config -> Bool
_bound
useAcquire :: AcquireIO -> Config -> Config
useAcquire :: AcquireIO -> Config -> Config
useAcquire AcquireIO
f Config
cfg = Config
cfg { _release = Just (registerWith Priority1 f) }
clearAcquire :: Config -> Config
clearAcquire :: Config -> Config
clearAcquire Config
cfg = Config
cfg { _release = Nothing }
getCleanup :: Config -> Maybe (IO () -> IO ())
getCleanup :: Config -> Maybe (IO () -> IO ())
getCleanup = Config -> Maybe (IO () -> IO ())
_release
newRateInfo :: Config -> IO (Maybe YieldRateInfo)
newRateInfo :: Config -> IO (Maybe YieldRateInfo)
newRateInfo Config
st = do
let rateToLatency :: a -> a
rateToLatency a
r = if a
r a -> a -> Bool
forall a. Ord a => a -> a -> Bool
<= a
0 then a
forall a. Bounded a => a
maxBound else a -> a
forall b. Integral b => a -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (a -> a) -> a -> a
forall a b. (a -> b) -> a -> b
$ a
1.0e9 a -> a -> a
forall a. Fractional a => a -> a -> a
/ a
r
case Config -> Maybe Rate
getStreamRate Config
st of
Just (Rate Double
low Double
goal Double
high Int
buf) ->
let l :: NanoSecond64
l = Double -> NanoSecond64
forall {a} {a}. (Bounded a, RealFrac a, Integral a) => a -> a
rateToLatency Double
goal
minl :: NanoSecond64
minl = Double -> NanoSecond64
forall {a} {a}. (Bounded a, RealFrac a, Integral a) => a -> a
rateToLatency Double
high
maxl :: NanoSecond64
maxl = Double -> NanoSecond64
forall {a} {a}. (Bounded a, RealFrac a, Integral a) => a -> a
rateToLatency Double
low
in NanoSecond64 -> LatencyRange -> Int -> IO (Maybe YieldRateInfo)
mkYieldRateInfo NanoSecond64
l (NanoSecond64 -> NanoSecond64 -> LatencyRange
LatencyRange NanoSecond64
minl NanoSecond64
maxl) Int
buf
Maybe Rate
Nothing -> Maybe YieldRateInfo -> IO (Maybe YieldRateInfo)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe YieldRateInfo
forall a. Maybe a
Nothing
where
mkYieldRateInfo :: NanoSecond64 -> LatencyRange -> Int -> IO (Maybe YieldRateInfo)
mkYieldRateInfo NanoSecond64
latency LatencyRange
latRange Int
buf = do
IORef NanoSecond64
measured <- NanoSecond64 -> IO (IORef NanoSecond64)
forall a. a -> IO (IORef a)
newIORef NanoSecond64
0
IORef (Count, Count, NanoSecond64)
wcur <- (Count, Count, NanoSecond64)
-> IO (IORef (Count, Count, NanoSecond64))
forall a. a -> IO (IORef a)
newIORef (Count
0,Count
0,NanoSecond64
0)
IORef (Count, Count, NanoSecond64)
wcol <- (Count, Count, NanoSecond64)
-> IO (IORef (Count, Count, NanoSecond64))
forall a. a -> IO (IORef a)
newIORef (Count
0,Count
0,NanoSecond64
0)
AbsTime
now <- Clock -> IO AbsTime
getTime Clock
Monotonic
IORef (Count, AbsTime)
wlong <- (Count, AbsTime) -> IO (IORef (Count, AbsTime))
forall a. a -> IO (IORef a)
newIORef (Count
0,AbsTime
now)
IORef Count
period <- Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
1
IORef Count
gainLoss <- Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef (Int64 -> Count
Count Int64
0)
Maybe YieldRateInfo -> IO (Maybe YieldRateInfo)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe YieldRateInfo -> IO (Maybe YieldRateInfo))
-> Maybe YieldRateInfo -> IO (Maybe YieldRateInfo)
forall a b. (a -> b) -> a -> b
$ YieldRateInfo -> Maybe YieldRateInfo
forall a. a -> Maybe a
Just YieldRateInfo
{ svarLatencyTarget :: NanoSecond64
svarLatencyTarget = NanoSecond64
latency
, svarLatencyRange :: LatencyRange
svarLatencyRange = LatencyRange
latRange
, svarRateBuffer :: Int
svarRateBuffer = Int
buf
, svarGainedLostYields :: IORef Count
svarGainedLostYields = IORef Count
gainLoss
, workerBootstrapLatency :: Maybe NanoSecond64
workerBootstrapLatency = Config -> Maybe NanoSecond64
getStreamLatency Config
st
, workerPollingInterval :: IORef Count
workerPollingInterval = IORef Count
period
, workerMeasuredLatency :: IORef NanoSecond64
workerMeasuredLatency = IORef NanoSecond64
measured
, workerPendingLatency :: IORef (Count, Count, NanoSecond64)
workerPendingLatency = IORef (Count, Count, NanoSecond64)
wcur
, workerCollectedLatency :: IORef (Count, Count, NanoSecond64)
workerCollectedLatency = IORef (Count, Count, NanoSecond64)
wcol
, svarAllTimeLatency :: IORef (Count, AbsTime)
svarAllTimeLatency = IORef (Count, AbsTime)
wlong
}
avgRate :: Double -> Config -> Config
avgRate :: Double -> Config -> Config
avgRate Double
r = Maybe Rate -> Config -> Config
rate (Rate -> Maybe Rate
forall a. a -> Maybe a
Just (Rate -> Maybe Rate) -> Rate -> Maybe Rate
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate (Double
rDouble -> Double -> Double
forall a. Fractional a => a -> a -> a
/Double
2) Double
r (Double
2Double -> Double -> Double
forall a. Num a => a -> a -> a
*Double
r) Int
forall a. Bounded a => a
maxBound)
minRate :: Double -> Config -> Config
minRate :: Double -> Config -> Config
minRate Double
r = Maybe Rate -> Config -> Config
rate (Rate -> Maybe Rate
forall a. a -> Maybe a
Just (Rate -> Maybe Rate) -> Rate -> Maybe Rate
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate Double
r Double
r (Double
2Double -> Double -> Double
forall a. Num a => a -> a -> a
*Double
r) Int
forall a. Bounded a => a
maxBound)
maxRate :: Double -> Config -> Config
maxRate :: Double -> Config -> Config
maxRate Double
r = Maybe Rate -> Config -> Config
rate (Rate -> Maybe Rate
forall a. a -> Maybe a
Just (Rate -> Maybe Rate) -> Rate -> Maybe Rate
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate (Double
rDouble -> Double -> Double
forall a. Fractional a => a -> a -> a
/Double
2) Double
r Double
r Int
forall a. Bounded a => a
maxBound)
constRate :: Double -> Config -> Config
constRate :: Double -> Config -> Config
constRate Double
r = Maybe Rate -> Config -> Config
rate (Rate -> Maybe Rate
forall a. a -> Maybe a
Just (Rate -> Maybe Rate) -> Rate -> Maybe Rate
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate Double
r Double
r Double
r Int
0)
{-# INLINE yieldWith #-}
yieldWith ::
Maybe WorkerInfo
-> Channel m a
-> a
-> IO Bool
yieldWith :: forall (m :: * -> *) a.
Maybe WorkerInfo -> Channel m a -> a -> IO Bool
yieldWith Maybe WorkerInfo
winfo Channel m a
chan =
Limit
-> Limit
-> IORef Int
-> Maybe YieldRateInfo
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Maybe WorkerInfo
-> a
-> IO Bool
forall a.
Limit
-> Limit
-> IORef Int
-> Maybe YieldRateInfo
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Maybe WorkerInfo
-> a
-> IO Bool
sendYield
(Channel m a -> Limit
forall (m :: * -> *) a. Channel m a -> Limit
maxBufferLimit Channel m a
chan)
(Channel m a -> Limit
forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
chan)
(Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
chan)
(Channel m a -> Maybe YieldRateInfo
forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
chan)
(Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
chan)
(Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan)
Maybe WorkerInfo
winfo
{-# INLINE stopWith #-}
stopWith :: Maybe WorkerInfo -> Channel m a -> IO ()
stopWith :: forall (m :: * -> *) a. Maybe WorkerInfo -> Channel m a -> IO ()
stopWith Maybe WorkerInfo
winfo Channel m a
chan =
IORef Int
-> Maybe YieldRateInfo
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Maybe WorkerInfo
-> IO ()
forall a.
IORef Int
-> Maybe YieldRateInfo
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Maybe WorkerInfo
-> IO ()
sendStop
(Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
chan)
(Channel m a -> Maybe YieldRateInfo
forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
chan)
(Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
chan)
(Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan)
Maybe WorkerInfo
winfo
{-# INLINE exceptionWith #-}
exceptionWith :: Maybe WorkerInfo -> Channel m a -> SomeException -> IO ()
exceptionWith :: forall (m :: * -> *) a.
Maybe WorkerInfo -> Channel m a -> SomeException -> IO ()
exceptionWith Maybe WorkerInfo
_winfo Channel m a
chan =
IORef Int
-> IORef ([ChildEvent a], Int) -> MVar () -> SomeException -> IO ()
forall a.
IORef Int
-> IORef ([ChildEvent a], Int) -> MVar () -> SomeException -> IO ()
sendException
(Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
chan)
(Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
chan)
(Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan)
{-# INLINABLE shutdown #-}
shutdown :: MonadIO m => Channel m a -> m ()
shutdown :: forall (m :: * -> *) a. MonadIO m => Channel m a -> m ()
shutdown Channel m a
chan = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
(IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a. IORef ([a], Int) -> MVar () -> a -> IO Int
sendEvent
(Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
chan)
(Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan)
ChildEvent a
forall a. ChildEvent a
ChildStopChannel
{-# NOINLINE dumpChannel #-}
dumpChannel :: Channel m a -> IO String
dumpChannel :: forall (m :: * -> *) a. Channel m a -> IO String
dumpChannel Channel m a
sv = do
[String]
xs <- [IO String] -> IO [String]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
forall (m :: * -> *) a. Monad m => [m a] -> m [a]
sequence ([IO String] -> IO [String]) -> [IO String] -> IO [String]
forall a b. (a -> b) -> a -> b
$ IO String -> [IO String] -> [IO String]
forall a. a -> [a] -> [a]
intersperse (String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"\n")
[ String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (ThreadId -> String
forall a. Show a => a -> String
dumpCreator (Channel m a -> ThreadId
forall (m :: * -> *) a. Channel m a -> ThreadId
svarCreator Channel m a
sv))
, String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"---------CURRENT STATE-----------"
, IORef ([ChildEvent a], Int) -> IO String
forall (t :: * -> *) a1 a2.
(Foldable t, Show a1) =>
IORef (t a2, a1) -> IO String
dumpOutputQ (Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv)
, MVar () -> IO String
forall a. Show a => MVar a -> IO String
dumpDoorBell (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
, IORef Bool -> IO String
forall a. Show a => IORef a -> IO String
dumpNeedDoorBell (Channel m a -> IORef Bool
forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv)
, IORef (Set ThreadId) -> IO String
forall a. Show a => IORef a -> IO String
dumpRunningThreads (Channel m a -> IORef (Set ThreadId)
forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv)
, IORef Int -> IO String
forall a. Show a => IORef a -> IO String
dumpWorkerCount (Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
, String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"---------STATS-----------\n"
, Bool -> Maybe YieldRateInfo -> SVarStats -> IO String
dumpSVarStats (Channel m a -> Bool
forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv) (Channel m a -> Maybe YieldRateInfo
forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv) (Channel m a -> SVarStats
forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv)
]
String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> IO String) -> String -> IO String
forall a b. (a -> b) -> a -> b
$ [String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String]
xs
channelDone :: Channel m a -> String -> IO ()
channelDone :: forall (m :: * -> *) a. Channel m a -> String -> IO ()
channelDone Channel m a
chan String
reason = do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Channel m a -> Bool
forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
chan) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
AbsTime
t <- Clock -> IO AbsTime
getTime Clock
Monotonic
IORef (Maybe AbsTime) -> Maybe AbsTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (Channel m a -> SVarStats
forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
chan)) (AbsTime -> Maybe AbsTime
forall a. a -> Maybe a
Just AbsTime
t)
IO String -> String -> IO ()
printSVar (Channel m a -> IO String
forall (m :: * -> *) a. Channel m a -> IO String
dumpChannel Channel m a
chan) String
reason
IO ()
performMajorGC
cleanupChan :: Channel m a -> String -> IO ()
cleanupChan :: forall (m :: * -> *) a. Channel m a -> String -> IO ()
cleanupChan Channel m a
chan String
reason = do
Bool
stopped <- MVar Bool -> IO Bool
forall a. MVar a -> IO a
takeMVar (Channel m a -> MVar Bool
forall (m :: * -> *) a. Channel m a -> MVar Bool
channelStopped Channel m a
chan)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
stopped) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
atomicWriteIORef (Channel m a -> IORef Bool
forall (m :: * -> *) a. Channel m a -> IORef Bool
channelStopping Channel m a
chan) Bool
True
IO ()
go
Channel m a -> String -> IO ()
forall (m :: * -> *) a. Channel m a -> String -> IO ()
channelDone Channel m a
chan String
reason
MVar Bool -> Bool -> IO ()
forall a. MVar a -> a -> IO ()
putMVar (Channel m a -> MVar Bool
forall (m :: * -> *) a. Channel m a -> MVar Bool
channelStopped Channel m a
chan) Bool
True
where
go :: IO ()
go = do
([ChildEvent a], Int)
_ <- IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef ([a], Int) -> IO ([a], Int)
readOutputQBasic (Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
chan)
Int
cnt <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
chan)
Set ThreadId
workers <-
IORef (Set ThreadId)
-> (Set ThreadId -> (Set ThreadId, Set ThreadId))
-> IO (Set ThreadId)
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS (Channel m a -> IORef (Set ThreadId)
forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
chan) (\Set ThreadId
x -> (Set ThreadId
forall a. Set a
Set.empty,Set ThreadId
x))
(ThreadId -> IO ()) -> [ThreadId] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
Prelude.mapM_ (ThreadId -> ThreadAbort -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
`throwTo` ThreadAbort
ThreadAbort)
(Set ThreadId -> [ThreadId]
forall a. Set a -> [a]
Set.toList Set ThreadId
workers)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
cnt Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
chan)
IO ()
go