#ifdef __HADDOCK_VERSION__
#undef INSPECTION
#endif

#ifdef INSPECTION
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-}
#endif

-- |
-- Module      : Streamly.Internal.Data.Stream.Channel.Operations
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.Channel.Operations
    (
    -- *** Reading Stream
      fromChannelK
    , fromChannel

    -- ** Enqueuing Work
    , toChannelK
    , toChannel
    )
where

#include "inline.hs"

import Control.Exception (fromException, displayException)
import Control.Monad (when)
import Control.Monad.Catch (throwM, MonadThrow)
import Control.Monad.IO.Class (MonadIO(liftIO))
#if __GLASGOW_HASKELL__ >= 810
import Data.Kind (Type)
#endif
import Data.IORef (newIORef, mkWeakIORef, writeIORef)
import Streamly.Internal.Control.Concurrent
    (MonadAsync, MonadRunInIO, askRunInIO)
import Streamly.Internal.Data.Stream (Stream)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)

import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream as D
import qualified Streamly.Internal.Data.StreamK as K

import Streamly.Internal.Data.Channel.Types
import Streamly.Internal.Data.Stream.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Channel.Type hiding (inspect)

import Prelude hiding (map, concat, concatMap)

#ifdef INSPECTION
import Control.Exception (Exception)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Typeable (Typeable)
import Test.Inspection (inspect, hasNoTypeClassesExcept)
#endif

------------------------------------------------------------------------------
-- Generating streams from a channel
------------------------------------------------------------------------------

-- $concurrentEval
--
-- Usually a channel is used to concurrently evaluate multiple actions in a
-- stream using many worker threads that push the results to the channel and a
-- single puller that pulls them from channel generating the evaluated stream.
--
-- @
--                  input stream
--                       |
--     <-----------------|<--------worker
--     |  exceptions     |
-- output stream <---Channel<------worker
--                       |
--                       |<--------worker
--
-- @
--
-- The puller itself schedules the worker threads based on demand.
-- Exceptions are propagated from the worker threads to the puller.

-------------------------------------------------------------------------------
-- Write a stream to a channel
-------------------------------------------------------------------------------

-- XXX Should be a Fold, singleton API could be called joinChannel, or the fold
-- can be called joinChannel.
-- XXX If we use toChannelK multiple times on a channel make sure the channel
-- does not go away before we use the subsequent ones.

