-- | Tools for connecting communicating sequential processes (threads)
-- with closeable channels.
module Control.Concurrent.Channel(
    Channel (..),
    writeChannel',
    evalWriteChannel,
    evalWriteChannel',
    consumeChannel,
    stateConsumeChannel,
    feedChannel,
    evalFeedChannel,
    pipeline,
    pipeline_
) where

import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.TBCQueue
import Control.Exception
import Control.DeepSeq
import Control.Monad

-- | Any closeable STM-based pipe for inter-thread communication.
class Channel c where
    -- | Returns @Just value@ until the channel is closed, blocking for the next value.
    readChannel :: c a -> STM (Maybe a)

    -- | Writes the given value to the channel if it's still open.
    --
    -- Returns whether the channel is still open so producers know when consumers no longer care.
    writeChannel :: c a -> a -> STM Bool

    -- | Closes the channel so that future writes are no-ops and readers get @Nothing@
    -- once all previously-written values are read.
    closeChannel :: c a -> STM ()

    isClosedChannel :: c a -> STM Bool

instance Channel TBCQueue where
    readChannel :: forall a. TBCQueue a -> STM (Maybe a)
readChannel = TBCQueue a -> STM (Maybe a)
forall a. TBCQueue a -> STM (Maybe a)
readTBCQueue

    writeChannel :: forall a. TBCQueue a -> a -> STM Bool
writeChannel = TBCQueue a -> a -> STM Bool
forall a. TBCQueue a -> a -> STM Bool
writeTBCQueue

    closeChannel :: forall a. TBCQueue a -> STM ()
closeChannel = TBCQueue a -> STM ()
forall a. TBCQueue a -> STM ()
closeTBCQueue

    isClosedChannel :: forall a. TBCQueue a -> STM Bool
isClosedChannel = TBCQueue a -> STM Bool
forall a. TBCQueue a -> STM Bool
isClosedTBCQueue

-- More to follow? A TMVar-like closeable slot?

-- | Writes to the channel, asserting that it hasn't been closed.
--
-- Useful in situations where only the writer closes the channel.
writeChannel' :: (Channel c) => c a -> a -> STM ()
writeChannel' :: forall (c :: * -> *) a. Channel c => c a -> a -> STM ()
writeChannel' c a
c a
v = do
    Bool
o <- c a -> a -> STM Bool
forall a. c a -> a -> STM Bool
forall (c :: * -> *) a. Channel c => c a -> a -> STM Bool
writeChannel c a
c a
v
    Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
