-- |
-- Module      : Streamly.Internal.Data.Fold.Channel.Type
-- Copyright   : (c) 2017, 2022 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Fold.Channel.Type
    (
    -- ** Type
      Channel (..)
    , OutEvent (..)

    -- ** Configuration
    , Config
    , defaultConfig
    , maxBuffer
    , boundThreads
    , inspect

    -- ** Operations
    , newChannelWith
    , newChannelWithScan
    , newChannel
    , newScanChannel
    , sendToWorker
    , sendToWorker_
    , checkFoldStatus -- XXX collectFoldOutput
    , dumpChannel
    , cleanup
    , finalize
    )
where

#include "inline.hs"

import Control.Concurrent (ThreadId, myThreadId, tryPutMVar)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar)
import Control.Exception (SomeException(..))
import Control.Monad (void, when)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.List (intersperse)
import Streamly.Internal.Control.Concurrent
    (MonadAsync, MonadRunInIO, askRunInIO)
import Streamly.Internal.Control.ForkLifted (doForkWith)
import Streamly.Internal.Data.Atomics (writeBarrier)
import Streamly.Internal.Data.Fold (Fold(..))
import Streamly.Internal.Data.Scanl (Scanl(..))
import Streamly.Internal.Data.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Channel.Worker (sendEvent)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)

import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream as D

import Streamly.Internal.Data.Channel.Types

-- XXX We can make the fold evaluation concurrent by using a monoid for the
-- accumulator. It will then work in the same way as the stream evaluation, in
-- stream evaluation we dequeue the head and queue the tail, in folds we will
-- queue the accumulator and it will be picked by the next worker to accumulate
-- the next value.

data OutEvent b =
      FoldException ThreadId SomeException
    | FoldPartial b
    | FoldDone ThreadId b
    | FoldEOF ThreadId

-- | The fold driver thread queues the input of the fold in the 'inputQueue'
-- The driver rings the doorbell when the queue transitions from empty to
-- non-empty state.
--
-- The fold consumer thread dequeues the input items from the 'inputQueue' and
-- supplies them to the fold. When the fold is done the output of the fold is
-- placed in 'inputQueue' and 'outputDoorBell' is rung.
--
-- The fold driver thread keeps watching the 'outputQueue', if the fold has
-- terminated, it stops queueing the input to the 'inputQueue'
--
-- If the fold driver runs out of input it stops and waits for the fold to
-- drain the buffered input.
--
-- Driver thread ------>------Input Queue and Doorbell ----->-----Fold thread
--
-- Driver thread ------<------Output Queue and Doorbell-----<-----Fold thread
--
data Channel m a b = Channel
    {
    -- FORWARD FLOW: Flow of data from the driver to the consuming fold

    -- XXX Use a different type than ChildEvent. We can do with a simpler type
    -- in folds.

    -- | Input queue (messages, length).
    --
    -- [LOCKING] Frequent, locked access. Input is queued frequently by the
    -- driver and infrequently dequeued in chunks by the fold.
    --
      forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue :: IORef ([ChildEvent a], Int)

      -- | The maximum size of the inputQueue allowed.
    , forall (m :: * -> *) a b. Channel m a b -> Limit
maxInputBuffer :: Limit

    -- | Doorbell is rung by the driver when 'inputQueue' transitions from
    -- empty to non-empty.
    --
    -- [LOCKING] Infrequent, MVar.
    , forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputItemDoorBell :: MVar ()
    , forall (m :: * -> *) a b. Channel m a b -> IORef Bool
closedForInput :: IORef Bool

    -- | Doorbell to tell the driver that there is now space available in the
    -- 'inputQueue' and more items can be queued.
    , forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputSpaceDoorBell :: MVar ()

    , forall (m :: * -> *) a b. Channel m a b -> m [ChildEvent a]
readInputQ :: m [ChildEvent a]

    -- | Final output and exceptions, if any, queued by the fold and read by
    -- the fold driver.
    --
    -- [LOCKING] atomicModifyIORef. Output is queued infrequently by the fold
    -- and read frequently by the driver.
    , forall (m :: * -> *) a b.
Channel m a b -> IORef ([OutEvent b], Int)
outputQueue :: IORef ([OutEvent b], Int)

    -- | Doorbell for the 'outputQueue', rung by the fold when the queue
    -- transitions from empty to non-empty.
    --
    -- [LOCKING] Infrequent, MVar.
    , forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell :: MVar ()

    -- cleanup: to track garbage collection of SVar --
    , forall (m :: * -> *) a b. Channel m a b -> Maybe (IORef ())
svarRef :: Maybe (IORef ())

    -- Stats --
    , forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats :: SVarStats

    -- Diagnostics --
    , forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode :: Bool
    , forall (m :: * -> *) a b. Channel m a b -> ThreadId
svarCreator :: ThreadId
    }

-------------------------------------------------------------------------------
-- Config
-------------------------------------------------------------------------------

