module Streamly.Internal.Data.Fold.Channel.Type
(
Channel (..)
, OutEvent (..)
, Config
, defaultConfig
, maxBuffer
, boundThreads
, inspect
, newChannelWith
, newChannelWithScan
, newChannel
, newScanChannel
, sendToWorker
, sendToWorker_
, checkFoldStatus
, dumpChannel
, cleanup
, finalize
)
where
#include "inline.hs"
import Control.Concurrent (ThreadId, myThreadId, tryPutMVar)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar)
import Control.Exception (SomeException(..))
import Control.Monad (void, when)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.List (intersperse)
import Streamly.Internal.Control.Concurrent
(MonadAsync, MonadRunInIO, askRunInIO)
import Streamly.Internal.Control.ForkLifted (doForkWith)
import Streamly.Internal.Data.Atomics (writeBarrier)
import Streamly.Internal.Data.Fold (Fold(..))
import Streamly.Internal.Data.Scanl (Scanl(..))
import Streamly.Internal.Data.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Channel.Worker (sendEvent)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream as D
import Streamly.Internal.Data.Channel.Types
-- | Events that a fold or scan worker places on the channel's output queue
-- for the driver to consume.
data OutEvent b =
      FoldException ThreadId SomeException
      -- ^ The worker thread (identified by the 'ThreadId') failed with the
      -- given exception.
    | FoldPartial b
      -- ^ An intermediate result; only produced when the channel runs a scan.
    | FoldDone ThreadId b
      -- ^ The fold or scan running in the given thread terminated with a
      -- final result.
    | FoldEOF ThreadId
      -- ^ The scan worker finished consuming its input without the scan
      -- itself terminating.
-- | A channel connecting a driver thread to a worker thread that runs a fold
-- or a scan. The driver queues input of type @a@ in 'inputQueue'; the worker
-- reports results of type @b@ through 'outputQueue'.
data Channel m a b = Channel
    {
    -- FORWARD FLOW: data flowing from the driver to the worker.

    -- | Pending input events together with an 'Int' count.
      inputQueue :: IORef ([ChildEvent a], Int)

    -- | Limit on the number of buffered input items; consulted by
    -- 'isBufferAvailable'.
    , maxInputBuffer :: Limit

    -- | The worker blocks on this 'MVar' when it finds the input queue
    -- empty.
    , inputItemDoorBell :: MVar ()

    -- | Set to 'True' by the scan worker once it has stopped consuming
    -- input.
    , closedForInput :: IORef Bool

    -- | Filled (non-blocking) by the worker after it has drained the input
    -- queue, and once more when a scan worker shuts down.
    , inputSpaceDoorBell :: MVar ()

    -- | Action used by the worker to fetch the queued input events.
    , readInputQ :: m [ChildEvent a]

    -- BACKWARD FLOW: results flowing from the worker to the driver.

    -- | Pending output events together with an 'Int' count.
    , outputQueue :: IORef ([OutEvent b], Int)

    -- | Doorbell handed to 'sendEvent' along with 'outputQueue'.
    , outputDoorBell :: MVar ()

    -- | NOTE(review): always 'Nothing' for channels built by
    -- 'mkNewChannelWith'; presumably reserved for GC tracking - confirm.
    , svarRef :: Maybe (IORef ())

    -- | Statistics record, passed to the queue reader only in inspect mode.
    , svarStats :: SVarStats

    -- | Whether diagnostics and statistics collection are enabled.
    , svarInspectMode :: Bool

    -- | The thread that created the channel.
    , svarCreator :: ThreadId
    }
-- | Settings used when creating a fold channel. Start from 'defaultConfig'
-- and adjust it with 'maxBuffer', 'inspect' and 'boundThreads'.
data Config = Config
    {
    -- | Upper limit on the input buffer of the channel.
      _bufferHigh :: Limit

    -- | Enable inspect (diagnostics) mode.
    , _inspect :: Bool

    -- | Flag handed to 'doForkWith' when the worker thread is forked.
    , _bound :: Bool
    }