-- | High level function to enqueue a work item on the channel. The fundamental
-- unit of work is a stream. Each stream enqueued on the channel is picked up
-- and evaluated by a worker thread. The worker evaluates the stream it picked
-- up serially. When multiple streams are queued on the channel each stream can
-- be evaluated concurrently by different workers.
--
-- Note that the items in each stream are not concurrently evaluated, streams
-- are fundamentally serial, therefore, elements in one particular stream will
-- be generated serially one after the other. Only two or more streams can be
-- run concurrently with each other.
--
-- See 'chanConcatMapK' for concurrent evaluation of each element of a stream.
-- Alternatively, you can wrap each element of the original stream into a
-- stream generating action and queue all those streams on the channel. Then
-- all of them would be evaluated concurrently. However, that would not be
-- streaming in nature, it would require buffering space for the entire
-- original stream. Prefer 'chanConcatMapK' for larger streams.
--
-- Items from each evaluated streams are queued to the same output queue of the
-- channel which can be read using 'fromChannelK'. 'toChannelK' can be called
-- multiple times to enqueue multiple streams on the channel.
--
{-# INLINE toChannelK #-}
toChannelK :: MonadRunInIO m => Channel m a -> K.StreamK m a -> m ()
toChannelK :: forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan StreamK m a
m = do
    RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
    IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> (RunInIO m, StreamK m a) -> IO ()
forall (m :: * -> *) a.
Channel m a -> (RunInIO m, StreamK m a) -> IO ()
enqueue Channel m a
chan (RunInIO m
runIn, StreamK m a
m)

-- INLINE for fromStreamK/toStreamK fusion

-- | A wrapper over 'toChannelK' for 'Stream' type.
{-# INLINE toChannel #-}
toChannel :: MonadRunInIO m => Channel m a -> Stream m a -> m ()
toChannel :: forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> Stream m a -> m ()
toChannel Channel m a
chan = Channel m a -> StreamK m a -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan (StreamK m a -> m ())
-> (Stream m a -> StreamK m a) -> Stream m a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> StreamK m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK

{-
-- | Send a stream of streams to a concurrent channel for evaluation.
{-# INLINE joinChannel #-}
joinChannel :: Channel m a -> Fold m (Stream m a) ()
joinChannel = undefined
-}

-------------------------------------------------------------------------------
-- Read a stream from a channel
-------------------------------------------------------------------------------

-- | Pull a stream from an SVar.
{-# NOINLINE fromChannelRaw #-}
fromChannelRaw :: (MonadIO m, MonadThrow m) => Channel m a -> K.StreamK m a
fromChannelRaw :: forall (m :: * -> *) a.
(MonadIO m, MonadThrow m) =>
Channel m a -> StreamK m a
fromChannelRaw Channel m a
sv = (forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.MkStream ((forall r.
  State StreamK m a
  -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
 -> StreamK m a)
-> (forall r.
    State StreamK m a
    -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
    [ChildEvent a]
list <- Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a. Channel m a -> m [ChildEvent a]
readOutputQ Channel m a
sv
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp (StreamK m a -> m r) -> StreamK m a -> m r
forall a b. (a -> b) -> a -> b
$ [ChildEvent a] -> StreamK m a
processEvents ([ChildEvent a] -> StreamK m a) -> [ChildEvent a] -> StreamK m a
forall a b. (a -> b) -> a -> b
$ [ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
reverse [ChildEvent a]
list

    where

    {-# INLINE processEvents #-}
    processEvents :: [ChildEvent a] -> StreamK m a
processEvents [] = (forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.MkStream ((forall r.
  State StreamK m a
  -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
 -> StreamK m a)
-> (forall r.
    State StreamK m a
    -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
        Bool
done <- Channel m a -> m Bool
forall (m :: * -> *) a. Channel m a -> m Bool
postProcess Channel m a
sv
        if Bool
done
        then IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Channel m a -> String -> IO ()
forall (m :: * -> *) a. Channel m a -> String -> IO ()
channelDone Channel m a
sv String
"Channel done") m () -> m r -> m r
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m r
stp
        else State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp (StreamK m a -> m r) -> StreamK m a -> m r
forall a b. (a -> b) -> a -> b
$ Channel m a -> StreamK m a
forall (m :: * -> *) a.
(MonadIO m, MonadThrow m) =>
Channel m a -> StreamK m a
fromChannelRaw Channel m a
sv

    processEvents (ChildEvent a
ev : [ChildEvent a]
es) = (forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.MkStream ((forall r.
  State StreamK m a
  -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
 -> StreamK m a)
-> (forall r.
    State StreamK m a
    -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
        let rest :: StreamK m a
rest = [ChildEvent a] -> StreamK m a
processEvents [ChildEvent a]
es
        case ChildEvent a
ev of
            ChildYield a
a -> a -> StreamK m a -> m r
yld a
a StreamK m a
rest
            ChildEvent a
ChildStopChannel -> do
                IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> String -> IO ()
forall (m :: * -> *) a. Channel m a -> String -> IO ()
cleanupChan Channel m a
sv String
"ChildStopChannel"
                m r
stp
            ChildStop ThreadId
tid Maybe SomeException
e -> do
                Channel m a -> ThreadId -> m ()
forall (m :: * -> *) a. Channel m a -> ThreadId -> m ()
accountThread Channel m a
sv ThreadId
tid
                case Maybe SomeException
e of
                    Maybe SomeException
Nothing -> State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
rest
                    Just SomeException
ex ->
                        case SomeException -> Maybe ThreadAbort
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
ex of
                            Just ThreadAbort
ThreadAbort ->
                                -- We terminate the loop after sending
                                -- ThreadAbort to workers so we should never
                                -- get it unless it is thrown from inside a
                                -- worker thread or by someone else to our
                                -- thread.
                                String -> m r
forall a. HasCallStack => String -> a
error (String -> m r) -> String -> m r
forall a b. (a -> b) -> a -> b
$ String
"processEvents: got ThreadAbort for tid " String -> String -> String
forall a. [a] -> [a] -> [a]
++ ThreadId -> String
forall a. Show a => a -> String
show ThreadId
tid
                                -- K.foldStream st yld sng stp rest
                            Maybe ThreadAbort
Nothing -> do
                                IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> String -> IO ()
forall (m :: * -> *) a. Channel m a -> String -> IO ()
cleanupChan Channel m a
sv (SomeException -> String
forall e. Exception e => e -> String
displayException SomeException
ex)
                                SomeException -> m r
forall e a. (HasCallStack, Exception e) => e -> m a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM SomeException
ex

#ifdef INSPECTION
-- Use of GHC constraint tuple (GHC.Classes.(%,,%)) in fromStreamVar leads to
-- space leak because the tuple gets allocated in every recursive call and each
-- allocation holds on to the previous allocation. This test is to make sure
-- that we do not use the constraint tuple type class.
--
inspect $ hasNoTypeClassesExcept 'fromChannelRaw
    [ ''Monad
    , ''Applicative
    , ''MonadThrow
    , ''Exception
    , ''MonadIO
    , ''MonadBaseControl
    , ''Typeable
    , ''Functor
    ]
#endif

-- XXX Add a lock in the channel so that fromChannel cannot be called multiple
-- times.
--
-- XXX Add an option to block the consumer rather than stopping the stream if
-- the work queue gets over.

-- | Draw a stream from a concurrent channel. The stream consists of the
-- evaluated values from the input streams that were enqueued on the channel
-- using 'toChannelK'.
--
-- This is the event processing loop for the channel which does two
-- things, (1) dispatch workers, (2) process the events sent by the workers.
-- Workers are dispatched based on the channel's configuration settings.
--
-- The stream stops and the channel is shutdown if any of the following occurs:
--
-- * the work queue becomes empty
-- * channel's max yield limit is reached
-- * an exception is thrown by a worker
-- * 'shutdown' is called on the channel
--
-- Before the channel stops, all the workers are drained and no more workers
-- are dispatched. When the channel is garbage collected a 'ThreadAbort'
-- exception is thrown to all pending workers. If 'inspect' option is enabled
-- then channel's stats are printed on stdout when the channel stops.
--
-- CAUTION! This API must not be called more than once on a channel.
{-# INLINE fromChannelK #-}
fromChannelK :: MonadAsync m =>
    Maybe (IO () -> IO ()) -> Channel m a -> K.StreamK m a
fromChannelK :: forall (m :: * -> *) a.
MonadAsync m =>
Maybe (IO () -> IO ()) -> Channel m a -> StreamK m a
fromChannelK Maybe (IO () -> IO ())
register Channel m a
chan =
    -- Note: when an explicit cleanup handler registration is used, we still
    -- install a GC based cleanup handler, in case the explicit cleanup handler
    -- is not called by the user we will still clean it up when it is garbage
    -- collected.
    (forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
  State StreamK m a
  -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
 -> StreamK m a)
-> (forall r.
    State StreamK m a
    -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
        IORef ()
ref <- IO (IORef ()) -> m (IORef ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ()) -> m (IORef ())) -> IO (IORef ()) -> m (IORef ())
forall a b. (a -> b) -> a -> b
$ () -> IO (IORef ())
forall a. a -> IO (IORef a)
newIORef ()
        Weak (IORef ())
_ <- IO (Weak (IORef ())) -> m (Weak (IORef ()))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Weak (IORef ())) -> m (Weak (IORef ())))
-> IO (Weak (IORef ())) -> m (Weak (IORef ()))
forall a b. (a -> b) -> a -> b
$ IORef () -> IO () -> IO (Weak (IORef ()))
forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef ()
ref (Channel m a -> String -> IO ()
forall (m :: * -> *) a. Channel m a -> String -> IO ()
cleanupChan Channel m a
chan String
"Channel cleanup via GC")
        let msg :: String
msg = String
"Channel cleanup via explicit handler"

        -- Register the cleanup handler to be called at the end of a user
        -- defined bracket.
        --
        -- IMPORTANT: this hook should run before the resource cleanup hooks
        -- registered by the worker threads themselves. If the auto release
        -- hooks registered by the workers run first then we might release the
        -- resources which are potentially in use by the workers, thus the
        -- workers may misbehave. The correct sequence is to first abort and
        -- drain all the workers then run any hooks registered by them.

        case Maybe (IO () -> IO ())
register of
            Maybe (IO () -> IO ())
Nothing -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            Just IO () -> IO ()
f -> IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
f (Channel m a -> String -> IO ()
forall (m :: * -> *) a. Channel m a -> String -> IO ()
cleanupChan Channel m a
chan String
msg)

        Channel m a -> m ()
forall (m :: * -> *) a. MonadIO m => Channel m a -> m ()
startChannel Channel m a
chan
        -- We pass a copy of chan to fromChannelRaw, with svarRef set to ref,
        -- so that we know that it has no other references, when that copy gets
        -- garbage collected "ref" will get garbage collected and our hook will
        -- be called.
        --
        -- XXX We should install cleanupChan as the exception handler for
        -- ThreadAbort or any async exception while running fromChannelRaw.
        State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp (StreamK m a -> m r) -> StreamK m a -> m r
forall a b. (a -> b) -> a -> b
$
            Channel m a -> StreamK m a
forall (m :: * -> *) a.
(MonadIO m, MonadThrow m) =>
Channel m a -> StreamK m a
fromChannelRaw Channel m a
chan{svarRef = Just ref}

-- | A wrapper over 'fromChannelK' for 'Stream' type.
{-# INLINE fromChannel #-}
fromChannel :: MonadAsync m => Channel m a -> Stream m a
-- XXX Pass the cleanup registration function to fromChannelK
fromChannel :: forall (m :: * -> *) a. MonadAsync m => Channel m a -> Stream m a
fromChannel = StreamK m a -> Stream m a
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
Stream.fromStreamK (StreamK m a -> Stream m a)
-> (Channel m a -> StreamK m a) -> Channel m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe (IO () -> IO ()) -> Channel m a -> StreamK m a
forall (m :: * -> *) a.
MonadAsync m =>
Maybe (IO () -> IO ()) -> Channel m a -> StreamK m a
fromChannelK Maybe (IO () -> IO ())
forall a. Maybe a
Nothing

#if __GLASGOW_HASKELL__ >= 810
type FromSVarState :: Type -> (Type -> Type) -> Type -> Type
#endif
data FromSVarState t m a =
      FromSVarInit
    | FromSVarRead (Channel m a)
    | FromSVarLoop (Channel m a) [ChildEvent a]
    | FromSVarDone (Channel m a)

-- | Like 'fromSVar' but generates a StreamD style stream instead of CPS.
--
{-# INLINE_NORMAL _fromChannelD #-}
_fromChannelD :: (MonadIO m, MonadThrow m) => Channel m a -> D.Stream m a
_fromChannelD :: forall (m :: * -> *) a.
(MonadIO m, MonadThrow m) =>
Channel m a -> Stream m a
_fromChannelD Channel m a
svar = (State StreamK m a
 -> FromSVarState Any m a -> m (Step (FromSVarState Any m a) a))
-> FromSVarState Any m a -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> FromSVarState Any m a -> m (Step (FromSVarState Any m a) a)
forall {p} {t} {t}.
p -> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step FromSVarState Any m a
forall t (m :: * -> *) a. FromSVarState t m a
FromSVarInit
    where

    {-# INLINE_LATE step #-}
    step :: p -> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step p
_ FromSVarState t m a
FromSVarInit = do
        IORef ()
ref <- IO (IORef ()) -> m (IORef ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ()) -> m (IORef ())) -> IO (IORef ()) -> m (IORef ())
forall a b. (a -> b) -> a -> b
$ () -> IO (IORef ())
forall a. a -> IO (IORef a)
newIORef ()
        Weak (IORef ())
_ <- IO (Weak (IORef ())) -> m (Weak (IORef ()))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Weak (IORef ())) -> m (Weak (IORef ())))
-> IO (Weak (IORef ())) -> m (Weak (IORef ()))
forall a b. (a -> b) -> a -> b
$ IORef () -> IO () -> IO (Weak (IORef ()))
forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef ()
ref (Channel m a -> String -> IO ()
forall (m :: * -> *) a. Channel m a -> String -> IO ()
cleanupChan Channel m a
svar String
"Channel cleanup via GC")
        -- when this copy of svar gets garbage collected "ref" will get
        -- garbage collected and our GC hook will be called.
        let sv :: Channel m a
sv = Channel m a
svar{svarRef = Just ref}
        Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (Channel m a -> FromSVarState t m a
forall t (m :: * -> *) a. Channel m a -> FromSVarState t m a
FromSVarRead Channel m a
sv)

    step p
_ (FromSVarRead Channel m a
sv) = do
        [ChildEvent a]
list <- Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a. Channel m a -> m [ChildEvent a]
readOutputQ Channel m a
sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (FromSVarState t m a -> Step (FromSVarState t m a) a)
-> FromSVarState t m a -> Step (FromSVarState t m a) a
forall a b. (a -> b) -> a -> b
$ Channel m a -> [ChildEvent a] -> FromSVarState t m a
forall t (m :: * -> *) a.
Channel m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop Channel m a
sv ([ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)

    step p
_ (FromSVarLoop Channel m a
sv []) = do
        Bool
done <- Channel m a -> m Bool
forall (m :: * -> *) a. Channel m a -> m Bool
postProcess Channel m a
sv
        Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (FromSVarState t m a -> Step (FromSVarState t m a) a)
-> FromSVarState t m a -> Step (FromSVarState t m a) a
forall a b. (a -> b) -> a -> b
$ if Bool
done
                      then Channel m a -> FromSVarState t m a
forall t (m :: * -> *) a. Channel m a -> FromSVarState t m a
FromSVarDone Channel m a
sv
                      else Channel m a -> FromSVarState t m a
forall t (m :: * -> *) a. Channel m a -> FromSVarState t m a
FromSVarRead Channel m a
sv

    step p
_ (FromSVarLoop Channel m a
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
        case ChildEvent a
ev of
            ChildYield a
a -> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ a -> FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. a -> s -> Step s a
D.Yield a
a (Channel m a -> [ChildEvent a] -> FromSVarState t m a
forall t (m :: * -> *) a.
Channel m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop Channel m a
sv [ChildEvent a]
es)
            ChildEvent a
ChildStopChannel -> do
                IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> String -> IO ()
forall (m :: * -> *) a. Channel m a -> String -> IO ()
cleanupChan Channel m a
sv String
"ChildStopChannel"
                Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (Channel m a -> FromSVarState t m a
forall t (m :: * -> *) a. Channel m a -> FromSVarState t m a
FromSVarDone Channel m a
sv)
            ChildStop ThreadId
tid Maybe SomeException
e -> do
                Channel m a -> ThreadId -> m ()
forall (m :: * -> *) a. Channel m a -> ThreadId -> m ()
accountThread Channel m a
sv ThreadId
tid
                case Maybe SomeException
e of
                    Maybe SomeException
Nothing -> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (Channel m a -> [ChildEvent a] -> FromSVarState t m a
forall t (m :: * -> *) a.
Channel m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop Channel m a
sv [ChildEvent a]
es)
                    Just SomeException
ex ->
                        case SomeException -> Maybe ThreadAbort
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
ex of
                            Just ThreadAbort
ThreadAbort ->
                                Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (Channel m a -> [ChildEvent a] -> FromSVarState t m a
forall t (m :: * -> *) a.
Channel m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop Channel m a
sv [ChildEvent a]
es)
                            Maybe ThreadAbort
Nothing -> do
                                IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> String -> IO ()
forall (m :: * -> *) a. Channel m a -> String -> IO ()
cleanupChan Channel m a
sv (SomeException -> String
forall e. Exception e => e -> String
displayException SomeException
ex)
                                SomeException -> m (Step (FromSVarState t m a) a)
forall e a. (HasCallStack, Exception e) => e -> m a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM SomeException
ex

    step p
_ (FromSVarDone Channel m a
sv) = do
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Channel m a -> Bool
forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
            AbsTime
t <- IO AbsTime -> m AbsTime
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AbsTime -> m AbsTime) -> IO AbsTime -> m AbsTime
forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe AbsTime) -> Maybe AbsTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (Channel m a -> SVarStats
forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv)) (AbsTime -> Maybe AbsTime
forall a. a -> Maybe a
Just AbsTime
t)
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO String -> String -> IO ()
printSVar (Channel m a -> IO String
forall (m :: * -> *) a. Channel m a -> IO String
dumpChannel Channel m a
sv) String
"SVar Done"
        Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FromSVarState t m a) a
forall s a. Step s a
D.Stop