-- | An abstract type for specifying the configuration parameters of a
-- 'Channel'. Use @Config -> Config@ modifier functions to modify the default
-- configuration. See the individual modifier documentation for default values.
--
data Config = Config
    {
      Config -> Limit
_bufferHigh :: Limit
    , Config -> Bool
_inspect    :: Bool
    , Config -> Bool
_bound :: Bool
    }

-------------------------------------------------------------------------------
-- State defaults and reset
-------------------------------------------------------------------------------

defaultMaxBuffer :: Limit
defaultMaxBuffer :: Limit
defaultMaxBuffer = Word -> Limit
Limited Word
magicMaxBuffer

-- | The fields prefixed by an _ are not to be accessed or updated directly but
-- via smart accessor APIs. Use get/set routines instead of directly accessing
-- the Config fields
defaultConfig :: Config
defaultConfig :: Config
defaultConfig = Config
    {
      _bufferHigh :: Limit
_bufferHigh = Limit
defaultMaxBuffer
    , _inspect :: Bool
_inspect = Bool
False
    , _bound :: Bool
_bound = Bool
False
    }

-------------------------------------------------------------------------------
-- Smart get/set routines for State
-------------------------------------------------------------------------------

-- | Specify the maximum size of the buffer for storing the results from
-- concurrent computations. If the buffer becomes full we stop spawning more
-- concurrent tasks until there is space in the buffer.
-- A value of 0 resets the buffer size to default, a negative value means
-- there is no limit. The default value is 1500.
--
-- CAUTION! using an unbounded 'maxBuffer' value (i.e. a negative value)
-- coupled with an unbounded 'maxThreads' value is a recipe for disaster in
-- presence of infinite streams, or very large streams.  Especially, it must
-- not be used when 'pure' is used in 'ZipAsyncM' streams as 'pure' in
-- applicative zip streams generates an infinite stream causing unbounded
-- concurrent generation with no limit on the buffer or threads.
--
maxBuffer :: Int -> Config -> Config
maxBuffer :: Int -> Config -> Config
maxBuffer Int
n Config
st =
    Config
st { _bufferHigh =
            if n < 0
            then Unlimited
            else if n == 0
                 then defaultMaxBuffer
                 else Limited (fromIntegral n)
       }

getMaxBuffer :: Config -> Limit
getMaxBuffer :: Config -> Limit
getMaxBuffer = Config -> Limit
_bufferHigh

-- | Print debug information about the 'Channel' when the stream ends. When the
-- stream does not end normally, the channel debug information is printed when
-- the channel is garbage collected. If you are expecting but not seeing the
-- debug info try adding a 'performMajorGC' before the program ends.
--
inspect :: Bool -> Config -> Config
inspect :: Bool -> Config -> Config
inspect Bool
flag Config
st = Config
st { _inspect = flag }

getInspectMode :: Config -> Bool
getInspectMode :: Config -> Bool
getInspectMode = Config -> Bool
_inspect

-- | Spawn bound threads (i.e., spawn threads using 'forkOS' instead of
-- 'forkIO'). The default value is 'False'.
--
-- Currently, this only takes effect only for concurrent folds.
boundThreads :: Bool -> Config -> Config
boundThreads :: Bool -> Config -> Config
boundThreads Bool
flag Config
st = Config
st { _bound = flag }

getBound :: Config -> Bool
getBound :: Config -> Bool
getBound = Config -> Bool
_bound

-------------------------------------------------------------------------------
-- Inspection
-------------------------------------------------------------------------------

-- | Dump the channel stats for diagnostics. Used when 'inspect' option is
-- enabled.
{-# NOINLINE dumpChannel #-}
dumpChannel :: Channel m a b -> IO String
dumpChannel :: forall (m :: * -> *) a b. Channel m a b -> IO String
dumpChannel Channel m a b
sv = do
    [String]
xs <- [IO String] -> IO [String]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
forall (m :: * -> *) a. Monad m => [m a] -> m [a]
sequence ([IO String] -> IO [String]) -> [IO String] -> IO [String]
forall a b. (a -> b) -> a -> b
$ IO String -> [IO String] -> [IO String]
forall a. a -> [a] -> [a]
intersperse (String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"\n")
        [ String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (ThreadId -> String
forall a. Show a => a -> String
dumpCreator (Channel m a b -> ThreadId
forall (m :: * -> *) a b. Channel m a b -> ThreadId
svarCreator Channel m a b
sv))
        , String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"---------CURRENT STATE-----------"
        , IORef ([ChildEvent a], Int) -> IO String
forall (t :: * -> *) a1 a2.
(Foldable t, Show a1) =>
IORef (t a2, a1) -> IO String
dumpOutputQ (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
sv)
        -- XXX print the types of events in the outputQueue, first 5
        , MVar () -> IO String
forall a. Show a => MVar a -> IO String
dumpDoorBell (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputItemDoorBell Channel m a b
sv)
        , String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"---------STATS-----------\n"
        , Bool -> Maybe YieldRateInfo -> SVarStats -> IO String
dumpSVarStats (Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
sv) Maybe YieldRateInfo
forall a. Maybe a
Nothing (Channel m a b -> SVarStats
forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats Channel m a b
sv)
        ]
    String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> IO String) -> String -> IO String