-- | Buffer limit used when the user does not request a specific one: a
-- 'Limited' buffer of 'magicMaxBuffer' items.
defaultMaxBuffer :: Limit
defaultMaxBuffer = Limited magicMaxBuffer
-- | The default channel configuration: 'defaultMaxBuffer' as the buffer
-- limit, inspect mode off and the bound-thread flag off.
defaultConfig :: Config
defaultConfig = Config
    { _bufferHigh = defaultMaxBuffer
    , _inspect = False
    , _bound = False
    }
-- | Set the maximum size of the channel's input buffer.
--
-- * A negative value means no limit ('Unlimited').
-- * Zero selects the default limit ('defaultMaxBuffer').
-- * A positive value @n@ limits the buffer to @n@ items.
maxBuffer :: Int -> Config -> Config
maxBuffer n st =
    st { _bufferHigh =
            if n < 0
            then Unlimited
            else if n == 0
                 then defaultMaxBuffer
                 else Limited (fromIntegral n)
       }
-- | Read the input buffer limit from a 'Config'.
getMaxBuffer :: Config -> Limit
getMaxBuffer = _bufferHigh
-- | Turn inspect mode on or off. In inspect mode the channel passes its
-- statistics record to the queue reader and enables the diagnostics wrapper
-- around blocking reads.
inspect :: Bool -> Config -> Config
inspect flag st = st { _inspect = flag }
-- | Read the inspect-mode flag from a 'Config'.
getInspectMode :: Config -> Bool
getInspectMode = _inspect
-- | Set the flag that is passed as the first argument of 'doForkWith' when
-- the worker thread is created.
--
-- NOTE(review): presumably selects OS-bound threads for the worker - confirm
-- against 'doForkWith'.
boundThreads :: Bool -> Config -> Config
boundThreads flag st = st { _bound = flag }
-- | Read the bound-thread flag from a 'Config'.
getBound :: Config -> Bool
getBound = _bound
-- | Render a human-readable diagnostic dump of the channel: its creator
-- thread, the state of the input queue and its doorbell, and the collected
-- statistics. The sections are separated by newlines.
{-# NOINLINE dumpChannel #-}
dumpChannel :: Channel m a b -> IO String
dumpChannel sv = do
    xs <- sequence $ intersperse (return "\n")
        [ return (dumpCreator (svarCreator sv))
        , return "---------CURRENT STATE-----------"
        -- The generic output-queue dumper is applied to the input queue.
        , dumpOutputQ (inputQueue sv)
        , dumpDoorBell (inputItemDoorBell sv)
        , return "---------STATS-----------\n"
        , dumpSVarStats (svarInspectMode sv) Nothing (svarStats sv)
        ]
    return $ concat xs
-- | Queue an event for the driver on the channel's output queue, handing the
-- output doorbell to 'sendEvent'. The result is whatever 'sendEvent'
-- returns.
sendToDriver :: Channel m a b -> OutEvent b -> IO Int
sendToDriver sv msg =
    sendEvent (outputQueue sv) (outputDoorBell sv) msg
-- | Report the final result of the fold to the driver as a 'FoldDone' event
-- tagged with the calling thread's id.
sendYieldToDriver :: MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver sv res = liftIO $ do
    tid <- myThreadId
    void $ sendToDriver sv (FoldDone tid res)
-- | Report an intermediate scan result to the driver as a 'FoldPartial'
-- event.
sendPartialToDriver :: MonadIO m => Channel m a b -> b -> m ()
sendPartialToDriver sv res =
    liftIO $ void $ sendToDriver sv (FoldPartial res)
-- | Tell the driver that the worker has reached the end of its input by
-- sending a 'FoldEOF' event tagged with the calling thread's id.
sendEOFToDriver :: MonadIO m => Channel m a b -> m ()
sendEOFToDriver sv = liftIO $ do
    tid <- myThreadId
    void $ sendToDriver sv (FoldEOF tid)
-- | Report an exception to the driver as a 'FoldException' event tagged with
-- the calling thread's id. It is installed as the exception handler of the
-- forked worker thread.
{-# NOINLINE sendExceptionToDriver #-}
sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
sendExceptionToDriver sv e = do
    tid <- myThreadId
    void $ sendToDriver sv (FoldException tid e)
-- | State of the stream built by 'fromInputQueue'.
data FromSVarState m a b =
      FromSVarRead (Channel m a b)
      -- ^ Fetch the next batch of events from the channel.
    | FromSVarLoop (Channel m a b) [ChildEvent a]
      -- ^ Emit the events of the batch that remain to be processed.
-- | Expose the input queue of a channel as a stream. Batches of events are
-- fetched with 'readInputQ'; every 'ChildYield' becomes a stream element and
-- 'ChildStopChannel' ends the stream.
{-# INLINE_NORMAL fromInputQueue #-}
fromInputQueue :: MonadIO m => Channel m a b -> D.Stream m a
fromInputQueue svar = D.Stream step (FromSVarRead svar)

    where

    {-# INLINE_LATE step #-}
    step _ (FromSVarRead sv) = do
        list <- readInputQ sv
        -- The batch is reversed before it is consumed.
        -- NOTE(review): presumably because the producer conses new events
        -- onto the front of the queue - confirm against the sender.
        return $ D.Skip $ FromSVarLoop sv (Prelude.reverse list)

    -- Batch exhausted: go back and read the next one.
    step _ (FromSVarLoop sv []) = return $ D.Skip $ FromSVarRead sv
    step _ (FromSVarLoop sv (ev : es)) =
        case ev of
            ChildYield a -> return $ D.Yield a (FromSVarLoop sv es)
            ChildStopChannel -> return D.Stop
            -- No other event is expected on a fold channel's input queue;
            -- fail with a descriptive message instead of a bare 'undefined'.
            _ -> error "fromInputQueue: unexpected event in the input queue"
-- | Read the contents of the channel's input queue. If the queue is found
-- empty, block on the input doorbell until it is rung and then read the
-- queue again.
{-# INLINE readInputQChan #-}
readInputQChan :: Channel m a b -> IO ([ChildEvent a], Int)
readInputQChan chan = do
    -- Statistics are handed to the reader only in inspect mode.
    let ss = if svarInspectMode chan then Just (svarStats chan) else Nothing
    r@(_, n) <- readOutputQRaw (inputQueue chan) ss
    if n <= 0
        then do
            -- Nothing queued: wait for the doorbell. The wait is wrapped by
            -- 'withDiagMVar', which receives the inspect flag, a channel
            -- dump action and a label for diagnostics.
            withDiagMVar
                (svarInspectMode chan)
                (dumpChannel chan)
                "readInputQChan: nothing to do"
                $ takeMVar (inputItemDoorBell chan)
            readOutputQRaw (inputQueue chan) ss
        else return r
-- | Like 'readInputQChan', but after reading it also rings the input-space
-- doorbell (without blocking).
--
-- NOTE(review): presumably this wakes a driver waiting for buffer space -
-- confirm against the sender side.
{-# INLINE readInputQWithDB #-}
readInputQWithDB :: Channel m a b -> IO ([ChildEvent a], Int)
readInputQWithDB chan = do
    r <- readInputQChan chan
    -- 'tryPutMVar' never blocks; the result is ignored because the doorbell
    -- may already be full.
    _ <- tryPutMVar (inputSpaceDoorBell chan) ()
    return r
-- | Allocate a new channel that reports its results on the supplied output
-- queue and output doorbell. The input queue, the input doorbells, the
-- closed flag and the statistics record are created fresh; the calling
-- thread is recorded as the channel's creator. No worker thread is started
-- here.
mkNewChannelWith :: forall m a b. MonadIO m =>
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> Config
    -> IO (Channel m a b)
mkNewChannelWith outQRev outQMvRev cfg = do
    outQ <- newIORef ([], 0)
    outQMv <- newEmptyMVar
    bufferMv <- newEmptyMVar
    ref <- newIORef False

    stats <- newSVarStats
    tid <- myThreadId

    -- 'readInputQ' needs the channel it belongs to, so the record is built
    -- as a function of itself and the knot is tied below.
    let getSVar :: Channel m a b -> Channel m a b
        getSVar sv = Channel
            { inputQueue = outQ
            , inputItemDoorBell = outQMv
            , outputQueue = outQRev
            , outputDoorBell = outQMvRev
            , inputSpaceDoorBell = bufferMv
            , closedForInput = ref
            , maxInputBuffer = getMaxBuffer cfg
            , readInputQ = liftIO $ fmap fst (readInputQWithDB sv)
            , svarRef = Nothing
            , svarInspectMode = getInspectMode cfg
            , svarCreator = tid
            , svarStats = stats
            }

    let sv = getSVar sv in return sv
-- | Create a channel for the given fold, reporting results on the supplied
-- output queue and doorbell, and fork a worker thread that runs the fold
-- over the channel's input queue. When the fold terminates its result is
-- sent to the driver as a 'FoldDone' event; an exception in the worker is
-- reported via 'sendExceptionToDriver'.
--
-- Returns the channel and the 'ThreadId' of the worker.
{-# INLINABLE newChannelWith #-}
{-# SPECIALIZE newChannelWith ::
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Fold IO a b
    -> IO (Channel IO a b, ThreadId) #-}
newChannelWith :: (MonadRunInIO m) =>
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Fold m a b
    -> m (Channel m a b, ThreadId)
newChannelWith outq outqDBell modifier f = do
    let config = modifier defaultConfig
    sv <- liftIO $ mkNewChannelWith outq outqDBell config
    mrun <- askRunInIO
    tid <- doForkWith
        (getBound config) (work sv) mrun (sendExceptionToDriver sv)
    return (sv, tid)

    where

    {-# NOINLINE work #-}
    work chan =
        -- Send the fold's result to the driver as soon as it is available.
        let f1 = Fold.rmapM (void . sendYieldToDriver chan) f
         in D.fold f1 $ fromInputQueue chan
-- | Turn a scan into a fold that forwards every scan output to the driver
-- of the channel.
--
-- After the initial action and after each step, a continuing scan has its
-- current output extracted and sent as 'FoldPartial'; a terminating scan has
-- its result sent as 'FoldDone'. The resulting fold returns 'True' if the
-- scan terminated by itself and 'False' if it was finalized because the
-- input ended.
{-# INLINE scanToChannel #-}
scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Fold m a Bool
scanToChannel chan (Scanl step initial extract final) =
    Fold step1 initial1 extract1 final1

    where

    -- Forward the outcome of a scan step to the driver and translate it into
    -- the corresponding step of the wrapping fold.
    forward r =
        case r of
            Fold.Partial s -> do
                b <- extract s
                void $ sendPartialToDriver chan b
                return $ Fold.Partial s
            Fold.Done b -> do
                sendYieldToDriver chan b
                return $ Fold.Done True

    initial1 = initial >>= forward

    step1 st x = step st x >>= forward

    extract1 _ = error "extract: not supported by folds"

    -- The result of finalizing the scan is discarded; 'False' tells the
    -- caller that the scan did not terminate by itself.
    final1 st = do
        void (final st)
        return False
-- | Create a channel for the given scan, reporting results on the supplied
-- output queue and doorbell, and fork a worker thread that runs the scan
-- over the channel's input queue, forwarding every output to the driver
-- (see 'scanToChannel'). An exception in the worker is reported via
-- 'sendExceptionToDriver'.
--
-- Returns the channel and the 'ThreadId' of the worker.
{-# INLINABLE newChannelWithScan #-}
{-# SPECIALIZE newChannelWithScan ::
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Scanl IO a b
    -> IO (Channel IO a b, ThreadId) #-}
newChannelWithScan :: (MonadRunInIO m) =>
       IORef ([OutEvent b], Int)
    -> MVar ()
    -> (Config -> Config)
    -> Scanl m a b
    -> m (Channel m a b, ThreadId)
newChannelWithScan outq outqDBell modifier f = do
    let config = modifier defaultConfig
    sv <- liftIO $ mkNewChannelWith outq outqDBell config
    mrun <- askRunInIO
    tid <- doForkWith
        (getBound config) (work sv) mrun (sendExceptionToDriver sv)
    return (sv, tid)

    where

    {-# NOINLINE work #-}
    work chan = do
        completed <- D.fold (scanToChannel chan f) (fromInputQueue chan)
        -- If the scan did not terminate by itself, the input ended first:
        -- tell the driver.
        when (not completed) $ sendEOFToDriver chan
        -- Mark the channel closed for input, make the write visible, then
        -- ring the input-space doorbell.
        -- NOTE(review): presumably so that a driver blocked waiting for
        -- buffer space wakes up and sees the closed flag - confirm.
        liftIO $ writeIORef (closedForInput chan) True
        liftIO writeBarrier
        void $ liftIO $ tryPutMVar (inputSpaceDoorBell chan) ()
-- | Create a fold channel with its own, freshly allocated output queue and
-- output doorbell, and start the worker thread for the given fold. The
-- worker's 'ThreadId' is discarded.
{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel ::
    (Config -> Config) -> Fold IO a b -> IO (Channel IO a b) #-}
newChannel :: (MonadRunInIO m) =>
    (Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel modifier f = do
    outQRev <- liftIO $ newIORef ([], 0)
    outQMvRev <- liftIO newEmptyMVar
    fmap fst (newChannelWith outQRev outQMvRev modifier f)
-- | Create a scan channel with its own, freshly allocated output queue and
-- output doorbell, and start the worker thread for the given scan. The
-- worker's 'ThreadId' is discarded.
{-# INLINABLE newScanChannel #-}
{-# SPECIALIZE newScanChannel ::
    (Config -> Config) -> Scanl IO a b -> IO (Channel IO a b) #-}
newScanChannel :: (MonadRunInIO m) =>
    (Config -> Config) -> Scanl m a b -> m (Channel m a b)
newScanChannel modifier f = do
    outQRev <- liftIO $ newIORef ([], 0)
    outQMvRev <- liftIO newEmptyMVar
    fmap fst (newChannelWithScan outQRev outQMvRev modifier f)
-- | Poll the output queue of a fold channel.
--
-- Returns 'Nothing' if no event is pending and @Just b@ if the fold has
-- terminated with result @b@. If the worker failed, its exception is
-- rethrown in the caller. Only the first event (after reversing the list
-- that was read) is examined.
--
-- Calls 'error' on 'FoldPartial' or 'FoldEOF', which are produced only by
-- scan channels.
{-# NOINLINE checkFoldStatus #-}
checkFoldStatus :: MonadAsync m => Channel m a b -> m (Maybe b)
checkFoldStatus sv = do
    (list, _) <- liftIO $ readOutputQBasic (outputQueue sv)
    -- The list is reversed before the first event is picked.
    -- NOTE(review): presumably because the queue holds the newest event
    -- first - confirm against 'sendEvent'.
    processEvents $ reverse list

    where

    {-# INLINE processEvents #-}
    processEvents [] = return Nothing
    processEvents (ev : _) =
        case ev of
            FoldException _ e -> throwM e
            FoldDone _ b -> return (Just b)
            FoldPartial _ ->
                error "checkFoldStatus: FoldPartial can occur only for scans"
            FoldEOF _ ->
                error "checkFoldStatus: FoldEOF can occur only for scans"
-- | Check whether the input queue of the channel can take more items: always
-- 'True' for an 'Unlimited' buffer, otherwise 'True' only while the count
-- stored with the queue is below the configured limit.
{-# INLINE isBufferAvailable #-}
isBufferAvailable :: MonadIO m => Channel m a b -> m Bool
isBufferAvailable sv =
    case maxInputBuffer sv of
        Unlimited -> return True
        Limited lim -> do
            (_, n) <- liftIO $ readIORef (inputQueue sv)
            -- The limit is a 'Word'; it is converted to 'Int' to compare
            -- with the queue count.
            return $ fromIntegral lim > n
-- | Hand one input item to the worker through the channel, first checking
-- whether the worker has already produced something.
--
-- Each attempt proceeds as follows:
--
-- 1. Read the count of the output queue. If it is non-zero, run
--    'checkFoldStatus'; a @Just@ result is returned immediately and the item
--    is not sent.
-- 2. Otherwise ask 'isBufferAvailable'. If there is room, pass
--    @'ChildYield' a@ to 'sendEvent' together with the input queue and the
--    input item doorbell, then return 'Nothing'.
-- 3. If there is no room, block on 'takeMVar' of 'inputSpaceDoorBell' and
--    start over from step 1.
--
-- Anything thrown by 'checkFoldStatus' propagates to the caller.
{-# INLINE sendToWorker #-}
sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b)
sendToWorker chan a = loop

    where

    -- Consult the fold status only when the output queue count is positive.
    pollWorker = do
        (_, pending) <- liftIO (readIORef (outputQueue chan))
        if pending > 0
        then checkFoldStatus chan
        else return Nothing

    -- Submit the item; the Int returned by sendEvent is discarded.
    enqueue =
        liftIO
            (void
                (sendEvent
                    (inputQueue chan)
                    (inputItemDoorBell chan)
                    (ChildYield a)))

    loop = do
        status <- pollWorker
        case status of
            Just _ -> return status
            Nothing -> do
                hasRoom <- isBufferAvailable chan
                if hasRoom
                then enqueue >> return Nothing
                else do
                    -- Wait on the space doorbell, then retry from the top.
                    () <- liftIO (takeMVar (inputSpaceDoorBell chan))
                    loop
-- | Send-only variant of 'sendToWorker': it never reads the output queue and
-- returns no result.
--
-- If 'isBufferAvailable' reports room, @'ChildYield' a@ is passed to
-- 'sendEvent' with the input queue and the input item doorbell. Otherwise
-- the call blocks on 'takeMVar' of 'inputSpaceDoorBell', then reads
-- 'closedForInput': when that flag is set the function returns without
-- sending the item, otherwise the whole attempt is repeated.
{-# INLINE sendToWorker_ #-}
sendToWorker_ :: MonadAsync m => Channel m a b -> a -> m ()
sendToWorker_ chan a = loop

    where

    loop = do
        hasRoom <- isBufferAvailable chan
        if hasRoom
        then
            -- The Int returned by sendEvent is discarded.
            liftIO . void $
                sendEvent
                    (inputQueue chan)
                    (inputItemDoorBell chan)
                    (ChildYield a)
        else do
            -- Wait on the space doorbell before looking at the closed flag.
            () <- liftIO (takeMVar (inputSpaceDoorBell chan))
            closed <- liftIO (readIORef (closedForInput chan))
            when (not closed) loop
-- | Record the stop time and print the channel dump, but only when
-- 'svarInspectMode' is enabled for the channel; otherwise do nothing.
--
-- In inspect mode the current 'Monotonic' clock reading is written to
-- 'svarStopTime' of the channel's 'svarStats', and then 'printSVar' is
-- called with 'dumpChannel' and a fixed label.
cleanup :: MonadIO m => Channel m a b -> m ()
cleanup chan = when (svarInspectMode chan) (liftIO recordAndDump)

    where

    recordAndDump = do
        stoppedAt <- getTime Monotonic
        writeIORef (svarStopTime (svarStats chan)) (Just stoppedAt)
        printSVar (dumpChannel chan) "Scan channel done"
-- | Submit a 'ChildStopChannel' event on the channel's input side.
--
-- The event is passed to 'sendEvent' together with the input queue and the
-- input item doorbell; the 'Int' that 'sendEvent' returns is discarded.
finalize :: MonadIO m => Channel m a b -> m ()
finalize chan = liftIO (void pushStop)

    where

    pushStop =
        sendEvent (inputQueue chan) (inputItemDoorBell chan) ChildStopChannel