{-# LANGUAGE CPP #-}
-- |
-- Module      : Streamly.Internal.Data.Scanl.Concurrent
-- Copyright   : (c) 2024 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Scanl.Concurrent
    (
      parTeeWith
    , parDistributeScanM
    , parDistributeScan
    , parDemuxScanM
    , parDemuxScan
    )
where

#include "inline.hs"

import Control.Concurrent (newEmptyMVar, takeMVar, throwTo)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef, atomicModifyIORef)
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS)
import Streamly.Internal.Data.Fold (Step (..))
import Streamly.Internal.Data.Scanl (Scanl(..))
import Streamly.Internal.Data.Stream (Stream(..), Step(..))
import Streamly.Internal.Data.SVar.Type (adaptState)
import Streamly.Internal.Data.Tuple.Strict (Tuple3'(..))

import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Stream as Stream

import Streamly.Internal.Data.Fold.Channel.Type
import Streamly.Internal.Data.Channel.Types

#include "DocTestDataScanl.hs"

-------------------------------------------------------------------------------
-- Concurrent scans
-------------------------------------------------------------------------------

-- | Execute both the scans in a tee concurrently.
--
-- Example:
--
-- >>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3)
-- >>> delay x = threadDelay 1000000 >> print x >> return x
-- >>> c1 = Scanl.lmapM delay Scanl.sum
-- >>> c2 = Scanl.lmapM delay Scanl.length
-- >>> dst = Scanl.parTeeWith id (,) c1 c2
-- >>> Stream.toList $ Stream.scanl dst src
-- ...
--
{-# INLINABLE parTeeWith #-}
parTeeWith :: MonadAsync m =>
       (Config -> Config)
    -> (a -> b -> c)
    -> Scanl m x a
    -> Scanl m x b
    -> Scanl m x c
parTeeWith :: forall (m :: * -> *) a b c x.
MonadAsync m =>
(Config -> Config)
-> (a -> b -> c) -> Scanl m x a -> Scanl m x b -> Scanl m x c
parTeeWith Config -> Config
cfg a -> b -> c
f Scanl m x a
c1 Scanl m x b
c2 = (Tuple3' (Channel m x a) (Channel m x b) c
 -> x -> m (Step (Tuple3' (Channel m x a) (Channel m x b) c) c))
-> m (Step (Tuple3' (Channel m x a) (Channel m x b) c) c)
-> (Tuple3' (Channel m x a) (Channel m x b) c -> m c)
-> (Tuple3' (Channel m x a) (Channel m x b) c -> m c)
-> Scanl m x c
forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> (s -> m b) -> Scanl m a b
Scanl Tuple3' (Channel m x a) (Channel m x b) c
-> x -> m (Step (Tuple3' (Channel m x a) (Channel m x b) c) c)
forall {m :: * -> *} {a} {c}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
Tuple3' (Channel m a a) (Channel m a b) c
-> a -> m (Step (Tuple3' (Channel m a a) (Channel m a b) c) c)
step m (Step (Tuple3' (Channel m x a) (Channel m x b) c) c)
initial Tuple3' (Channel m x a) (Channel m x b) c -> m c
forall {m :: * -> *} {a} {b} {a}. Monad m => Tuple3' a b a -> m a
extract Tuple3' (Channel m x a) (Channel m x b) c -> m c
forall {m :: * -> *} {a} {b} {a} {b} {b}.
MonadIO m =>
Tuple3' (Channel m a b) (Channel m a b) b -> m b
final

    where

    getResponse :: Channel m a b -> Channel m a b -> m (Either b b)
getResponse Channel m a b
ch1 Channel m a b
ch2 = do
        -- NOTE: We do not need a queue and doorbell mechanism for this, a single
        -- MVar should be enough. Also, there is only one writer and it writes
        -- only once before we read it.
        let db1 :: MVar ()
db1 = Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell Channel m a b
ch1
        let q1 :: IORef ([OutEvent b], Int)
q1 = Channel m a b -> IORef ([OutEvent b], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([OutEvent b], Int)
outputQueue Channel m a b
ch1
        ([OutEvent b]
xs1, Int
_) <- IO ([OutEvent b], Int) -> m ([OutEvent b], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent b], Int) -> m ([OutEvent b], Int))
-> IO ([OutEvent b], Int) -> m ([OutEvent b], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent b], Int)
-> (([OutEvent b], Int)
    -> (([OutEvent b], Int), ([OutEvent b], Int)))
-> IO ([OutEvent b], Int)
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([OutEvent b], Int)
q1 ((([OutEvent b], Int)
  -> (([OutEvent b], Int), ([OutEvent b], Int)))
 -> IO ([OutEvent b], Int))
-> (([OutEvent b], Int)
    -> (([OutEvent b], Int), ([OutEvent b], Int)))
-> IO ([OutEvent b], Int)
forall a b. (a -> b) -> a -> b
$ \([OutEvent b], Int)
x -> (([],Int
0), ([OutEvent b], Int)
x)
        case [OutEvent b]
xs1 of
            [] -> do
                IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
db1
                Channel m a b -> Channel m a b -> m (Either b b)
getResponse Channel m a b
ch1 Channel m a b
ch2
            OutEvent b
x1 : [] -> do
                case OutEvent b
x1 of
                    FoldException ThreadId
_tid SomeException
ex -> do
                        -- XXX
                        -- liftIO $ throwTo ch2Tid ThreadAbort
                        Channel m a b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
cleanup Channel m a b
ch1
                        Channel m a b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
cleanup Channel m a b
ch2
                        IO (Either b b) -> m (Either b b)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either b b) -> m (Either b b))
-> IO (Either b b) -> m (Either b b)
forall a b. (a -> b) -> a -> b
$ SomeException -> IO (Either b b)
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM SomeException
ex
                    FoldDone ThreadId
_tid b
b -> Either b b -> m (Either b b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Either b b
forall a b. a -> Either a b
Left b
b)
                    FoldPartial b
b -> Either b b -> m (Either b b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Either b b
forall a b. b -> Either a b
Right b
b)
                    FoldEOF ThreadId
_ -> [Char] -> m (Either b b)
forall a. HasCallStack => [Char] -> a
error [Char]
"parTeeWith: FoldEOF cannot occur here"
            [OutEvent b]
_ -> [Char] -> m (Either b b)
forall a. HasCallStack => [Char] -> a
error [Char]
"parTeeWith: not expecting more than one msg in q"

    processResponses :: a -> b -> Either a a -> Either b b -> m (Step (Tuple3' a b c) c)
processResponses a
ch1 b
ch2 Either a a
r1 Either b b
r2 =
        Step (Tuple3' a b c) c -> m (Step (Tuple3' a b c) c)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Tuple3' a b c) c -> m (Step (Tuple3' a b c) c))
-> Step (Tuple3' a b c) c -> m (Step (Tuple3' a b c) c)
forall a b. (a -> b) -> a -> b
$ case Either a a
r1 of
            Left a
b1 -> do
                case Either b b
r2 of
                    Left b
b2 -> c -> Step (Tuple3' a b c) c
forall s b. b -> Step s b
Done (a -> b -> c
f a
b1 b
b2)
                    Right b
b2 -> c -> Step (Tuple3' a b c) c
forall s b. b -> Step s b
Done (a -> b -> c
f a
b1 b
b2)
            Right a
b1 -> do
                case Either b b
r2 of
                    Left b
b2 -> c -> Step (Tuple3' a b c) c
forall s b. b -> Step s b
Done (a -> b -> c
f a
b1 b
b2)
                    Right b
b2 -> Tuple3' a b c -> Step (Tuple3' a b c) c
forall s b. s -> Step s b
Partial (Tuple3' a b c -> Step (Tuple3' a b c) c)
-> Tuple3' a b c -> Step (Tuple3' a b c) c
forall a b. (a -> b) -> a -> b
$ a -> b -> c -> Tuple3' a b c
forall a b c. a -> b -> c -> Tuple3' a b c
Tuple3' a
ch1 b
ch2 (a -> b -> c
f a
b1 b
b2)

    initial :: m (Step (Tuple3' (Channel m x a) (Channel m x b) c) c)
initial = do
        Channel m x a
ch1 <- (Config -> Config) -> Scanl m x a -> m (Channel m x a)
forall (m :: * -> *) a b.
MonadRunInIO m =>
(Config -> Config) -> Scanl m a b -> m (Channel m a b)
newScanChannel Config -> Config
cfg Scanl m x a
c1
        Channel m x b
ch2 <- (Config -> Config) -> Scanl m x b -> m (Channel m x b)
forall (m :: * -> *) a b.
MonadRunInIO m =>
(Config -> Config) -> Scanl m a b -> m (Channel m a b)
newScanChannel Config -> Config
cfg Scanl m x b
c2
        Either a a
r1 <- Channel m x a -> Channel m x b -> m (Either a a)
forall {m :: * -> *} {a} {b} {a} {b}.
MonadIO m =>
Channel m a b -> Channel m a b -> m (Either b b)
getResponse Channel m x a
ch1 Channel m x b
ch2
        Either b b
r2 <- Channel m x b -> Channel m x a -> m (Either b b)
forall {m :: * -> *} {a} {b} {a} {b}.
MonadIO m =>
Channel m a b -> Channel m a b -> m (Either b b)
getResponse Channel m x b
ch2 Channel m x a
ch1
        Channel m x a
-> Channel m x b
-> Either a a
-> Either b b
-> m (Step (Tuple3' (Channel m x a) (Channel m x b) c) c)
forall {m :: * -> *} {a} {b}.
Monad m =>
a -> b -> Either a a -> Either b b -> m (Step (Tuple3' a b c) c)
processResponses Channel m x a
ch1 Channel m x b
ch2 Either a a
r1 Either b b
r2

    step :: Tuple3' (Channel m a a) (Channel m a b) c
-> a -> m (Step (Tuple3' (Channel m a a) (Channel m a b) c) c)
step (Tuple3' Channel m a a
ch1 Channel m a b
ch2 c
_) a
x = do
        Channel m a a -> a -> m ()
forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m ()
sendToWorker_ Channel m a a
ch1 a
x
        Channel m a b -> a -> m ()
forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m ()
sendToWorker_ Channel m a b
ch2 a
x
        Either a a
r1 <- Channel m a a -> Channel m a b -> m (Either a a)
forall {m :: * -> *} {a} {b} {a} {b}.
MonadIO m =>
Channel m a b -> Channel m a b -> m (Either b b)
getResponse Channel m a a
ch1 Channel m a b
ch2
        Either b b
r2 <- Channel m a b -> Channel m a a -> m (Either b b)
forall {m :: * -> *} {a} {b} {a} {b}.
MonadIO m =>
Channel m a b -> Channel m a b -> m (Either b b)
getResponse Channel m a b
ch2 Channel m a a
ch1
        Channel m a a
-> Channel m a b
-> Either a a
-> Either b b
-> m (Step (Tuple3' (Channel m a a) (Channel m a b) c) c)
forall {m :: * -> *} {a} {b}.
Monad m =>
a -> b -> Either a a -> Either b b -> m (Step (Tuple3' a b c) c)
processResponses Channel m a a
ch1 Channel m a b
ch2 Either a a
r1 Either b b
r2

    extract :: Tuple3' a b a -> m a
extract (Tuple3' a
_ b
_ a
x) = a -> m a
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x

    final :: Tuple3' (Channel m a b) (Channel m a b) b -> m b
final (Tuple3' Channel m a b
ch1 Channel m a b
ch2 b
x) = do
        Channel m a b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
finalize Channel m a b
ch1
        Channel m a b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
finalize Channel m a b
ch2
        -- XXX generate the final value?
        b -> m b
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return b
x

-- There are two ways to implement a concurrent scan.
--
-- 1. Make the scan itself asynchronous, add the input to the queue, and then
-- extract the output. Extraction will have to be asynchronous, which will
-- require changes to the scan driver. This will require a different Scanl
-- type.
--
-- 2. A monolithic implementation of concurrent Stream->Stream scan, using a
-- custom implementation of the scan and the driver.

{-# ANN type ScanState Fuse #-}
data ScanState s q db f =
      ScanInit
    | ScanGo s q db [f]
    | ScanDrain q db [f]
    | ScanStop

-- XXX return [b] or just b?
-- XXX We can use a one way mailbox type abstraction instead of using an IORef
-- for adding new folds dynamically.

-- | Evaluate a stream by distributing its inputs across zero or more
-- concurrently running scans. New scans can be generated dynamically. Use
-- 'parDistributeScan' for an eaiser to use interface, if you do not need the
-- power of 'parDistributeScanM'.
--
-- Before processing each input element, the supplied action is executed to
-- produce additional scans. These scans are appended to the set of currently
-- active scans. If you do not want the same scan to be added repeatedly,
-- ensure that the action only generates it once (see the example below).
--
-- If there are no scans currently active, the input element is discarded.
-- The results from all active scans are collected and lattened into the
-- the output stream.
--
-- Concurrency and buffering:
--
-- If the input buffer (see 'maxBuffer') is bounded, a scan may block until
-- space becomes available. If any scan is blocked on buffer, all scans are
-- blocked. Processing continues only when all scans have buffer space
-- available.
--
-- Example:
--
-- >>> import Data.IORef
-- >>> ref <- newIORef [Scanl.take 5 Scanl.sum, Scanl.take 5 Scanl.length :: Scanl.Scanl IO Int Int]
-- >>> gen = atomicModifyIORef ref (\xs -> ([], xs))
-- >>> Stream.toList $ Scanl.parDistributeScanM id gen (Stream.enumerateFromTo 1 10)
-- ...
--
{-# INLINE parDistributeScanM #-}
parDistributeScanM :: MonadAsync m =>
    (Config -> Config) -> m [Scanl m a b] -> Stream m a -> Stream m [b]
parDistributeScanM :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> m [Scanl m a b] -> Stream m a -> Stream m [b]
parDistributeScanM Config -> Config
cfg m [Scanl m a b]
getFolds (Stream State StreamK m a -> s -> m (Step s a)
sstep s
state) =
    (State StreamK m [b]
 -> ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Stream m [b]
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m [b]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall {m :: * -> *} {a}.
State StreamK m a
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
step ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. ScanState s q db f
ScanInit

    where

    -- XXX can be written as a fold
    processOutputs :: [(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
processOutputs [(Channel m a b, ThreadId)]
chans [OutEvent a]
events [a]
done = do
        case [OutEvent a]
events of
            [] -> ([(Channel m a b, ThreadId)], [a])
-> m ([(Channel m a b, ThreadId)], [a])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([(Channel m a b, ThreadId)]
chans, [a]
done)
            (OutEvent a
x:[OutEvent a]
xs) ->
                case OutEvent a
x of
                    FoldException ThreadId
_tid SomeException
ex -> do
                        -- XXX report the fold that threw the exception
                        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ (ThreadId -> IO ()) -> [ThreadId] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ThreadId -> ThreadAbort -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
`throwTo` ThreadAbort
ThreadAbort) (((Channel m a b, ThreadId) -> ThreadId)
-> [(Channel m a b, ThreadId)] -> [ThreadId]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> ThreadId
forall a b. (a, b) -> b
snd [(Channel m a b, ThreadId)]
chans)
                        (Channel m a b -> m ()) -> [Channel m a b] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Channel m a b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
cleanup (((Channel m a b, ThreadId) -> Channel m a b)
-> [(Channel m a b, ThreadId)] -> [Channel m a b]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst [(Channel m a b, ThreadId)]
chans)
                        IO ([(Channel m a b, ThreadId)], [a])
-> m ([(Channel m a b, ThreadId)], [a])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([(Channel m a b, ThreadId)], [a])
 -> m ([(Channel m a b, ThreadId)], [a]))
-> IO ([(Channel m a b, ThreadId)], [a])
-> m ([(Channel m a b, ThreadId)], [a])
forall a b. (a -> b) -> a -> b
$ SomeException -> IO ([(Channel m a b, ThreadId)], [a])
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM SomeException
ex
                    FoldDone ThreadId
tid a
b ->
                        let ch :: [(Channel m a b, ThreadId)]
ch = ((Channel m a b, ThreadId) -> Bool)
-> [(Channel m a b, ThreadId)] -> [(Channel m a b, ThreadId)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(Channel m a b
_, ThreadId
t) -> ThreadId
t ThreadId -> ThreadId -> Bool
forall a. Eq a => a -> a -> Bool
/= ThreadId
tid) [(Channel m a b, ThreadId)]
chans
                         in [(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
processOutputs [(Channel m a b, ThreadId)]
ch [OutEvent a]
xs (a
ba -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
done)
                    FoldEOF ThreadId
tid -> do
                        let ch :: [(Channel m a b, ThreadId)]
ch = ((Channel m a b, ThreadId) -> Bool)
-> [(Channel m a b, ThreadId)] -> [(Channel m a b, ThreadId)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(Channel m a b
_, ThreadId
t) -> ThreadId
t ThreadId -> ThreadId -> Bool
forall a. Eq a => a -> a -> Bool
/= ThreadId
tid) [(Channel m a b, ThreadId)]
chans
                         in [(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
processOutputs [(Channel m a b, ThreadId)]
ch [OutEvent a]
xs [a]
done
                    FoldPartial a
b ->
                         [(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
processOutputs [(Channel m a b, ThreadId)]
chans [OutEvent a]
xs (a
ba -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
done)

    collectOutputs :: IORef ([OutEvent a], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [a])
collectOutputs IORef ([OutEvent a], Int)
qref [(Channel m a b, ThreadId)]
chans = do
        ([OutEvent a]
_, Int
n) <- IO ([OutEvent a], Int) -> m ([OutEvent a], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent a], Int) -> m ([OutEvent a], Int))
-> IO ([OutEvent a], Int) -> m ([OutEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent a], Int) -> IO ([OutEvent a], Int)
forall a. IORef a -> IO a
readIORef IORef ([OutEvent a], Int)
qref
        if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
        then do
            [OutEvent a]
r <- (([OutEvent a], Int) -> [OutEvent a])
-> m ([OutEvent a], Int) -> m [OutEvent a]
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ([OutEvent a], Int) -> [OutEvent a]
forall a b. (a, b) -> a
fst (m ([OutEvent a], Int) -> m [OutEvent a])
-> m ([OutEvent a], Int) -> m [OutEvent a]
forall a b. (a -> b) -> a -> b
$ IO ([OutEvent a], Int) -> m ([OutEvent a], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent a], Int) -> m ([OutEvent a], Int))
-> IO ([OutEvent a], Int) -> m ([OutEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent a], Int) -> IO ([OutEvent a], Int)
forall a. IORef ([a], Int) -> IO ([a], Int)
readOutputQBasic IORef ([OutEvent a], Int)
qref
            [(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
forall {m :: * -> *} {a} {b} {a}.
MonadIO m =>
[(Channel m a b, ThreadId)]
-> [OutEvent a] -> [a] -> m ([(Channel m a b, ThreadId)], [a])
processOutputs [(Channel m a b, ThreadId)]
chans [OutEvent a]
r []
        else ([(Channel m a b, ThreadId)], [a])
-> m ([(Channel m a b, ThreadId)], [a])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([(Channel m a b, ThreadId)]
chans, [])

    step :: State StreamK m a
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
step State StreamK m a
_ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
ScanInit = do
        IORef ([OutEvent b], Int)
q <- IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int)))
-> IO (IORef ([OutEvent b], Int)) -> m (IORef ([OutEvent b], Int))
forall a b. (a -> b) -> a -> b
$ ([OutEvent b], Int) -> IO (IORef ([OutEvent b], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
        MVar ()
db <- IO (MVar ()) -> m (MVar ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
        Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. s -> Step s a
Skip (s
-> IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. s -> q -> db -> [f] -> ScanState s q db f
ScanGo s
state IORef ([OutEvent b], Int)
q MVar ()
db [])

    step State StreamK m a
gst (ScanGo s
st IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
chans) = do
        -- merge any new channels added since last input
        [Scanl m a b]
fxs <- m [Scanl m a b]
getFolds
        [(Channel m a b, ThreadId)]
newChans <- (Scanl m a b -> m (Channel m a b, ThreadId))
-> [Scanl m a b] -> m [(Channel m a b, ThreadId)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
Prelude.mapM (IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Scanl m a b
-> m (Channel m a b, ThreadId)
forall (m :: * -> *) b a.
MonadRunInIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Scanl m a b
-> m (Channel m a b, ThreadId)
newChannelWithScan IORef ([OutEvent b], Int)
q MVar ()
db Config -> Config
cfg) [Scanl m a b]
fxs
        let allChans :: [(Channel m a b, ThreadId)]
allChans = [(Channel m a b, ThreadId)]
chans [(Channel m a b, ThreadId)]
-> [(Channel m a b, ThreadId)] -> [(Channel m a b, ThreadId)]
forall a. [a] -> [a] -> [a]
++ [(Channel m a b, ThreadId)]
newChans

        -- Collect outputs from running channels
        ([(Channel m a b, ThreadId)]
running, [b]
outputs) <- IORef ([OutEvent b], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [b])
forall {m :: * -> *} {a} {a} {b}.
MonadIO m =>
IORef ([OutEvent a], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [a])
collectOutputs IORef ([OutEvent b], Int)
q [(Channel m a b, ThreadId)]
allChans

        -- Send input to running folds
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
sstep (State StreamK m a -> State StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
next <- case Step s a
res of
            Yield a
x s
s -> do
                -- XXX The blocking will delay the processing of outputs.
                -- Should we yield the outputs before blocking?
                (Channel m a b -> m ()) -> [Channel m a b] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
Prelude.mapM_ (Channel m a b -> a -> m ()
forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m ()
`sendToWorker_` a
x) (((Channel m a b, ThreadId) -> Channel m a b)
-> [(Channel m a b, ThreadId)] -> [Channel m a b]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst [(Channel m a b, ThreadId)]
running)
                ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ScanState
   s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
 -> m (ScanState
         s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)))
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a b. (a -> b) -> a -> b
$ s
-> IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. s -> q -> db -> [f] -> ScanState s q db f
ScanGo s
s IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running
            Skip s
s -> do
                ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ScanState
   s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
 -> m (ScanState
         s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)))
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a b. (a -> b) -> a -> b
$ s
-> IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. s -> q -> db -> [f] -> ScanState s q db f
ScanGo s
s IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running
            Step s a
Stop -> do
                (Channel m a b -> m ()) -> [Channel m a b] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
Prelude.mapM_ Channel m a b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
finalize (((Channel m a b, ThreadId) -> Channel m a b)
-> [(Channel m a b, ThreadId)] -> [Channel m a b]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst [(Channel m a b, ThreadId)]
running)
                ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ScanState
   s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
 -> m (ScanState
         s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)))
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> m (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. q -> db -> [f] -> ScanState s q db f
ScanDrain IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running
        if [b] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [b]
outputs
        then Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. s -> Step s a
Skip ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
next
        else Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ [b]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. a -> s -> Step s a
Yield [b]
outputs ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
next
    step State StreamK m a
_ (ScanDrain IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
chans) = do
        ([(Channel m a b, ThreadId)]
running, [b]
outputs) <- IORef ([OutEvent b], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [b])
forall {m :: * -> *} {a} {a} {b}.
MonadIO m =>
IORef ([OutEvent a], Int)
-> [(Channel m a b, ThreadId)]
-> m ([(Channel m a b, ThreadId)], [a])
collectOutputs IORef ([OutEvent b], Int)
q [(Channel m a b, ThreadId)]
chans
        case [(Channel m a b, ThreadId)]
running of
            [] -> Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ [b]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. a -> s -> Step s a
Yield [b]
outputs ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. ScanState s q db f
ScanStop
            [(Channel m a b, ThreadId)]
_ -> do
                if [b] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [b]
outputs
                then do
                    IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
db
                    Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. s -> Step s a
Skip (IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. q -> db -> [f] -> ScanState s q db f
ScanDrain IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running)
                else Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (ScanState
      s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
   [b]
 -> m (Step
         (ScanState
            s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
         [b]))
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a b. (a -> b) -> a -> b
$ [b]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
-> Step
     (ScanState
        s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
     [b]
forall s a. a -> s -> Step s a
Yield [b]
outputs (IORef ([OutEvent b], Int)
-> MVar ()
-> [(Channel m a b, ThreadId)]
-> ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
forall s q db f. q -> db -> [f] -> ScanState s q db f
ScanDrain IORef ([OutEvent b], Int)
q MVar ()
db [(Channel m a b, ThreadId)]
running)
    step State StreamK m a
_ ScanState
  s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId)
ScanStop = Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
-> m (Step
        (ScanState
           s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
        [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step
  (ScanState
     s (IORef ([OutEvent b], Int)) (MVar ()) (Channel m a b, ThreadId))
  [b]
forall s a. Step s a
Stop

-- | A pure variant of 'parDistributeScanM' that uses a fixed list of scans.
--
-- The provided scans are started once and run concurrently for the duration
-- of the stream. Each input element is distributed to all active scans, and
-- their outputs are collected and emitted together.
--
-- Example:
--
-- >>> xs = [Scanl.take 5 Scanl.sum, Scanl.take 5 Scanl.length :: Scanl.Scanl IO Int Int]
-- >>> Stream.toList $ Scanl.parDistributeScan id xs (Stream.enumerateFromTo 1 10)
-- ...
{-# INLINE parDistributeScan #-}
parDistributeScan :: MonadAsync m =>
    (Config -> Config) -> [Scanl m a b] -> Stream m a -> Stream m [b]
parDistributeScan :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> [Scanl m a b] -> Stream m a -> Stream m [b]
parDistributeScan Config -> Config
cfg [Scanl m a b]
getFolds Stream m a
stream =
    m (Stream m [b]) -> Stream m [b]
forall (m :: * -> *) a. Monad m => m (Stream m a) -> Stream m a
Stream.concatEffect (m (Stream m [b]) -> Stream m [b])
-> m (Stream m [b]) -> Stream m [b]
forall a b. (a -> b) -> a -> b
$ do
        IORef [Scanl m a b]
ref <- IO (IORef [Scanl m a b]) -> m (IORef [Scanl m a b])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef [Scanl m a b]) -> m (IORef [Scanl m a b]))
-> IO (IORef [Scanl m a b]) -> m (IORef [Scanl m a b])
forall a b. (a -> b) -> a -> b
$ [Scanl m a b] -> IO (IORef [Scanl m a b])
forall a. a -> IO (IORef a)
newIORef [Scanl m a b]
getFolds
        let action :: m [Scanl m a b]
action = IO [Scanl m a b] -> m [Scanl m a b]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [Scanl m a b] -> m [Scanl m a b])
-> IO [Scanl m a b] -> m [Scanl m a b]
forall a b. (a -> b) -> a -> b
$ IORef [Scanl m a b]
-> ([Scanl m a b] -> ([Scanl m a b], [Scanl m a b]))
-> IO [Scanl m a b]
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef [Scanl m a b]
ref (\[Scanl m a b]
xs -> ([], [Scanl m a b]
xs))
        Stream m [b] -> m (Stream m [b])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream m [b] -> m (Stream m [b]))
-> Stream m [b] -> m (Stream m [b])
forall a b. (a -> b) -> a -> b
$ (Config -> Config) -> m [Scanl m a b] -> Stream m a -> Stream m [b]
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> m [Scanl m a b] -> Stream m a -> Stream m [b]
parDistributeScanM Config -> Config
cfg m [Scanl m a b]
action Stream m a
stream

{-# ANN type DemuxState Fuse #-}
data DemuxState s q db f =
      DemuxInit
    | DemuxGo s q db f
    | DemuxDrain q db f
    | DemuxStop

-- XXX We need to either (1) remember a key when done so that we do not add the
-- fold again because some inputs would be lost in between, or (2) have a
-- FoldYield constructor to yield repeatedly so that we can restart the
-- existing fold itself when it is done. But in that case we cannot change the
-- fold once it is started. Also the Map would keep on increasing in size as we
-- never delete a key. Whatever we do we should keep the non-concurrent fold as
-- well consistent with that.

-- | Evaluate a stream by routing each input to a scan determined by a key.
--
-- For each distinct key, the first input encountered triggers the creation
-- of a new scan (via the supplied key-to-scan function). This scan is then
-- added to the set of currently active scans. Subsequent inputs with the
-- same key are directed to the same scan.
--
-- If no scan can be created for a key, the input element is discarded.
--
-- When a constituent scan completes, its final output is emitted as part of
-- the composed output stream. The output of 'parDemuxScanM' is a stream of
-- key–value pairs, where each value is the output produced by the scan
-- corresponding to that key.
--
-- For a simpler interface, use 'parDemuxScan' if you do not need the full
-- flexibility of 'parDemuxScanM'.
--
-- Example:
--
-- >>> import qualified Data.Map.Strict as Map
-- >>> import Data.Maybe (fromJust)
-- >>> f1 = ("even", Scanl.take 5 Scanl.sum)
-- >>> f2 = ("odd",  Scanl.take 5 Scanl.sum)
-- >>> kv = Map.fromList [f1, f2]
-- >>> getScan k = return (fromJust $ Map.lookup k kv)
-- >>> getKey x = if even x then "even" else "odd"
-- >>> input = Stream.enumerateFromTo 1 10
-- >>> Stream.toList $ Scanl.parDemuxScanM id getKey getScan input
-- ...
--
{-# INLINE parDemuxScanM #-}
parDemuxScanM :: (MonadAsync m, Ord k) =>
       (Config -> Config)
    -> (a -> k)
    -> (k -> m (Scanl m a b))
    -> Stream m a
    -> Stream m [(k, b)]
parDemuxScanM :: forall (m :: * -> *) k a b.
(MonadAsync m, Ord k) =>
(Config -> Config)
-> (a -> k)
-> (k -> m (Scanl m a b))
-> Stream m a
-> Stream m [(k, b)]
parDemuxScanM Config -> Config
cfg a -> k
getKey k -> m (Scanl m a b)
getFold (Stream State StreamK m a -> s -> m (Step s a)
sstep s
state) =
    (State StreamK m [(k, b)]
 -> DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId))
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> Stream m [(k, b)]
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m [(k, b)]
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall {m :: * -> *} {a}.
State StreamK m a
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
step DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
forall s q db f. DemuxState s q db f
DemuxInit

    where

    -- XXX can be written as a fold
    processOutputs :: Map a (Channel m a b, ThreadId)
-> [OutEvent (a, b)]
-> [(a, b)]
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
processOutputs Map a (Channel m a b, ThreadId)
keyToChan [OutEvent (a, b)]
events [(a, b)]
done = do
        case [OutEvent (a, b)]
events of
            [] -> (Map a (Channel m a b, ThreadId), [(a, b)])
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Map a (Channel m a b, ThreadId)
keyToChan, [(a, b)]
done)
            (OutEvent (a, b)
x:[OutEvent (a, b)]
xs) ->
                case OutEvent (a, b)
x of
                    FoldException ThreadId
_tid SomeException
ex -> do
                        -- XXX report the fold that threw the exception
                        let chans :: [(Channel m a b, ThreadId)]
chans = ((a, (Channel m a b, ThreadId)) -> (Channel m a b, ThreadId))
-> [(a, (Channel m a b, ThreadId))] -> [(Channel m a b, ThreadId)]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (a, (Channel m a b, ThreadId)) -> (Channel m a b, ThreadId)
forall a b. (a, b) -> b
snd ([(a, (Channel m a b, ThreadId))] -> [(Channel m a b, ThreadId)])
-> [(a, (Channel m a b, ThreadId))] -> [(Channel m a b, ThreadId)]
forall a b. (a -> b) -> a -> b
$ Map a (Channel m a b, ThreadId) -> [(a, (Channel m a b, ThreadId))]
forall k a. Map k a -> [(k, a)]
Map.toList Map a (Channel m a b, ThreadId)
keyToChan
                        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ (ThreadId -> IO ()) -> [ThreadId] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ThreadId -> ThreadAbort -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
`throwTo` ThreadAbort
ThreadAbort) (((Channel m a b, ThreadId) -> ThreadId)
-> [(Channel m a b, ThreadId)] -> [ThreadId]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> ThreadId
forall a b. (a, b) -> b
snd [(Channel m a b, ThreadId)]
chans)
                        (Channel m a b -> m ()) -> [Channel m a b] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Channel m a b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
cleanup (((Channel m a b, ThreadId) -> Channel m a b)
-> [(Channel m a b, ThreadId)] -> [Channel m a b]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Channel m a b, ThreadId) -> Channel m a b
forall a b. (a, b) -> a
fst [(Channel m a b, ThreadId)]
chans)
                        IO (Map a (Channel m a b, ThreadId), [(a, b)])
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Map a (Channel m a b, ThreadId), [(a, b)])
 -> m (Map a (Channel m a b, ThreadId), [(a, b)]))
-> IO (Map a (Channel m a b, ThreadId), [(a, b)])
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
forall a b. (a -> b) -> a -> b
$ SomeException -> IO (Map a (Channel m a b, ThreadId), [(a, b)])
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM SomeException
ex
                    FoldDone ThreadId
_tid o :: (a, b)
o@(a
k, b
_) ->
                        let ch :: Map a (Channel m a b, ThreadId)
ch = a
-> Map a (Channel m a b, ThreadId)
-> Map a (Channel m a b, ThreadId)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete a
k Map a (Channel m a b, ThreadId)
keyToChan
                         in Map a (Channel m a b, ThreadId)
-> [OutEvent (a, b)]
-> [(a, b)]
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
processOutputs Map a (Channel m a b, ThreadId)
ch [OutEvent (a, b)]
xs ((a, b)
o(a, b) -> [(a, b)] -> [(a, b)]
forall a. a -> [a] -> [a]
:[(a, b)]
done)
                    FoldEOF ThreadId
tid ->
                        let chans :: [(a, (Channel m a b, ThreadId))]
chans = Map a (Channel m a b, ThreadId) -> [(a, (Channel m a b, ThreadId))]
forall k a. Map k a -> [(k, a)]
Map.toList Map a (Channel m a b, ThreadId)
keyToChan
                            ch :: [(a, (Channel m a b, ThreadId))]
ch = ((a, (Channel m a b, ThreadId)) -> Bool)
-> [(a, (Channel m a b, ThreadId))]
-> [(a, (Channel m a b, ThreadId))]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(a
_, (Channel m a b
_, ThreadId
t)) -> ThreadId
t ThreadId -> ThreadId -> Bool
forall a. Eq a => a -> a -> Bool
/= ThreadId
tid) [(a, (Channel m a b, ThreadId))]
chans
                         in Map a (Channel m a b, ThreadId)
-> [OutEvent (a, b)]
-> [(a, b)]
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
processOutputs ([(a, (Channel m a b, ThreadId))] -> Map a (Channel m a b, ThreadId)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [(a, (Channel m a b, ThreadId))]
ch) [OutEvent (a, b)]
xs [(a, b)]
done
                    FoldPartial (a, b)
b ->
                         Map a (Channel m a b, ThreadId)
-> [OutEvent (a, b)]
-> [(a, b)]
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
processOutputs Map a (Channel m a b, ThreadId)
keyToChan [OutEvent (a, b)]
xs ((a, b)
b(a, b) -> [(a, b)] -> [(a, b)]
forall a. a -> [a] -> [a]
:[(a, b)]
done)

    collectOutputs :: IORef ([OutEvent (a, b)], Int)
-> Map a (Channel m a b, ThreadId)
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
collectOutputs IORef ([OutEvent (a, b)], Int)
qref Map a (Channel m a b, ThreadId)
keyToChan = do
        ([OutEvent (a, b)]
_, Int
n) <- IO ([OutEvent (a, b)], Int) -> m ([OutEvent (a, b)], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent (a, b)], Int) -> m ([OutEvent (a, b)], Int))
-> IO ([OutEvent (a, b)], Int) -> m ([OutEvent (a, b)], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent (a, b)], Int) -> IO ([OutEvent (a, b)], Int)
forall a. IORef a -> IO a
readIORef IORef ([OutEvent (a, b)], Int)
qref
        if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
        then do
            [OutEvent (a, b)]
r <- (([OutEvent (a, b)], Int) -> [OutEvent (a, b)])
-> m ([OutEvent (a, b)], Int) -> m [OutEvent (a, b)]
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ([OutEvent (a, b)], Int) -> [OutEvent (a, b)]
forall a b. (a, b) -> a
fst (m ([OutEvent (a, b)], Int) -> m [OutEvent (a, b)])
-> m ([OutEvent (a, b)], Int) -> m [OutEvent (a, b)]
forall a b. (a -> b) -> a -> b
$ IO ([OutEvent (a, b)], Int) -> m ([OutEvent (a, b)], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([OutEvent (a, b)], Int) -> m ([OutEvent (a, b)], Int))
-> IO ([OutEvent (a, b)], Int) -> m ([OutEvent (a, b)], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent (a, b)], Int) -> IO ([OutEvent (a, b)], Int)
forall a. IORef ([a], Int) -> IO ([a], Int)
readOutputQBasic IORef ([OutEvent (a, b)], Int)
qref
            Map a (Channel m a b, ThreadId)
-> [OutEvent (a, b)]
-> [(a, b)]
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
forall {m :: * -> *} {a} {a} {b} {b}.
(MonadIO m, Ord a) =>
Map a (Channel m a b, ThreadId)
-> [OutEvent (a, b)]
-> [(a, b)]
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
processOutputs Map a (Channel m a b, ThreadId)
keyToChan [OutEvent (a, b)]
r []
        else (Map a (Channel m a b, ThreadId), [(a, b)])
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Map a (Channel m a b, ThreadId)
keyToChan, [])

    step :: State StreamK m a
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
step State StreamK m a
_ DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
DemuxInit = do
        IORef ([OutEvent (k, b)], Int)
q <- IO (IORef ([OutEvent (k, b)], Int))
-> m (IORef ([OutEvent (k, b)], Int))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ([OutEvent (k, b)], Int))
 -> m (IORef ([OutEvent (k, b)], Int)))
-> IO (IORef ([OutEvent (k, b)], Int))
-> m (IORef ([OutEvent (k, b)], Int))
forall a b. (a -> b) -> a -> b
$ ([OutEvent (k, b)], Int) -> IO (IORef ([OutEvent (k, b)], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
        MVar ()
db <- IO (MVar ()) -> m (MVar ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
        Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. s -> Step s a
Skip (s
-> IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. s -> q -> db -> f -> DemuxState s q db f
DemuxGo s
state IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
forall k a. Map k a
Map.empty)

    step State StreamK m a
gst (DemuxGo s
st IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan) = do
        -- Collect outputs from running channels
        (Map k (Channel m a (k, b), ThreadId)
keyToChan1, [(k, b)]
outputs) <- IORef ([OutEvent (k, b)], Int)
-> Map k (Channel m a (k, b), ThreadId)
-> m (Map k (Channel m a (k, b), ThreadId), [(k, b)])
forall {m :: * -> *} {a} {b} {a} {b}.
(MonadIO m, Ord a) =>
IORef ([OutEvent (a, b)], Int)
-> Map a (Channel m a b, ThreadId)
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
collectOutputs IORef ([OutEvent (k, b)], Int)
q Map k (Channel m a (k, b), ThreadId)
keyToChan

        -- Send input to the selected fold
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
sstep (State StreamK m a -> State StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st

        DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
next <- case Step s a
res of
            Yield a
x s
s -> do
                -- XXX If the fold for a particular key is done and we see that
                -- key again. If we have not yet collected the done event we
                -- cannot restart the fold because the previous key is already
                -- installed. Thererfore, restarting the fold for the same key
                -- fraught with races.
                let k :: k
k = a -> k
getKey a
x
                (Map k (Channel m a (k, b), ThreadId)
keyToChan2, Channel m a (k, b)
ch) <-
                    case k
-> Map k (Channel m a (k, b), ThreadId)
-> Maybe (Channel m a (k, b), ThreadId)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
k Map k (Channel m a (k, b), ThreadId)
keyToChan1 of
                        Maybe (Channel m a (k, b), ThreadId)
Nothing -> do
                            Scanl m a b
fld <- k -> m (Scanl m a b)
getFold k
k
                            r :: (Channel m a (k, b), ThreadId)
r@(Channel m a (k, b)
chan, ThreadId
_) <- IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> (Config -> Config)
-> Scanl m a (k, b)
-> m (Channel m a (k, b), ThreadId)
forall (m :: * -> *) b a.
MonadRunInIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Scanl m a b
-> m (Channel m a b, ThreadId)
newChannelWithScan IORef ([OutEvent (k, b)], Int)
q MVar ()
db Config -> Config
cfg ((b -> (k, b)) -> Scanl m a b -> Scanl m a (k, b)
forall a b. (a -> b) -> Scanl m a a -> Scanl m a b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (k
k,) Scanl m a b
fld)
                            (Map k (Channel m a (k, b), ThreadId), Channel m a (k, b))
-> m (Map k (Channel m a (k, b), ThreadId), Channel m a (k, b))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (k
-> (Channel m a (k, b), ThreadId)
-> Map k (Channel m a (k, b), ThreadId)
-> Map k (Channel m a (k, b), ThreadId)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
k (Channel m a (k, b), ThreadId)
r Map k (Channel m a (k, b), ThreadId)
keyToChan1, Channel m a (k, b)
chan)
                        Just (Channel m a (k, b)
chan, ThreadId
_) -> (Map k (Channel m a (k, b), ThreadId), Channel m a (k, b))
-> m (Map k (Channel m a (k, b), ThreadId), Channel m a (k, b))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Map k (Channel m a (k, b), ThreadId)
keyToChan1, Channel m a (k, b)
chan)
                Channel m a (k, b) -> a -> m ()
forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m ()
sendToWorker_ Channel m a (k, b)
ch a
x
                DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (DemuxState
   s
   (IORef ([OutEvent (k, b)], Int))
   (MVar ())
   (Map k (Channel m a (k, b), ThreadId))
 -> m (DemuxState
         s
         (IORef ([OutEvent (k, b)], Int))
         (MVar ())
         (Map k (Channel m a (k, b), ThreadId))))
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a b. (a -> b) -> a -> b
$ s
-> IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. s -> q -> db -> f -> DemuxState s q db f
DemuxGo s
s IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan2
            Skip s
s ->
                DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (DemuxState
   s
   (IORef ([OutEvent (k, b)], Int))
   (MVar ())
   (Map k (Channel m a (k, b), ThreadId))
 -> m (DemuxState
         s
         (IORef ([OutEvent (k, b)], Int))
         (MVar ())
         (Map k (Channel m a (k, b), ThreadId))))
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a b. (a -> b) -> a -> b
$ s
-> IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. s -> q -> db -> f -> DemuxState s q db f
DemuxGo s
s IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan1
            Step s a
Stop -> do
                let chans :: [Channel m a (k, b)]
chans = ((k, (Channel m a (k, b), ThreadId)) -> Channel m a (k, b))
-> [(k, (Channel m a (k, b), ThreadId))] -> [Channel m a (k, b)]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((Channel m a (k, b), ThreadId) -> Channel m a (k, b)
forall a b. (a, b) -> a
fst ((Channel m a (k, b), ThreadId) -> Channel m a (k, b))
-> ((k, (Channel m a (k, b), ThreadId))
    -> (Channel m a (k, b), ThreadId))
-> (k, (Channel m a (k, b), ThreadId))
-> Channel m a (k, b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (k, (Channel m a (k, b), ThreadId))
-> (Channel m a (k, b), ThreadId)
forall a b. (a, b) -> b
snd) ([(k, (Channel m a (k, b), ThreadId))] -> [Channel m a (k, b)])
-> [(k, (Channel m a (k, b), ThreadId))] -> [Channel m a (k, b)]
forall a b. (a -> b) -> a -> b
$ Map k (Channel m a (k, b), ThreadId)
-> [(k, (Channel m a (k, b), ThreadId))]
forall k a. Map k a -> [(k, a)]
Map.toList Map k (Channel m a (k, b), ThreadId)
keyToChan1
                (Channel m a (k, b) -> m ()) -> [Channel m a (k, b)] -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
Prelude.mapM_ Channel m a (k, b) -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m ()
finalize [Channel m a (k, b)]
chans
                DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (DemuxState
   s
   (IORef ([OutEvent (k, b)], Int))
   (MVar ())
   (Map k (Channel m a (k, b), ThreadId))
 -> m (DemuxState
         s
         (IORef ([OutEvent (k, b)], Int))
         (MVar ())
         (Map k (Channel m a (k, b), ThreadId))))
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> m (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
forall a b. (a -> b) -> a -> b
$ IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. q -> db -> f -> DemuxState s q db f
DemuxDrain IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan1
        if [(k, b)] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(k, b)]
outputs
        then Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. s -> Step s a
Skip DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
next
        else Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ [(k, b)]
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. a -> s -> Step s a
Yield [(k, b)]
outputs DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
next
    step State StreamK m a
_ (DemuxDrain IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan) = do
        (Map k (Channel m a (k, b), ThreadId)
keyToChan1, [(k, b)]
outputs) <- IORef ([OutEvent (k, b)], Int)
-> Map k (Channel m a (k, b), ThreadId)
-> m (Map k (Channel m a (k, b), ThreadId), [(k, b)])
forall {m :: * -> *} {a} {b} {a} {b}.
(MonadIO m, Ord a) =>
IORef ([OutEvent (a, b)], Int)
-> Map a (Channel m a b, ThreadId)
-> m (Map a (Channel m a b, ThreadId), [(a, b)])
collectOutputs IORef ([OutEvent (k, b)], Int)
q Map k (Channel m a (k, b), ThreadId)
keyToChan
        if Map k (Channel m a (k, b), ThreadId) -> Bool
forall k a. Map k a -> Bool
Map.null Map k (Channel m a (k, b), ThreadId)
keyToChan1
        -- XXX null outputs case
        then Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ [(k, b)]
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. a -> s -> Step s a
Yield [(k, b)]
outputs DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
forall s q db f. DemuxState s q db f
DemuxStop
        else do
            if [(k, b)] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(k, b)]
outputs
            then do
                IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
db
                Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. s -> Step s a
Skip (IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. q -> db -> f -> DemuxState s q db f
DemuxDrain IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan1)
            else Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step
   (DemuxState
      s
      (IORef ([OutEvent (k, b)], Int))
      (MVar ())
      (Map k (Channel m a (k, b), ThreadId)))
   [(k, b)]
 -> m (Step
         (DemuxState
            s
            (IORef ([OutEvent (k, b)], Int))
            (MVar ())
            (Map k (Channel m a (k, b), ThreadId)))
         [(k, b)]))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a b. (a -> b) -> a -> b
$ [(k, b)]
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
-> Step
     (DemuxState
        s
        (IORef ([OutEvent (k, b)], Int))
        (MVar ())
        (Map k (Channel m a (k, b), ThreadId)))
     [(k, b)]
forall s a. a -> s -> Step s a
Yield [(k, b)]
outputs (IORef ([OutEvent (k, b)], Int)
-> MVar ()
-> Map k (Channel m a (k, b), ThreadId)
-> DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId))
forall s q db f. q -> db -> f -> DemuxState s q db f
DemuxDrain IORef ([OutEvent (k, b)], Int)
q MVar ()
db Map k (Channel m a (k, b), ThreadId)
keyToChan1)
    step State StreamK m a
_ DemuxState
  s
  (IORef ([OutEvent (k, b)], Int))
  (MVar ())
  (Map k (Channel m a (k, b), ThreadId))
DemuxStop = Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
-> m (Step
        (DemuxState
           s
           (IORef ([OutEvent (k, b)], Int))
           (MVar ())
           (Map k (Channel m a (k, b), ThreadId)))
        [(k, b)])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step
  (DemuxState
     s
     (IORef ([OutEvent (k, b)], Int))
     (MVar ())
     (Map k (Channel m a (k, b), ThreadId)))
  [(k, b)]
forall s a. Step s a
Stop

-- | A pure variant of 'parDemuxScanM' where the key-to-scan mapping is
-- static and does not require monadic effects.
--
-- Each distinct key is deterministically mapped to a scan using the provided
-- function. The behavior is otherwise the same as 'parDemuxScanM'.
{-# INLINE parDemuxScan #-}
parDemuxScan :: (MonadAsync m, Ord k) =>
       (Config -> Config)
    -> (a -> k)
    -> (k -> Scanl m a b)
    -> Stream m a
    -> Stream m [(k, b)]
parDemuxScan :: forall (m :: * -> *) k a b.
(MonadAsync m, Ord k) =>
(Config -> Config)
-> (a -> k)
-> (k -> Scanl m a b)
-> Stream m a
-> Stream m [(k, b)]
parDemuxScan Config -> Config
cfg a -> k
getKey k -> Scanl m a b
getFold = (Config -> Config)
-> (a -> k)
-> (k -> m (Scanl m a b))
-> Stream m a
-> Stream m [(k, b)]
forall (m :: * -> *) k a b.
(MonadAsync m, Ord k) =>
(Config -> Config)
-> (a -> k)
-> (k -> m (Scanl m a b))
-> Stream m a
-> Stream m [(k, b)]
parDemuxScanM Config -> Config
cfg a -> k
getKey (Scanl m a b -> m (Scanl m a b)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Scanl m a b -> m (Scanl m a b))
-> (k -> Scanl m a b) -> k -> m (Scanl m a b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. k -> Scanl m a b
getFold)