forall a b. (a -> b) -> a -> b
$ [String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String]
xs

-------------------------------------------------------------------------------
-- Support for running folds concurrently
-------------------------------------------------------------------------------

-- $concurrentFolds
--
-- To run folds concurrently, we need to decouple the fold execution from the
-- stream production. We use the SVar to do that, we have a single worker
-- pushing the stream elements to the SVar and on the consumer side a fold
-- driver pulls the values and folds them.
--
-- @
--
-- Fold worker <------Channel<------Fold driver
--     |  exceptions  |
--     --------------->
--
-- @
--
-- We need a channel for pushing exceptions from the fold worker to the fold
-- driver. The stream may be pushed to multiple folds at the same time. For
-- that we need one Channel per fold:
--
-- @
--
-- Fold worker <------Channel--
--                    |        |
-- Fold worker <------Channel------Driver
--                    |        |
-- Fold worker <------Channel--
--
-- @
--
-- Note: If the stream pusher terminates due to an exception, we do not
-- actively terminate the fold. It gets cleaned up by the GC.

-------------------------------------------------------------------------------
-- Process events received by a fold worker from a fold driver
-------------------------------------------------------------------------------

sendToDriver :: Channel m a b -> OutEvent b -> IO Int
sendToDriver :: forall (m :: * -> *) a b. Channel m a b -> OutEvent b -> IO Int
sendToDriver Channel m a b
sv OutEvent b
msg = do
    -- In case the producer stream is blocked on pushing to the fold buffer
    -- then wake it up so that it can check for the stop event or exception
    -- being sent to it otherwise we will be deadlocked.
    -- void $ tryPutMVar (pushBufferMVar sv) ()
    IORef ([OutEvent b], Int) -> MVar () -> OutEvent b -> IO Int
forall a. IORef ([a], Int) -> MVar () -> a -> IO Int
sendEvent (Channel m a b -> IORef ([OutEvent b], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([OutEvent b], Int)
outputQueue Channel m a b
sv)
                     (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell Channel m a b
sv) OutEvent b
msg

sendYieldToDriver :: MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver Channel m a b
sv b
res = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    ThreadId
tid <- IO ThreadId
myThreadId
    IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> OutEvent b -> IO Int
forall (m :: * -> *) a b. Channel m a b -> OutEvent b -> IO Int
sendToDriver Channel m a b
sv (ThreadId -> b -> OutEvent b
forall b. ThreadId -> b -> OutEvent b
FoldDone ThreadId
tid b
res)

sendPartialToDriver :: MonadIO m => Channel m a b -> b -> m ()
sendPartialToDriver :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendPartialToDriver Channel m a b
sv b
res = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> OutEvent b -> IO Int
forall (m :: * -> *) a b. Channel m a b -> OutEvent b -> IO Int
sendToDriver Channel m a b
sv (b -> OutEvent b
forall b. b -> OutEvent b
FoldPartial b
res)

sendEOFToDriver :: MonadIO m => Channel m a b -> m ()
sendEOFToDriver :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
sendEOFToDriver Channel m a b
sv = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    ThreadId
tid <- IO ThreadId
myThreadId
    IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> OutEvent b -> IO Int
forall (m :: * -> *) a b. Channel m a b -> OutEvent b -> IO Int
sendToDriver Channel m a b
sv (ThreadId -> OutEvent b
forall b. ThreadId -> OutEvent b
FoldEOF ThreadId
tid)

{-# NOINLINE sendExceptionToDriver #-}
sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
sendExceptionToDriver :: forall (m :: * -> *) a b. Channel m a b -> SomeException -> IO ()
sendExceptionToDriver Channel m a b
sv SomeException
e = do
    ThreadId
tid <- IO ThreadId
myThreadId
    IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> OutEvent b -> IO Int
forall (m :: * -> *) a b. Channel m a b -> OutEvent b -> IO Int
sendToDriver Channel m a b
sv (ThreadId -> SomeException -> OutEvent b
forall b. ThreadId -> SomeException -> OutEvent b
FoldException ThreadId
tid SomeException
e)

data FromSVarState m a b =
      FromSVarRead (Channel m a b)
    | FromSVarLoop (Channel m a b) [ChildEvent a]

{-# INLINE_NORMAL fromInputQueue #-}
fromInputQueue :: MonadIO m => Channel m a b -> D.Stream m a
fromInputQueue :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> Stream m a
fromInputQueue Channel m a b
svar = (State StreamK m a
 -> FromSVarState m a b -> m (Step (FromSVarState m a b) a))
-> FromSVarState m a b -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> FromSVarState m a b -> m (Step (FromSVarState m a b) a)
forall {m :: * -> *} {p} {a} {b}.
Monad m =>
p -> FromSVarState m a b -> m (Step (FromSVarState m a b) a)
step (Channel m a b -> FromSVarState m a b
forall (m :: * -> *) a b. Channel m a b -> FromSVarState m a b
FromSVarRead Channel m a b
svar)

    where

    {-# INLINE_LATE step #-}
    step :: p -> FromSVarState m a b -> m (Step (FromSVarState m a b) a)
step p
_ (FromSVarRead Channel m a b
sv) = do
        [ChildEvent a]
list <- Channel m a b -> m [ChildEvent a]
forall (m :: * -> *) a b. Channel m a b -> m [ChildEvent a]
readInputQ Channel m a b
sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a))
-> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState m a b -> Step (FromSVarState m a b) a
forall s a. s -> Step s a
D.Skip (FromSVarState m a b -> Step (FromSVarState m a b) a)
-> FromSVarState m a b -> Step (FromSVarState m a b) a
forall a b. (a -> b) -> a -> b
$ Channel m a b -> [ChildEvent a] -> FromSVarState m a b
forall (m :: * -> *) a b.
Channel m a b -> [ChildEvent a] -> FromSVarState m a b
FromSVarLoop Channel m a b
sv ([ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)

    step p
_ (FromSVarLoop Channel m a b
sv []) = Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a))
-> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState m a b -> Step (FromSVarState m a b) a
forall s a. s -> Step s a
D.Skip (FromSVarState m a b -> Step (FromSVarState m a b) a)
-> FromSVarState m a b -> Step (FromSVarState m a b) a
forall a b. (a -> b) -> a -> b
$ Channel m a b -> FromSVarState m a b
forall (m :: * -> *) a b. Channel m a b -> FromSVarState m a b
FromSVarRead Channel m a b
sv
    step p
_ (FromSVarLoop Channel m a b
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
        case ChildEvent a
ev of
            -- XXX Separate input and output events. Input events cannot have
            -- Stop event and output events cannot have ChildStopChannel
            -- event.
            ChildYield a
a -> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a))
-> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a b. (a -> b) -> a -> b
$ a -> FromSVarState m a b -> Step (FromSVarState m a b) a
forall s a. a -> s -> Step s a
D.Yield a
a (Channel m a b -> [ChildEvent a] -> FromSVarState m a b
forall (m :: * -> *) a b.
Channel m a b -> [ChildEvent a] -> FromSVarState m a b
FromSVarLoop Channel m a b
sv [ChildEvent a]
es)
            ChildEvent a