o (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ [Char] -> STM ()
forall a. HasCallStack => [Char] -> a
error [Char]
"absurd: writeChannel' on closed channel"

-- | Force a value to normal form before writing it to the given channel.
--
-- One of the goals of CSP is to divide work into independent tasks.
-- Forcing values before passing them to the next actor can improve performance—it
-- keeps the last one in the chain from doing more than its share of evaluation.
evalWriteChannel :: (Channel c, NFData a) => c a -> a -> IO Bool
evalWriteChannel :: forall (c :: * -> *) a.
(Channel c, NFData a) =>
c a -> a -> IO Bool
evalWriteChannel c a
c a
v = do
    a
v' <- a -> IO a
forall a. a -> IO a
evaluate (a -> IO a) -> a -> IO a
forall a b. (a -> b) -> a -> b
$ a -> a
forall a. NFData a => a -> a
force a
v
    STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ c a -> a -> STM Bool
forall a. c a -> a -> STM Bool
forall (c :: * -> *) a. Channel c => c a -> a -> STM Bool
writeChannel c a
c a
v'

-- | `writeChannel'` meets `evalWriteChannel`
evalWriteChannel' :: (Channel c, NFData a) => c a -> a -> IO ()
evalWriteChannel' :: forall (c :: * -> *) a. (Channel c, NFData a) => c a -> a -> IO ()
evalWriteChannel' c a
c a
v = do
    a
v' <- a -> IO a
forall a. a -> IO a
evaluate (a -> IO a) -> a -> IO a
forall a b. (a -> b) -> a -> b
$ a -> a
forall a. NFData a => a -> a
force a
v
    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ c a -> a -> STM ()
forall (c :: * -> *) a. Channel c => c a -> a -> STM ()
writeChannel' c a
c a
v'

-- | Consume the given channel until it closes,
-- passing values to the given action and collecting its results.
consumeChannel :: (Channel c, Monoid m) => c a -> (a -> IO m) -> IO m
consumeChannel :: forall (c :: * -> *) m a.
(Channel c, Monoid m) =>
c a -> (a -> IO m) -> IO m
consumeChannel c a
c a -> IO m
f = STM (Maybe a) -> IO (Maybe a)
forall a. STM a -> IO a
atomically (c a -> STM (Maybe a)
forall a. c a -> STM (Maybe a)
forall (c :: * -> *) a. Channel c => c a -> STM (Maybe a)
readChannel c a
c) IO (Maybe a) -> (Maybe a -> IO m) -> IO m
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Just a
v -> do
        m
res <- a -> IO m
f a
v
        m -> m -> m
forall a. Monoid a => a -> a -> a
mappend m
res (m -> m) -> IO m -> IO m
forall (m :: * -> *) a b. Monad m => (a -> b) -> m a -> m b
<$!> c a -> (a -> IO m) -> IO m
forall (c :: * -> *) m a.
(Channel c, Monoid m) =>
c a -> (a -> IO m) -> IO m
consumeChannel c a
c a -> IO m
f
    Maybe a
Nothing -> m -> IO m
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure m
forall a. Monoid a => a
mempty

-- | Consume the given channel until it closes,
-- with each action updating some state. Returns the final state.
stateConsumeChannel :: (Channel c) => c a -> s -> (s -> a -> IO s) -> IO s
stateConsumeChannel :: forall (c :: * -> *) a s.
Channel c =>
c a -> s -> (s -> a -> IO s) -> IO s
stateConsumeChannel c a
c !s
state s -> a -> IO s
f = STM (Maybe a) -> IO (Maybe a)
forall a. STM a -> IO a
atomically (c a -> STM (Maybe a)
forall a. c a -> STM (Maybe a)
forall (c :: * -> *) a. Channel c => c a -> STM (Maybe a)
readChannel c a
c) IO (Maybe a) -> (Maybe a -> IO s) -> IO s
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Just a
v -> do
        s
next <- s -> a -> IO s
f s
state a
v
        c a -> s -> (s -> a -> IO s) -> IO s
forall (c :: * -> *) a s.
Channel c =>
c a -> s -> (s -> a -> IO s) -> IO s
stateConsumeChannel c a
c s
next s -> a -> IO s
f
    Maybe a
Nothing -> s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
state

feedChannel' :: (Channel c) => c a -> IO (Maybe a) -> IO ()
feedChannel' :: forall (c :: * -> *) a. Channel c => c a -> IO (Maybe a) -> IO ()
feedChannel' c a
c IO (Maybe a)
f = IO (Maybe a)
f IO (Maybe a) -> (Maybe a -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Just a
v -> do
        Bool
stillOpen <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ c a -> a -> STM Bool
forall a. c a -> a -> STM Bool
forall (c :: * -> *) a. Channel c => c a -> a -> STM Bool
writeChannel c a
c a
v
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
stillOpen (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ c a -> IO (Maybe a) -> IO ()
forall (c :: * -> *) a. Channel c => c a -> IO (Maybe a) -> IO ()
feedChannel' c a
c IO (Maybe a)
f
    Maybe a
Nothing -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ c a -> STM ()
forall a. c a -> STM ()
forall (c :: * -> *) a. Channel c => c a -> STM ()
closeChannel c a
c

-- | Produce values with the given IO action,
-- feeding them into the given channel until it closes or the action returns @Nothing@.
feedChannel :: (Channel c) => c a -> IO (Maybe a) -> IO ()
feedChannel :: forall (c :: * -> *) a. Channel c => c a -> IO (Maybe a) -> IO ()
feedChannel c a
c IO (Maybe a)
f = do
    -- Protect against an f that could block for a long time (e.g., a network socket),
    -- even after the consumer hangs up.
    let chanClosed :: IO ()
chanClosed = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ c a -> STM Bool
forall a. c a -> STM Bool
forall (c :: * -> *) a. Channel c => c a -> STM Bool
isClosedChannel c a
c STM Bool -> (Bool -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> STM ()
check
    IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
race_ (c a -> IO (Maybe a) -> IO ()
forall (c :: * -> *) a. Channel c => c a -> IO (Maybe a) -> IO ()
feedChannel' c a
c IO (Maybe a)
f) IO ()
chanClosed

-- | `feedChannel`, but forces produced values.
--
-- See `evalWriteChannel` for the motivation.
evalFeedChannel :: (Channel c, NFData a) => c a -> IO (Maybe a) -> IO ()
evalFeedChannel :: forall (c :: * -> *) a.
(Channel c, NFData a) =>
c a -> IO (Maybe a) -> IO ()
evalFeedChannel c a
c IO (Maybe a)
f = c a -> IO (Maybe a) -> IO ()
forall (c :: * -> *) a. Channel c => c a -> IO (Maybe a) -> IO ()
feedChannel c a
c IO (Maybe a)
f' where
    f' :: IO (Maybe a)
f' = IO (Maybe a)
f IO (Maybe a) -> (Maybe a -> IO (Maybe a)) -> IO (Maybe a)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe a -> IO (Maybe a)
forall a. a -> IO a
evaluate (Maybe a -> IO (Maybe a))
-> (Maybe a -> Maybe a) -> Maybe a -> IO (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe a -> Maybe a
forall a. NFData a => a -> a
force
{-# INLINE evalFeedChannel #-}

-- | Create a channel with the given action,
-- then wire it to the given producer and consumer.
pipeline :: (Channel c) => IO (c a) -> (c a -> IO x) -> (c a -> IO y) -> IO (x, y)
pipeline :: forall (c :: * -> *) a x y.
Channel c =>
IO (c a) -> (c a -> IO x) -> (c a -> IO y) -> IO (x, y)
pipeline IO (c a)
new c a -> IO x
producer c a -> IO y
consumer = do
    c a
c <- IO (c a)
new
    let producer' :: IO x
producer' = c a -> IO x
producer c a
c IO x -> IO () -> IO x
forall a b. IO a -> IO b -> IO a
`finally` STM () -> IO ()
forall a. STM a -> IO a
atomically (c a -> STM ()
forall a. c a -> STM ()
forall (c :: * -> *) a. Channel c => c a -> STM ()
closeChannel c a
c)
    IO x -> IO y -> IO (x, y)
forall a b. IO a -> IO b -> IO (a, b)
concurrently IO x
producer' (c a -> IO y
consumer c a
c)

-- | `pipeline`, but ignore the results.
pipeline_ :: (Channel c) => IO (c a) -> (c a -> IO x) -> (c a -> IO y) -> IO ()
pipeline_ :: forall (c :: * -> *) a x y.
Channel c =>
IO (c a) -> (c a -> IO x) -> (c a -> IO y) -> IO ()
pipeline_ IO (c a)
n c a -> IO x
p c a -> IO y
c = IO (x, y) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (x, y) -> IO ()) -> IO (x, y) -> IO ()
forall a b. (a -> b) -> a -> b
$ IO (c a) -> (c a -> IO x) -> (c a -> IO y) -> IO (x, y)
forall (c :: * -> *) a x y.
Channel c =>
IO (c a) -> (c a -> IO x) -> (c a -> IO y) -> IO (x, y)
pipeline IO (c a)
n c a -> IO x
p c a -> IO y
c
{-# INLINE pipeline_ #-}