-- | A small 'Channel' abstraction over 'STM' containers, together with helpers
-- that feed or drain a channel in an endless loop and combinators that run a
-- producer and a consumer concurrently on a freshly created channel.
module Control.Concurrent.Channel.Endless(
Channel (..),
evalWriteChannel,
consumeChannel,
feedChannel,
evalFeedChannel,
pipeline,
pipeline_
) where
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import Control.DeepSeq
import Control.Monad
-- | A container that can be used as a one-directional channel inside 'STM'.
--
-- Both operations are ordinary 'STM' actions; whether and when they block
-- (retry) is decided by the individual instance.
class Channel c where
    -- | Remove the next value from the channel and return it.
    readChannel :: c a -> STM a
    -- | Hand a value over to the channel.
    writeChannel :: c a -> a -> STM ()
-- | A bounded queue used as a channel.
--
-- 'readChannel' is 'readTBQueue' (retries while the queue is empty) and
-- 'writeChannel' is 'writeTBQueue' (retries while the queue is full).
instance Channel TBQueue where
    readChannel = readTBQueue
    writeChannel = writeTBQueue
-- | A 'TMVar' used as a single-slot channel.
--
-- 'readChannel' takes the value and leaves the variable empty (retrying while
-- it is empty). 'writeChannel' retries while the slot is still occupied, so a
-- value that has not been read yet is never overwritten.
instance Channel TMVar where
    readChannel = takeTMVar
    -- 'putTMVar' rather than 'writeTMVar': the latter replaces a value that is
    -- still waiting to be read, which would silently drop items whenever the
    -- writer outpaces the reader.
    writeChannel = putTMVar
-- | Fully evaluate a value, then write it to the channel.
--
-- The value is forced to normal form in the calling thread /before/ the
-- write transaction runs, so any exception hidden inside the value is raised
-- here and the channel only ever receives fully evaluated data.
evalWriteChannel :: (Channel c, NFData a) => c a -> a -> IO ()
evalWriteChannel c v = do
    v' <- evaluate $ force v
    atomically $ writeChannel c v'
-- | Drain a channel forever, passing every value read to the given action.
--
-- Each iteration performs one atomic 'readChannel' followed by the action.
-- The loop has no exit condition of its own; it only stops when an exception
-- is raised in the running thread.
consumeChannel :: (Channel c) => c a -> (a -> IO ()) -> IO ()
consumeChannel c f = forever $ atomically (readChannel c) >>= f
-- | Fill a channel forever with the results of repeatedly running an action.
--
-- Each iteration runs the action once and writes its result with one atomic
-- 'writeChannel'. The loop has no exit condition of its own; it only stops
-- when an exception is raised in the running thread.
feedChannel :: (Channel c) => c a -> IO a -> IO ()
feedChannel c f = forever $ f >>= atomically . writeChannel c
-- | Like 'feedChannel', but every produced value is forced to normal form
-- before it is written to the channel.
evalFeedChannel :: (Channel c, NFData a) => c a -> IO a -> IO ()
evalFeedChannel c f = feedChannel c f'
  where
    -- Run the producer, then deep-evaluate its result in the feeding thread.
    f' = f >>= evaluate . force
{-# INLINE evalFeedChannel #-}
-- | Create a channel and run a producer and a consumer on it concurrently.
--
-- The first argument allocates the shared value (typically a channel); the
-- same value is handed to both the producer and the consumer, which are run
-- with 'concurrently'. The results of both are returned as a pair once both
-- have finished.
pipeline :: IO c -> (c -> IO x) -> (c -> IO y) -> IO (x, y)
pipeline new producer consumer = do
    c <- new
    concurrently (producer c) (consumer c)
-- | Variant of 'pipeline' that discards the results of the producer and the
-- consumer.
pipeline_ :: IO c -> (c -> IO x) -> (c -> IO y) -> IO ()
pipeline_ n p c = void $ pipeline n p c
{-# INLINE pipeline_ #-}