ChildStopChannel -> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FromSVarState m a b) a
forall s a. Step s a
D.Stop
            ChildEvent a
_ -> m (Step (FromSVarState m a b) a)
forall a. HasCallStack => a
undefined

{-# INLINE readInputQChan #-}
readInputQChan :: Channel m a b -> IO ([ChildEvent a], Int)
readInputQChan :: forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readInputQChan Channel m a b
chan = do
    let ss :: Maybe SVarStats
ss = if Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan then SVarStats -> Maybe SVarStats
forall a. a -> Maybe a
Just (Channel m a b -> SVarStats
forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats Channel m a b
chan) else Maybe SVarStats
forall a. Maybe a
Nothing
    r :: ([ChildEvent a], Int)
r@([ChildEvent a]
_, Int
n) <- IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
forall a.
IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
readOutputQRaw (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
chan) Maybe SVarStats
ss
    if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0
    then do
        IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
            (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Bool -> IO String -> String -> IO () -> IO ()
withDiagMVar
                (Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan)
                (Channel m a b -> IO String
forall (m :: * -> *) a b. Channel m a b -> IO String
dumpChannel Channel m a b
chan)
                String
"readInputQChan: nothing to do"
            (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputItemDoorBell Channel m a b
chan)
        IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
forall a.
IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
readOutputQRaw (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
chan) Maybe SVarStats
ss
    else ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([ChildEvent a], Int)
r

{-# INLINE readInputQWithDB #-}
readInputQWithDB :: Channel m a b -> IO ([ChildEvent a], Int)
readInputQWithDB :: forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readInputQWithDB Channel m a b
chan = do
    ([ChildEvent a], Int)
r <- Channel m a b -> IO ([ChildEvent a], Int)
forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readInputQChan Channel m a b
chan
    -- XXX We can do this only if needed, if someone sleeps because of buffer
    -- then they can set a flag and we ring the doorbell only if the flag is
    -- set. Like we do in sendWorkerWait for streams.
    Bool
_ <- MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputSpaceDoorBell Channel m a b
chan) ()
    ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([ChildEvent a], Int)
r

mkNewChannelWith :: forall m a b. MonadIO m =>
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> Config
    -> IO (Channel m a b)
mkNewChannelWith :: forall (m :: * -> *) a b.
MonadIO m =>
IORef ([OutEvent b], Int)
-> MVar () -> Config -> IO (Channel m a b)
mkNewChannelWith IORef ([OutEvent b], Int)
outQRev MVar ()
outQMvRev Config
cfg = do
    IORef ([ChildEvent a], Int)
outQ <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    MVar ()
outQMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    MVar ()
bufferMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    IORef Bool
ref <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False

    SVarStats
stats <- IO SVarStats
newSVarStats
    ThreadId
tid <- IO ThreadId
myThreadId

    let getSVar :: Channel m a b -> Channel m a b
        getSVar :: Channel m a b -> Channel m a b
getSVar Channel m a b
sv = Channel
            { inputQueue :: IORef ([ChildEvent a], Int)
inputQueue      = IORef ([ChildEvent a], Int)
outQ
            , inputItemDoorBell :: MVar ()
inputItemDoorBell   = MVar ()
outQMv
            , outputQueue :: IORef ([OutEvent b], Int)
outputQueue = IORef ([OutEvent b], Int)
outQRev
            , outputDoorBell :: MVar ()
outputDoorBell = MVar ()
outQMvRev
            , inputSpaceDoorBell :: MVar ()
inputSpaceDoorBell = MVar ()
bufferMv
            , closedForInput :: IORef Bool
closedForInput = IORef Bool
ref
            , maxInputBuffer :: Limit
maxInputBuffer   = Config -> Limit
getMaxBuffer Config
cfg
            , readInputQ :: m [ChildEvent a]
readInputQ      = IO [ChildEvent a] -> m [ChildEvent a]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [ChildEvent a] -> m [ChildEvent a])
-> IO [ChildEvent a] -> m [ChildEvent a]
forall a b. (a -> b) -> a -> b
$ (([ChildEvent a], Int) -> [ChildEvent a])
-> IO ([ChildEvent a], Int) -> IO [ChildEvent a]
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ([ChildEvent a], Int) -> [ChildEvent a]
forall a b. (a, b) -> a
fst (Channel m a b -> IO ([ChildEvent a], Int)
forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readInputQWithDB Channel m a b
sv)
            , svarRef :: Maybe (IORef ())
svarRef          = Maybe (IORef ())
forall a. Maybe a
Nothing
            , svarInspectMode :: Bool
svarInspectMode  = Config -> Bool
getInspectMode Config
cfg
            , svarCreator :: ThreadId
svarCreator      = ThreadId
tid
            , svarStats :: SVarStats
svarStats        = SVarStats
stats
            }

    let sv :: Channel m a b
sv = Channel m a b -> Channel m a b
getSVar Channel m a b
sv in Channel m a b -> IO (Channel m a b)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Channel m a b
sv

{-# INLINABLE newChannelWith #-}
{-# SPECIALIZE newChannelWith ::
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Fold IO a b
    -> IO (Channel IO a b, ThreadId) #-}
newChannelWith :: (MonadRunInIO m) =>
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Fold m a b
    -> m (Channel m a b, ThreadId)
newChannelWith :: forall (m :: * -> *) b a.
MonadRunInIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Fold m a b
-> m (Channel m a b, ThreadId)
newChannelWith IORef ([OutEvent b], Int)
outq MVar ()
outqDBell Config -> Config
modifier Fold m a b
f = do
    let config :: Config
config = Config -> Config
modifier Config
defaultConfig
    Channel m a b
sv <- IO (Channel m a b) -> m (Channel m a b)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Channel m a b) -> m (Channel m a b))
-> IO (Channel m a b) -> m (Channel m a b)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent b], Int)
-> MVar () -> Config -> IO (Channel m a b)
forall (m :: * -> *) a b.
MonadIO m =>
IORef ([OutEvent b], Int)
-> MVar () -> Config -> IO (Channel m a b)
mkNewChannelWith IORef ([OutEvent b], Int)
outq MVar ()
outqDBell Config
config
    RunInIO m
