module Multitasking.Workers
  ( -- ** Sink Task
    SinkTask,
    startSink,
    putSinkTask,

    -- ** Source Task
    SourceTask,
    startSource,
    takeSourceTask,
    drainSourceTask,

    -- ** Pipe Task
    PipeTask,
    startPipe,
    putPipeTask,
    takePipeTask,
    drainPipeTask,
  )
where

import Control.Monad (forever)
import Control.Monad.IO.Class
import Data.Void (Void)
import Multitasking.AsyncOperations
import Multitasking.Communication
import Multitasking.Core
import Multitasking.MonadSTM

-- | A 'SinkTask' continuously consumes values of type @a@.
-- Multiple values can queue up if 'putSinkTask' is called faster than the
-- worker consumes them.
data SinkTask a = SinkTask
  { -- | The background worker task that processes the queued values.
    task :: Task Void,
    -- | The input end through which values are handed to the worker.
    sink :: Sink a
  }

-- | Start a 'SinkTask'.
--
-- A fresh queue is created and a worker is started on the given
-- 'Coordinator'. The worker repeatedly pops one value from the queue and
-- applies the given function to it; it will do so until it is canceled.
-- The returned 'SinkTask' exposes the queue's input end as its sink.
startSink :: (MonadIO m) => Coordinator -> (a -> IO ()) -> m (SinkTask a)
startSink coordinator f = do
  queueA <- liftIO newQueue
  -- The worker loop: take the next queued value and hand it to @f@.
  task <- start coordinator $ forever (popQueue queueA >>= f)

  pure $
    SinkTask
      { task,
        sink = queueSink queueA
      }

-- | Feed a new value to the sink task by putting it into the task's 'Sink'.
putSinkTask :: (MonadSTM m) => SinkTask a -> a -> m ()
putSinkTask sinkTask = putSink sinkTask.sink

-- | Awaiting a 'SinkTask' awaits its underlying worker 'Task'.
-- The payload is 'Void', matching the worker's @Task Void@ type.
instance Await (SinkTask a) where
  type Payload _ = Void
  await sinkTask = await sinkTask.task

-- | A 'SourceTask' is a task which continuously produces values of type @a@.
-- Multiple values can queue up to be consumed with 'takeSourceTask'
-- (or 'drainSourceTask').
data SourceTask a = SourceTask
  { -- | The background worker task that produces the values.
    task :: Task Void,
    -- | The output end from which produced values are read.
    source :: Source a
  }

-- | Start a 'SourceTask' with the given action.
--
-- A fresh queue is created and a worker is started on the given
-- 'Coordinator'. The worker runs the action forever, putting each produced
-- value into the queue. The returned 'SourceTask' exposes the queue's output
-- end as its source.
startSource :: (MonadIO m) => Coordinator -> IO a -> m (SourceTask a)
startSource coordinator f = liftIO $ do
  queueA <- newQueue
  -- The worker loop: produce the next value and enqueue it.
  task <- start coordinator $ forever (f >>= putQueue queueA)

  pure $
    SourceTask
      { task,
        source = queueSource queueA
      }

-- | Take a value from the 'SourceTask', waiting if there is no output.
-- Delegates to 'takeSource' on the task's 'Source'.
takeSourceTask :: (MonadSTM m) => SourceTask a -> m a
takeSourceTask sourceTask = takeSource sourceTask.source

-- | Get a value from the 'SourceTask'. Returns immediately with 'Nothing'
-- if there is no output. Delegates to 'drainSource' on the task's 'Source'.
drainSourceTask :: (MonadSTM m) => SourceTask a -> m (Maybe a)
drainSourceTask sourceTask = drainSource sourceTask.source

-- | Awaiting a 'SourceTask' awaits its underlying worker 'Task'.
-- The payload is 'Void', matching the worker's @Task Void@ type.
instance Await (SourceTask a) where
  type Payload _ = Void
  await sourceTask = await sourceTask.task

-- | A 'PipeTask' transforms values of type @a@ to values of type @b@.
-- Both ends can queue up.
data PipeTask a b = PipeTask
  { -- | The background worker task that performs the transformation.
    task :: Task Void,
    -- | The input end through which values of type @a@ are fed in.
    sink :: Sink a,
    -- | The output end from which results of type @b@ are read.
    source :: Source b
  }

-- | Start a 'PipeTask' with the given function. It will be executed
-- continuously.
--
-- Two fresh queues are created, one for inputs and one for outputs, and a
-- worker is started on the given 'Coordinator'. The worker repeatedly pops a
-- value from the input queue, applies the function, and puts the result into
-- the output queue.
startPipe :: (MonadIO m) => Coordinator -> (a -> IO b) -> m (PipeTask a b)
startPipe coordinator f = liftIO $ do
  queueA <- newQueue
  queueB <- newQueue
  -- The worker loop: input queue -> @f@ -> output queue.
  task <- start coordinator $ forever (popQueue queueA >>= f >>= putQueue queueB)

  pure
    PipeTask
      { task,
        sink = queueSink queueA,
        source = queueSource queueB
      }

-- | Feed a new value to the pipe task by putting it into the task's 'Sink'.
putPipeTask :: (MonadSTM m) => PipeTask a b -> a -> m ()
putPipeTask pipeTask = putSink pipeTask.sink

-- | Get the next output from the pipe task. Waits if there is no output
-- available yet. Delegates to 'takeSource' on the task's 'Source'.
takePipeTask :: (MonadSTM m) => PipeTask a b -> m b
takePipeTask pipeTask = takeSource pipeTask.source

-- | Get the next output from the pipe task. Does not wait for output.
-- Delegates to 'drainSource' on the task's 'Source'; the result is a 'Maybe'.
drainPipeTask :: (MonadSTM m) => PipeTask a b -> m (Maybe b)
drainPipeTask pipeTask = drainSource pipeTask.source

-- | Awaiting a 'PipeTask' awaits its underlying worker 'Task'.
-- The payload is 'Void', matching the worker's @Task Void@ type.
instance Await (PipeTask a b) where
  type Payload _ = Void
  await pipeTask = await pipeTask.task