module Multitasking.Workers
(
SinkTask,
startSink,
putSinkTask,
SourceTask,
startSource,
takeSourceTask,
drainSourceTask,
PipeTask,
startPipe,
putPipeTask,
takePipeTask,
drainPipeTask,
)
where
import Control.Monad (forever)
import Control.Monad.IO.Class
import Data.Void (Void)
import Multitasking.AsyncOperations
import Multitasking.Communication
import Multitasking.Core
import Multitasking.MonadSTM
-- | Handle returned by 'startSink': a worker task together with the
-- 'Sink' through which values are handed to it.
data SinkTask a = SinkTask
  { -- | The worker task. Its result type is 'Void'.
    task :: Task Void,
    -- | Sink used by 'putSinkTask' to submit values of type @a@.
    sink :: Sink a
  }
-- | Start a consuming worker.
--
-- Creates a fresh queue, then uses 'start' with the given coordinator to
-- launch a loop that forever pops one value from that queue and runs the
-- handler on it. The returned 'SinkTask' carries the started task and a
-- sink built from the same queue via 'queueSink'.
startSink :: (MonadIO m) => Coordinator -> (a -> IO ()) -> m (SinkTask a)
startSink coordinator f = do
  queueA <- liftIO newQueue
  -- The loop body is wrapped in 'forever', so it has no normal result;
  -- this is why the task is stored as @Task Void@.
  task <- start coordinator $ forever $ do
    a <- popQueue queueA
    f a
  pure $
    SinkTask
      { task,
        sink = queueSink queueA
      }
-- | Submit a value to a 'SinkTask' by calling 'putSink' on its sink.
putSinkTask :: (MonadSTM m) => SinkTask a -> a -> m ()
putSinkTask sinkTask = putSink sinkTask.sink
-- | Awaiting a 'SinkTask' delegates to awaiting its underlying task.
-- The payload type is 'Void'.
instance Await (SinkTask a) where
  type Payload _ = Void
  await sinkTask = await sinkTask.task
-- | Handle returned by 'startSource': a worker task together with the
-- 'Source' from which its produced values are read.
data SourceTask a = SourceTask
  { -- | The worker task. Its result type is 'Void'.
    task :: Task Void,
    -- | Source read by 'takeSourceTask' and 'drainSourceTask'.
    source :: Source a
  }
-- | Start a producing worker.
--
-- Creates a fresh queue, then uses 'start' with the given coordinator to
-- launch a loop that forever runs the supplied action and puts its result
-- on that queue. The returned 'SourceTask' carries the started task and a
-- source built from the same queue via 'queueSource'.
startSource :: (MonadIO m) => Coordinator -> IO a -> m (SourceTask a)
startSource coordinator f = liftIO $ do
  queueA <- newQueue
  -- The loop body is wrapped in 'forever', so it has no normal result;
  -- this is why the task is stored as @Task Void@.
  task <- start coordinator $ forever $ do
    a <- f
    putQueue queueA a
  pure $
    SourceTask
      { task,
        source = queueSource queueA
      }
-- | Read one value from a 'SourceTask' by calling 'takeSource' on its
-- source.
takeSourceTask :: (MonadSTM m) => SourceTask a -> m a
takeSourceTask sourceTask = takeSource sourceTask.source
-- | Call 'drainSource' on the source of a 'SourceTask'.
--
-- NOTE(review): the meaning of the 'Nothing' result is defined by
-- 'drainSource' (presumably "no value currently available") — confirm
-- against its documentation.
drainSourceTask :: (MonadSTM m) => SourceTask a -> m (Maybe a)
drainSourceTask sourceTask = drainSource sourceTask.source
-- | Awaiting a 'SourceTask' delegates to awaiting its underlying task.
-- The payload type is 'Void'.
instance Await (SourceTask a) where
  type Payload _ = Void
  await sourceTask = await sourceTask.task
-- | Handle returned by 'startPipe': a worker task together with the
-- 'Sink' for its inputs and the 'Source' for its outputs.
data PipeTask a b = PipeTask
  { -- | The worker task. Its result type is 'Void'.
    task :: Task Void,
    -- | Sink used by 'putPipeTask' to submit inputs of type @a@.
    sink :: Sink a,
    -- | Source read by 'takePipeTask' and 'drainPipeTask' for outputs of
    -- type @b@.
    source :: Source b
  }
-- | Start a transforming worker.
--
-- Creates an input queue and an output queue, then uses 'start' with the
-- given coordinator to launch a loop that forever pops one value from the
-- input queue, runs the supplied function on it, and puts the result on
-- the output queue. The returned 'PipeTask' carries the started task, a
-- sink built from the input queue via 'queueSink', and a source built from
-- the output queue via 'queueSource'.
startPipe :: (MonadIO m) => Coordinator -> (a -> IO b) -> m (PipeTask a b)
startPipe coordinator f = liftIO $ do
  queueA <- newQueue
  queueB <- newQueue
  -- The loop body is wrapped in 'forever', so it has no normal result;
  -- this is why the task is stored as @Task Void@.
  task <- start coordinator $ forever $ do
    a <- popQueue queueA
    b <- f a
    putQueue queueB b
  pure $
    PipeTask
      { task,
        sink = queueSink queueA,
        source = queueSource queueB
      }
-- | Submit an input to a 'PipeTask' by calling 'putSink' on its sink.
putPipeTask :: (MonadSTM m) => PipeTask a b -> a -> m ()
putPipeTask pipeTask = putSink pipeTask.sink
-- | Read one output from a 'PipeTask' by calling 'takeSource' on its
-- source.
takePipeTask :: (MonadSTM m) => PipeTask a b -> m b
takePipeTask pipeTask = takeSource pipeTask.source
-- | Call 'drainSource' on the source of a 'PipeTask'.
--
-- NOTE(review): the meaning of the 'Nothing' result is defined by
-- 'drainSource' (presumably "no output currently available") — confirm
-- against its documentation.
drainPipeTask :: (MonadSTM m) => PipeTask a b -> m (Maybe b)
drainPipeTask pipeTask = drainSource pipeTask.source
-- | Awaiting a 'PipeTask' delegates to awaiting its underlying task.
-- The payload type is 'Void'.
instance Await (PipeTask a b) where
  type Payload _ = Void
  await pipeTask = await pipeTask.task