mrun <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
    ThreadId
tid <- Bool -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
Bool -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doForkWith
        (Config -> Bool
getBound Config
config) (Channel m a b -> m ()
work Channel m a b
sv) RunInIO m
mrun (Channel m a b -> SomeException -> IO ()
forall (m :: * -> *) a b. Channel m a b -> SomeException -> IO ()
sendExceptionToDriver Channel m a b
sv)
    (Channel m a b, ThreadId) -> m (Channel m a b, ThreadId)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Channel m a b
sv, ThreadId
tid)

    where

    {-# NOINLINE work #-}
    work :: Channel m a b -> m ()
work Channel m a b
chan =
        let f1 :: Fold m a ()
f1 = (b -> m ()) -> Fold m a b -> Fold m a ()
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> Fold m a b -> Fold m a c
Fold.rmapM (m () -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m () -> m ()) -> (b -> m ()) -> b -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Channel m a b -> b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver Channel m a b
chan) Fold m a b
f
         in Fold m a () -> Stream m a -> m ()
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold Fold m a ()
f1 (Stream m a -> m ()) -> Stream m a -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> Stream m a
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> Stream m a
fromInputQueue Channel m a b
chan

-- | Returns True if the fold terminated due to completion and False when due
-- to end-of-stream.
{-# INLINE scanToChannel #-}
scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Fold m a Bool
scanToChannel :: forall (m :: * -> *) a b.
MonadIO m =>
Channel m a b -> Scanl m a b -> Fold m a Bool
scanToChannel Channel m a b
chan (Scanl s -> a -> m (Step s b)
step m (Step s b)
initial s -> m b
extract s -> m b
final) =
    (s -> a -> m (Step s Bool))
-> m (Step s Bool)
-> (s -> m Bool)
-> (s -> m Bool)
-> Fold m a Bool
forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> (s -> m b) -> Fold m a b
Fold s -> a -> m (Step s Bool)
step1 m (Step s Bool)
initial1 s -> m Bool
forall {p} {a}. p -> a
extract1 s -> m Bool
final1

    where

    initial1 :: m (Step s Bool)
initial1 = do
        Step s b
r <- m (Step s b)
initial
        case Step s b
r of
            Fold.Partial s
s -> do
                b
b <- s -> m b
extract s
s
                m () -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendPartialToDriver Channel m a b
chan b
b
                Step s Bool -> m (Step s Bool)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s Bool -> m (Step s Bool)) -> Step s Bool -> m (Step s Bool)
