module Control.Concurrent.Channel(
Channel (..),
writeChannel',
evalWriteChannel,
evalWriteChannel',
consumeChannel,
stateConsumeChannel,
feedChannel,
evalFeedChannel,
pipeline,
pipeline_
) where
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.TBCQueue
import Control.Exception
import Control.DeepSeq
import Control.Monad
class Channel c where
readChannel :: c a -> STM (Maybe a)
writeChannel :: c a -> a -> STM Bool
closeChannel :: c a -> STM ()
isClosedChannel :: c a -> STM Bool
instance Channel TBCQueue where
readChannel :: forall a. TBCQueue a -> STM (Maybe a)
readChannel = TBCQueue a -> STM (Maybe a)
forall a. TBCQueue a -> STM (Maybe a)
readTBCQueue
writeChannel :: forall a. TBCQueue a -> a -> STM Bool
writeChannel = TBCQueue a -> a -> STM Bool
forall a. TBCQueue a -> a -> STM Bool
writeTBCQueue
closeChannel :: forall a. TBCQueue a -> STM ()
closeChannel = TBCQueue a -> STM ()
forall a. TBCQueue a -> STM ()
closeTBCQueue
isClosedChannel :: forall a. TBCQueue a -> STM Bool
isClosedChannel = TBCQueue a -> STM Bool
forall a. TBCQueue a -> STM Bool
isClosedTBCQueue
writeChannel' :: (Channel c) => c a -> a -> STM ()
writeChannel' :: forall (c :: * -> *) a. Channel c => c a -> a -> STM ()
writeChannel' c a
c a
v = do
Bool
o <- c a -> a -> STM Bool
forall a. c a -> a -> STM Bool
forall (c :: * -> *) a. Channel c => c a -> a -> STM Bool
writeChannel c a
c a
v
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
o (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ [Char] -> STM ()
forall a. HasCallStack => [Char] -> a
error [Char]
"absurd: writeChannel' on closed channel"
evalWriteChannel :: (Channel c, NFData a) => c a -> a -> IO Bool
evalWriteChannel :: forall (c :: * -> *) a.
(Channel c, NFData a) =>
c a -> a -> IO Bool
evalWriteChannel c a
c a
v = do
a
v' <- a -> IO a
forall a. a -> IO a
evaluate (a -> IO a) -> a -> IO a
forall a b. (a -> b) -> a -> b
$ a -> a
forall a. NFData a => a -> a
force a
v
STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ c a -> a -> STM Bool
forall a. c a -> a -> STM Bool
forall (c :: * -> *) a. Channel c => c a -> a -> STM Bool
writeChannel c a
c a
v'
evalWriteChannel' :: (Channel c, NFData a) => c a -> a -> IO ()
evalWriteChannel' :: forall (c :: * -> *) a. (Channel c, NFData a) => c a -> a -> IO ()
evalWriteChannel' c a
c a
v = do
a
v' <- a -> IO a
forall a. a -> IO a
evaluate (a -> IO a) -> a -> IO a
forall a b. (a -> b) -> a -> b
$ a -> a
forall a. NFData a => a -> a
force a
v
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ c a -> a -> STM ()
forall (c :: * -> *) a. Channel c => c a -> a -> STM ()
writeChannel' c a
c a
v'
consumeChannel :: (Channel c, Monoid m) => c a -> (a -> IO m) -> IO m
consumeChannel :: forall (c :: * -> *) m a.
(Channel c, Monoid m) =>
c a -> (a -> IO m) -> IO m
consumeChannel c a
c a -> IO m
f = STM (Maybe a) -> IO (Maybe a)
forall a. STM a -> IO a
atomically (c a -> STM (Maybe a)
forall a. c a -> STM (Maybe a)
forall (c :: * -> *) a. Channel c => c a -> STM (Maybe a)
readChannel c a
c) IO (Maybe a) -> (Maybe a -> IO m) -> IO m
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just a
v -> do
m
res <- a -> IO m
f a
v
m -> m -> m
forall a. Monoid a => a -> a -> a
mappend m
res (m -> m) -> IO m -> IO m
forall (m :: * -> *) a b. Monad m => (a -> b) -> m a -> m b
<$!> c a -> (a -> IO m) -> IO m
forall (c :: * -> *) m a.
(Channel c, Monoid m) =>
c a -> (a -> IO m) -> IO m
consumeChannel c a
c a -> IO m
f
Maybe a
Nothing -> m -> IO m
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure m
forall a. Monoid a => a
mempty
stateConsumeChannel :: (Channel c) => c a -> s -> (s -> a -> IO s) -> IO s
stateConsumeChannel :: forall (c :: * -> *) a s.
Channel c =>
c a -> s -> (s -> a -> IO s) -> IO s
stateConsumeChannel c a
c !s
state s -> a -> IO s
f = STM (Maybe a) -> IO (Maybe a)
forall a. STM a -> IO a
atomically (c a -> STM (Maybe a)
forall a. c a -> STM (Maybe a)
forall (c :: * -> *) a. Channel c => c a -> STM (Maybe a)
readChannel c a
c) IO (Maybe a) -> (Maybe a -> IO s) -> IO s
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just a
v -> do
s
next <- s -> a -> IO s
f s
state a
v
c a -> s -> (s -> a -> IO s) -> IO s
forall (c :: * -> *) a s.
Channel c =>
c a -> s -> (s -> a -> IO s) -> IO s
stateConsumeChannel c a
c s
next s -> a -> IO s
f
Maybe a
Nothing -> s -> IO s
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure s
state
feedChannel' :: (Channel c) => c a -> IO (Maybe a) -> IO ()
feedChannel' :: forall (c :: * -> *) a. Channel c => c a -> IO (Maybe a) -> IO ()
feedChannel' c a
c IO (Maybe a)
f = IO (Maybe a)
f IO (Maybe a) -> (Maybe a -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just a
v -> do
Bool
stillOpen <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ c a -> a -> STM Bool
forall a. c a -> a -> STM Bool
forall (c :: * -> *) a. Channel c => c a -> a -> STM Bool
writeChannel c a
c a
v
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
stillOpen (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ c a -> IO (Maybe a) -> IO ()
forall (c :: * -> *) a. Channel c => c a -> IO (Maybe a) -> IO ()
feedChannel' c a
c IO (Maybe a)
f
Maybe a
Nothing -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ c a -> STM ()
forall a. c a -> STM ()
forall (c :: * -> *) a. Channel c => c a -> STM ()
closeChannel c a
c
feedChannel :: (Channel c) => c a -> IO (Maybe a) -> IO ()
feedChannel :: forall (c :: * -> *) a. Channel c => c a -> IO (Maybe a) -> IO ()
feedChannel c a
c IO (Maybe a)
f = do
let chanClosed :: IO ()
chanClosed = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ c a -> STM Bool
forall a. c a -> STM Bool
forall (c :: * -> *) a. Channel c => c a -> STM Bool
isClosedChannel c a
c STM Bool -> (Bool -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> STM ()
check
IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
race_ (c a -> IO (Maybe a) -> IO ()
forall (c :: * -> *) a. Channel c => c a -> IO (Maybe a) -> IO ()
feedChannel' c a
c IO (Maybe a)
f) IO ()
chanClosed
evalFeedChannel :: (Channel c, NFData a) => c a -> IO (Maybe a) -> IO ()
evalFeedChannel :: forall (c :: * -> *) a.
(Channel c, NFData a) =>
c a -> IO (Maybe a) -> IO ()
evalFeedChannel c a
c IO (Maybe a)
f = c a -> IO (Maybe a) -> IO ()
forall (c :: * -> *) a. Channel c => c a -> IO (Maybe a) -> IO ()
feedChannel c a
c IO (Maybe a)
f' where
f' :: IO (Maybe a)
f' = IO (Maybe a)
f IO (Maybe a) -> (Maybe a -> IO (Maybe a)) -> IO (Maybe a)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe a -> IO (Maybe a)
forall a. a -> IO a
evaluate (Maybe a -> IO (Maybe a))
-> (Maybe a -> Maybe a) -> Maybe a -> IO (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe a -> Maybe a
forall a. NFData a => a -> a
force
{-# INLINE evalFeedChannel #-}
pipeline :: (Channel c) => IO (c a) -> (c a -> IO x) -> (c a -> IO y) -> IO (x, y)
pipeline :: forall (c :: * -> *) a x y.
Channel c =>
IO (c a) -> (c a -> IO x) -> (c a -> IO y) -> IO (x, y)
pipeline IO (c a)
new c a -> IO x
producer c a -> IO y
consumer = do
c a
c <- IO (c a)
new
let producer' :: IO x
producer' = c a -> IO x
producer c a
c IO x -> IO () -> IO x
forall a b. IO a -> IO b -> IO a
`finally` STM () -> IO ()
forall a. STM a -> IO a
atomically (c a -> STM ()
forall a. c a -> STM ()
forall (c :: * -> *) a. Channel c => c a -> STM ()
closeChannel c a
c)
IO x -> IO y -> IO (x, y)
forall a b. IO a -> IO b -> IO (a, b)
concurrently IO x
producer' (c a -> IO y
consumer c a
c)
pipeline_ :: (Channel c) => IO (c a) -> (c a -> IO x) -> (c a -> IO y) -> IO ()
pipeline_ :: forall (c :: * -> *) a x y.
Channel c =>
IO (c a) -> (c a -> IO x) -> (c a -> IO y) -> IO ()
pipeline_ IO (c a)
n c a -> IO x
p c a -> IO y
c = IO (x, y) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (x, y) -> IO ()) -> IO (x, y) -> IO ()
forall a b. (a -> b) -> a -> b
$ IO (c a) -> (c a -> IO x) -> (c a -> IO y) -> IO (x, y)
forall (c :: * -> *) a x y.
Channel c =>
IO (c a) -> (c a -> IO x) -> (c a -> IO y) -> IO (x, y)
pipeline IO (c a)
n c a -> IO x
p c a -> IO y
c
{-# INLINE pipeline_ #-}