-- | Tools for connecting communicating sequential processes (threads)
-- with endless channels.
--
-- Many useful programs have no end to their input—they run in a steady state until killed.
-- This module provides the same utilities as "Control.Concurrent.Channel" (see those docs),
-- but for standard STM types we know and love.
module Control.Concurrent.Channel.Endless(
    Channel (..),
    evalWriteChannel,
    consumeChannel,
    feedChannel,
    evalFeedChannel,
    pipeline,
    pipeline_
) where

import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import Control.DeepSeq
import Control.Monad

-- | Container types that can act as a channel: values are read from and
-- written to them inside 'STM' transactions.
--
-- Whether an operation retries (blocks) when the channel is empty or full
-- is decided by each instance's underlying STM primitive.
class Channel c where
    -- | Read one value from the channel, as an 'STM' transaction.
    readChannel :: c a -> STM a

    -- | Write one value to the channel, as an 'STM' transaction.
    writeChannel :: c a -> a -> STM ()

-- | A bounded queue used as a channel.
--
-- Reads delegate to 'readTBQueue' and writes to 'writeTBQueue'; per the
-- @stm@ documentation these retry while the queue is empty or full
-- respectively.
instance Channel TBQueue where
    readChannel = readTBQueue
    writeChannel = writeTBQueue

-- | A 'TMVar' used as a one-slot channel.
--
-- Reads use 'takeTMVar', which retries while the slot is empty and leaves
-- it empty afterwards. Writes use 'putTMVar', which retries while the slot
-- is still occupied, so a value is never replaced before a reader has taken
-- it. (The similarly named 'writeTMVar' is a non-blocking write that
-- overwrites an occupied slot, which would silently drop unread values.)
instance Channel TMVar where
    readChannel = takeTMVar
    writeChannel = putTMVar

-- | Fully evaluate a value, then write it to the channel.
--
-- The value is reduced to normal form ('force' under 'evaluate') in the
-- calling thread before the write transaction runs, so the channel receives
-- an evaluated value rather than a thunk, and any exception raised while
-- evaluating it is thrown here, before anything is written.
evalWriteChannel :: (Channel c, NFData a) => c a -> a -> IO ()
evalWriteChannel c v = do
    v' <- evaluate $ force v
    atomically $ writeChannel c v'

-- | Repeatedly read a value from the channel and hand it to the given action.
--
-- Each read is its own 'atomically' transaction. The loop is built with
-- 'forever', so this only stops if the read or the action throws (or the
-- thread is killed).
consumeChannel :: (Channel c) => c a -> (a -> IO ()) -> IO ()
consumeChannel c f = forever $ atomically (readChannel c) >>= f

-- | Repeatedly run the given action and write each result to the channel.
--
-- Each write is its own 'atomically' transaction. The loop is built with
-- 'forever', so this only stops if the action or the write throws (or the
-- thread is killed). Results are written as produced, without being forced;
-- see 'evalFeedChannel' for the evaluating variant.
feedChannel :: (Channel c) => c a -> IO a -> IO ()
feedChannel c f = forever $ f >>= atomically . writeChannel c

-- | Like 'feedChannel', but each produced value is fully evaluated before
-- it is written.
--
-- The producing action is wrapped so that its result is reduced to normal
-- form ('force' under 'evaluate') in the feeding thread; the wrapped action
-- is then passed to 'feedChannel'.
evalFeedChannel :: (Channel c, NFData a) => c a -> IO a -> IO ()
evalFeedChannel c f = feedChannel c f' where
    -- The original action followed by a deep evaluation of its result.
    f' = f >>= evaluate . force
{-# INLINE evalFeedChannel #-}

-- There's not much to these when you don't ever close, but for completeness:

-- | Create a channel, then run a producer and a consumer on it concurrently.
--
-- The first argument builds the shared channel; the same channel value is
-- passed to both the producer and the consumer, which are run with
-- 'concurrently'. The result pairs the producer's result with the
-- consumer's.
pipeline :: IO c -> (c -> IO x) -> (c -> IO y) -> IO (x, y)
pipeline new producer consumer = do
    c <- new
    concurrently (producer c) (consumer c)

-- | Same as 'pipeline', but discards the producer's and consumer's results.
pipeline_ :: IO c -> (c -> IO x) -> (c -> IO y) -> IO ()
pipeline_ n p c = void $ pipeline n p c
{-# INLINE pipeline_ #-}