forall a b. (a -> b) -> a -> b
$ s -> Step s Bool
forall s b. s -> Step s b
Fold.Partial s
s
            Fold.Done b
b -> do
                Channel m a b -> b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver Channel m a b
chan b
b
                Step s Bool -> m (Step s Bool)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s Bool -> m (Step s Bool)) -> Step s Bool -> m (Step s Bool)
forall a b. (a -> b) -> a -> b
$ Bool -> Step s Bool
forall s b. b -> Step s b
Fold.Done Bool
True

    step1 :: s -> a -> m (Step s Bool)
step1 s
st a
x = do
        Step s b
r <- s -> a -> m (Step s b)
step s
st a
x
        case Step s b
r of
            Fold.Partial s
s -> do
                b
b <- s -> m b
extract s
s
                m () -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendPartialToDriver Channel m a b
chan b
b
                Step s Bool -> m (Step s Bool)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s Bool -> m (Step s Bool)) -> Step s Bool -> m (Step s Bool)
forall a b. (a -> b) -> a -> b
$ s -> Step s Bool
forall s b. s -> Step s b
Fold.Partial s
s
            Fold.Done b
b -> do
                Channel m a b -> b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver Channel m a b
chan b
b
                Step s Bool -> m (Step s Bool)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s Bool -> m (Step s Bool)) -> Step s Bool -> m (Step s Bool)
forall a b. (a -> b) -> a -> b
$ Bool -> Step s Bool
forall s b. b -> Step s b
Fold.Done Bool
True

    extract1 :: p -> a
extract1 p
_ = String -> a
forall a. HasCallStack => String -> a
error String
"extract: not supported by folds"

    -- XXX Should we not discard the result?
    final1 :: s -> m Bool
final1 s
st = do
        m b -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (s -> m b
final s
st)
        Bool -> m Bool
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False

{-# INLINABLE newChannelWithScan #-}
{-# SPECIALIZE newChannelWithScan ::
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Scanl IO a b
    -> IO (Channel IO a b, ThreadId) #-}
newChannelWithScan :: (MonadRunInIO m) =>
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Scanl m a b
    -> m (Channel m a b, ThreadId)
newChannelWithScan :: forall (m :: * -> *) b a.
MonadRunInIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Scanl m a b
-> m (Channel m a b, ThreadId)
newChannelWithScan IORef ([OutEvent b], Int)
outq MVar ()
outqDBell Config -> Config
modifier Scanl m a b
f = do
    let config :: Config
config = Config -> Config
modifier Config
defaultConfig
    Channel m a b
sv <- IO (Channel m a b) -> m (Channel m a b)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Channel m a b) -> m (Channel m a b))
-> IO (Channel m a b) -> m (Channel m a b)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent b], Int)
-> MVar () -> Config -> IO (Channel m a b)
forall (m :: * -> *) a b.
MonadIO m =>
IORef ([OutEvent b], Int)
-> MVar () -> Config -> IO (Channel m a b)
mkNewChannelWith IORef ([OutEvent b], Int)
outq MVar ()
outqDBell Config
config
    RunInIO m
mrun <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
    ThreadId
tid <- Bool -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
Bool -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doForkWith
        (Config -> Bool
getBound Config
config) (Channel m a b -> m ()
work Channel m a b
sv) RunInIO m
mrun (Channel m a b -> SomeException -> IO ()
forall (m :: * -> *) a b. Channel m a b -> SomeException -> IO ()
sendExceptionToDriver Channel m a b
sv)
    (Channel m a b, ThreadId) -> m (Channel m a b, ThreadId)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Channel m a b
sv, ThreadId
tid)

    where

    {-# NOINLINE work #-}
    work :: Channel m a b -> m ()
work Channel m a b
chan = do
        Bool
completed <- Fold m a Bool -> Stream m a -> m Bool
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold (Channel m a b -> Scanl m a b -> Fold m a Bool
forall (m :: * -> *) a b.
MonadIO m =>
Channel m a b -> Scanl m a b -> Fold m a Bool
scanToChannel Channel m a b
chan Scanl m a b
f) (Channel m a b -> Stream m a
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> Stream m a
fromInputQueue Channel m a b
chan)
        -- We check for only one item in the outputqueue, for example in
        -- parTeeWith, multiple messages can make that complicated. Therefore,
        -- we first check if we already sent a FoldDone.
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
completed) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
sendEOFToDriver Channel m a b
chan
        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (Channel m a b -> IORef Bool
forall (m :: * -> *) a b. Channel m a b -> IORef Bool
closedForInput Channel m a b
chan) Bool
True
        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ()
writeBarrier
        m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputSpaceDoorBell Channel m a b
chan) ()

{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel ::
    (Config -> Config) -> Fold IO a b -> IO (Channel IO a b) #-}
newChannel :: (MonadRunInIO m) =>
    (Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel :: forall (m :: * -> *) a b.
MonadRunInIO m =>
(Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel Config -> Config
modifier Fold m a b
f = do
    IORef ([OutEvent b], Int)
outQRev <- IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int)))
-> IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int))
forall a b. (a -> b) -> a -> b
$ ([OutEvent b], Int) -> IO (IORef ([OutEvent b], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    MVar ()
outQMvRev <- IO (MVar ()) -> m (MVar ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    ((Channel m a b, ThreadId) -> Channel m a b)
-> m (Channel m a b, ThreadId) -> m (Channel m a b)
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst (IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Fold m a b
-> m (Channel m a b, ThreadId)
forall (m :: * -> *) b a.
MonadRunInIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Fold m a b
-> m (Channel m a b, ThreadId)
newChannelWith IORef ([OutEvent b], Int)
outQRev MVar ()
outQMvRev Config -> Config
modifier Fold m a b
f)

{-# INLINABLE newScanChannel #-}
{-# SPECIALIZE newScanChannel ::
    (Config -> Config) -> Scanl IO a b -> IO (Channel IO a b) #-}
newScanChannel :: (MonadRunInIO m) =>
    (Config -> Config) -> Scanl m a b -> m (Channel m a b)
newScanChannel :: forall (m :: * -> *) a b.
MonadRunInIO m =>
(Config -> Config) -> Scanl m a b -> m (Channel m a b)
newScanChannel Config -> Config
modifier Scanl m a b
f = do
    IORef ([OutEvent b], Int)
outQRev <- IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int)))
-> IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int))
forall a b. (a -> b) -> a -> b
$ ([OutEvent b], Int) -> IO (IORef ([OutEvent b], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    MVar ()
outQMvRev <- IO (MVar ()) -> m (MVar ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    ((Channel m a b, ThreadId) -> Channel m a b)
-> m (Channel m a b, ThreadId) -> m (Channel m a b)
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst (IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Scanl m a b
-> m (Channel m a b, ThreadId)
forall (m :: * -> *) b a.
MonadRunInIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Scanl m a b
-> m (Channel m a b, ThreadId)
newChannelWithScan IORef ([OutEvent b], Int)
outQRev MVar ()
outQMvRev Config -> Config
modifier Scanl m a b
f)

-------------------------------------------------------------------------------
-- Process events received by the driver thread from the fold worker side
-------------------------------------------------------------------------------

-- XXX currently only one event is sent by a fold consumer to the stream
-- producer. But we can potentially have multiple events e.g. the fold step can
-- generate exception more than once and the producer can ignore those
-- exceptions or handle them and still keep driving the fold.

-- XXX In case of scan this could be a stream.

-- | Poll for events sent by the fold worker to the fold driver. The fold
-- consumer can send a "Stop" event or an exception. When a "Stop" is received
-- this function returns 'True'. If an exception is recieved then it throws the
-- exception.
--
{-# NOINLINE checkFoldStatus #-}
checkFoldStatus :: MonadAsync m => Channel m a b -> m (Maybe b)
checkFoldStatus :: forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> m (Maybe b)
checkFoldStatus Channel m a b
sv = do
    ([OutEvent b]
list, Int
_) <- IO ([OutEvent b], Int) -> m ([OutEvent b], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent b], Int) -> m ([OutEvent b], Int))
-> IO ([OutEvent b], Int) -> m ([OutEvent b], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent b], Int) -> IO ([OutEvent b], Int)
forall a. IORef ([a], Int) -> IO ([a], Int)
readOutputQBasic (Channel m a b -> IORef ([OutEvent b], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([OutEvent b], Int)
outputQueue Channel m a b
sv)
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    [OutEvent b] -> m (Maybe b)
forall {m :: * -> *} {a}.
MonadThrow m =>
[OutEvent a] -> m (Maybe a)
processEvents ([OutEvent b] -> m (Maybe b)) -> [OutEvent b] -> m (Maybe b)
forall a b. (a -> b) -> a -> b
$ [OutEvent b] -> [OutEvent b]
forall a. [a] -> [a]
reverse [OutEvent b]
list

    where

    {-# INLINE processEvents #-}
    processEvents :: [OutEvent a] -> m (Maybe a)
processEvents [] = Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
    processEvents (OutEvent a
ev : [OutEvent a]
_) = do
        case OutEvent a
ev of
            FoldException ThreadId
_ SomeException
e -> SomeException -> m (Maybe a)
forall e a. (HasCallStack, Exception e) => e -> m a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM SomeException
e
            FoldDone ThreadId
_ a
b -> Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
b)
            FoldPartial a
_ ->
                String -> m (Maybe a)
forall a. HasCallStack => String -> a
error String
"checkFoldStatus: FoldPartial can occur only for scans"
            FoldEOF ThreadId
_ ->
                String -> m (Maybe a)
forall a. HasCallStack => String -> a
error String
"checkFoldStatus: FoldEOF can occur only for scans"

{-# INLINE isBufferAvailable #-}
isBufferAvailable :: MonadIO m => Channel m a b -> m Bool
isBufferAvailable :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m Bool
isBufferAvailable Channel m a b
sv = do
    let limit :: Limit
limit = Channel m a b -> Limit
forall (m :: * -> *) a b. Channel m a b -> Limit
maxInputBuffer Channel m a b
sv
    case Limit
limit of
        Limit
Unlimited -> Bool -> m Bool
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
        Limited Word
lim -> do
            ([ChildEvent a]
_, Int
n) <- IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int))
-> IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
sv)
            Bool -> m Bool
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> m Bool) -> Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
n

-- | Push values from a driver to a fold worker via a Channel. Blocks if no
-- space is available in the buffer. Before pushing a value to the Channel it
-- polls for events received from the fold worker.  If a stop event is received
-- then it returns 'True' otherwise false.  Propagates exceptions received from
-- the fold worker.
--
{-# INLINE sendToWorker #-}
sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b)
sendToWorker :: forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m (Maybe b)
sendToWorker Channel m a b
chan a
a = m (Maybe b)
go

    where

    -- Recursive function, should we use SPEC?
    go :: m (Maybe b)
go = do
        let qref :: IORef ([OutEvent b], Int)
qref = Channel m a b -> IORef ([OutEvent b], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([OutEvent b], Int)
outputQueue Channel m a b
chan
        Maybe b
status <- do
            ([OutEvent b]
_, Int
n) <- IO ([OutEvent b], Int) -> m ([OutEvent b], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent b], Int) -> m ([OutEvent b], Int))
-> IO ([OutEvent b], Int) -> m ([OutEvent b], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent b], Int) -> IO ([OutEvent b], Int)
forall a. IORef a -> IO a
readIORef IORef ([OutEvent b], Int)
qref
            if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
            then Channel m a b -> m (Maybe b)
forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> m (Maybe b)
checkFoldStatus Channel m a b
chan
            else Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
        case Maybe b
status of
            Just b
_ -> Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
status
            Maybe b
Nothing -> do
                    Bool
r <- Channel m a b -> m Bool
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m Bool
isBufferAvailable Channel m a b
chan
                    if Bool
r
                    then do
                        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
                            (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
                            (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a. IORef ([a], Int) -> MVar () -> a -> IO Int
sendEvent
                                (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
chan)
                                (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputItemDoorBell Channel m a b
chan)
                                (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
                        Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
                    else do
                        () <- IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputSpaceDoorBell Channel m a b
chan)
                        m (Maybe b)
go

-- | Like sendToWorker but only sends, does not receive any events from the
-- fold.
{-# INLINE sendToWorker_ #-}
sendToWorker_ :: MonadAsync m => Channel m a b -> a -> m ()
sendToWorker_ :: forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m ()
sendToWorker_ Channel m a b
chan a
a = m ()
go

    where

    -- Recursive function, should we use SPEC?
    go :: m ()
go = do
        Bool
r <- Channel m a b -> m Bool
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m Bool
isBufferAvailable Channel m a b
chan
        if Bool
r
        then do
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
                (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
                (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a. IORef ([a], Int) -> MVar () -> a -> IO Int
sendEvent
                    (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
chan)
                    (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputItemDoorBell Channel m a b
chan)
                    (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
        else do
            -- Block for space
            () <- IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputSpaceDoorBell Channel m a b
chan)
            Bool
closed <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef (Channel m a b -> IORef Bool
forall (m :: * -> *) a b. Channel m a b -> IORef Bool
closedForInput Channel m a b
chan)
            Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
closed) m ()
go

-- XXX Cleanup the fold if the stream is interrupted. Add a GC hook.

cleanup :: MonadIO m => Channel m a b -> m ()
cleanup :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
cleanup Channel m a b
chan = do
    Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
        AbsTime
t <- Clock -> IO AbsTime
getTime Clock
Monotonic
        IORef (Maybe AbsTime) -> Maybe AbsTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (Channel m a b -> SVarStats
forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats Channel m a b
chan)) (AbsTime -> Maybe AbsTime
forall a. a -> Maybe a
Just AbsTime
t)
        IO String -> String -> IO ()
printSVar (Channel m a b -> IO String
forall (m :: * -> *) a b. Channel m a b -> IO String
dumpChannel Channel m a b
chan) String
"Scan channel done"

finalize :: MonadIO m => Channel m a b -> m ()
finalize :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
finalize Channel m a b
chan = do
    IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
        (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a. IORef ([a], Int) -> MVar () -> a -> IO Int
sendEvent
            (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
inputQueue Channel m a b
chan)
            (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
inputItemDoorBell Channel m a b
chan)
            ChildEvent a
forall a. ChildEvent a
ChildStopChannel