{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE QuantifiedConstraints #-}
{-# LANGUAGE UndecidableInstances #-}
module Data.Effect.Concurrent.Parallel where
#if ( __GLASGOW_HASKELL__ < 906 )
import Control.Applicative (liftA2)
#endif
import Control.Applicative (Alternative (empty, (<|>)))
import Control.Monad (forever)
import Data.Effect (Emb, UnliftIO)
import Data.Function (fix)
import Data.Tuple (swap)
import UnliftIO (
MonadIO,
MonadUnliftIO,
atomically,
liftIO,
mask,
newEmptyTMVarIO,
putTMVar,
readTMVar,
tryReadTMVar,
uninterruptibleMask_,
withRunInIO,
)
import UnliftIO.Concurrent (forkIO, killThread, threadDelay)
data Parallel :: Effect where
LiftP2
:: (a -> b -> c)
-> f a
-> f b
-> Parallel f c
data Halt :: Effect where
Halt :: Halt f a
data Race :: Effect where
Race :: f a -> f a -> Race f a
makeEffectF ''Halt
makeEffectsH [''Parallel, ''Race]
newtype Concurrently ff es a = Concurrently {forall (ff :: Effect) (es :: [Effect]) a.
Concurrently ff es a -> Eff ff es a
runConcurrently :: Eff ff es a}
deriving instance (Functor (Eff ff es)) => Functor (Concurrently ff es)
instance
(Parallel :> es, Applicative (Eff ff es), Free c ff)
=> Applicative (Concurrently ff es)
where
pure :: forall a. a -> Concurrently ff es a
pure = Eff ff es a -> Concurrently ff es a
forall (ff :: Effect) (es :: [Effect]) a.
Eff ff es a -> Concurrently ff es a
Concurrently (Eff ff es a -> Concurrently ff es a)
-> (a -> Eff ff es a) -> a -> Concurrently ff es a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Eff ff es a
forall a. a -> Eff ff es a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
{-# INLINE pure #-}
liftA2 :: forall a b c.
(a -> b -> c)
-> Concurrently ff es a
-> Concurrently ff es b
-> Concurrently ff es c
liftA2 a -> b -> c
f (Concurrently Eff ff es a
a) (Concurrently Eff ff es b
b) = Eff ff es c -> Concurrently ff es c
forall (ff :: Effect) (es :: [Effect]) a.
Eff ff es a -> Concurrently ff es a
Concurrently (Eff ff es c -> Concurrently ff es c)
-> Eff ff es c -> Concurrently ff es c
forall a b. (a -> b) -> a -> b
$ (a -> b -> c) -> Eff ff es a -> Eff ff es b -> Eff ff es c
forall a b c (f :: * -> *) (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Free c ff, f ~ Eff ff es, Parallel :> es) =>
(a -> b -> c) -> f a -> f b -> f c
liftP2 a -> b -> c
f Eff ff es a
a Eff ff es b
b
{-# INLINE liftA2 #-}
instance
(Race :> es, Halt :> es, Parallel :> es, Applicative (Eff ff es), Free c ff)
=> Alternative (Concurrently ff es)
where
empty :: forall a. Concurrently ff es a
empty = Eff ff es a -> Concurrently ff es a
forall (ff :: Effect) (es :: [Effect]) a.
Eff ff es a -> Concurrently ff es a
Concurrently Eff ff es a
forall a (f :: * -> *) (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Free c ff, f ~ Eff ff es, Halt :> es) =>
f a
halt
{-# INLINE empty #-}
(Concurrently Eff ff es a
a) <|> :: forall a.
Concurrently ff es a
-> Concurrently ff es a -> Concurrently ff es a
<|> (Concurrently Eff ff es a
b) = Eff ff es a -> Concurrently ff es a
forall (ff :: Effect) (es :: [Effect]) a.
Eff ff es a -> Concurrently ff es a
Concurrently (Eff ff es a -> Concurrently ff es a)
-> Eff ff es a -> Concurrently ff es a
forall a b. (a -> b) -> a -> b
$ Eff ff es a -> Eff ff es a -> Eff ff es a
forall a (f :: * -> *) (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Free c ff, f ~ Eff ff es, Race :> es) =>
f a -> f a -> f a
race Eff ff es a
a Eff ff es a
b
{-# INLINE (<|>) #-}
liftP3
:: forall a b c d es ff con
. (Parallel :> es, Free con ff)
=> (a -> b -> c -> d)
-> Eff ff es a
-> Eff ff es b
-> Eff ff es c
-> Eff ff es d
liftP3 :: forall a b c d (es :: [Effect]) (ff :: Effect)
(con :: (* -> *) -> Constraint).
(Parallel :> es, Free con ff) =>
(a -> b -> c -> d)
-> Eff ff es a -> Eff ff es b -> Eff ff es c -> Eff ff es d
liftP3 a -> b -> c -> d
f Eff ff es a
a Eff ff es b
b = ((c -> d) -> c -> d)
-> Eff ff es (c -> d) -> Eff ff es c -> Eff ff es d
forall a b c (f :: * -> *) (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Free c ff, f ~ Eff ff es, Parallel :> es) =>
(a -> b -> c) -> f a -> f b -> f c
liftP2 (c -> d) -> c -> d
forall a b. (a -> b) -> a -> b
($) ((a -> b -> c -> d)
-> Eff ff es a -> Eff ff es b -> Eff ff es (c -> d)
forall a b c (f :: * -> *) (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Free c ff, f ~ Eff ff es, Parallel :> es) =>
(a -> b -> c) -> f a -> f b -> f c
liftP2 a -> b -> c -> d
f Eff ff es a
a Eff ff es b
b)
{-# INLINE liftP3 #-}
data Poll :: Effect where
Poldl
:: (a -> Maybe b -> f (Either r a))
-> f a
-> f b
-> Poll f r
makeEffectH ''Poll
cancels
:: forall a b es ff c
. (Poll :> es, Applicative (Eff ff es), Free c ff)
=> Eff ff es a
-> Eff ff es b
-> Eff ff es (a, Maybe b)
cancels :: forall a b (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Poll :> es, Applicative (Eff ff es), Free c ff) =>
Eff ff es a -> Eff ff es b -> Eff ff es (a, Maybe b)
cancels = (a -> Maybe b -> Eff ff es (Either (a, Maybe b) a))
-> Eff ff es a -> Eff ff es b -> Eff ff es (a, Maybe b)
forall a b r (f :: * -> *) (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Free c ff, f ~ Eff ff es, Poll :> es) =>
(a -> Maybe b -> f (Either r a)) -> f a -> f b -> f r
poldl ((a -> Maybe b -> Eff ff es (Either (a, Maybe b) a))
-> Eff ff es a -> Eff ff es b -> Eff ff es (a, Maybe b))
-> (a -> Maybe b -> Eff ff es (Either (a, Maybe b) a))
-> Eff ff es a
-> Eff ff es b
-> Eff ff es (a, Maybe b)
forall a b. (a -> b) -> a -> b
$ ((a, Maybe b) -> Eff ff es (Either (a, Maybe b) a))
-> a -> Maybe b -> Eff ff es (Either (a, Maybe b) a)
forall a b c. ((a, b) -> c) -> a -> b -> c
curry (((a, Maybe b) -> Eff ff es (Either (a, Maybe b) a))
-> a -> Maybe b -> Eff ff es (Either (a, Maybe b) a))
-> ((a, Maybe b) -> Eff ff es (Either (a, Maybe b) a))
-> a
-> Maybe b
-> Eff ff es (Either (a, Maybe b) a)
forall a b. (a -> b) -> a -> b
$ Either (a, Maybe b) a -> Eff ff es (Either (a, Maybe b) a)
forall a. a -> Eff ff es a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either (a, Maybe b) a -> Eff ff es (Either (a, Maybe b) a))
-> ((a, Maybe b) -> Either (a, Maybe b) a)
-> (a, Maybe b)
-> Eff ff es (Either (a, Maybe b) a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a, Maybe b) -> Either (a, Maybe b) a
forall a b. a -> Either a b
Left
{-# INLINE cancels #-}
cancelBy
:: forall a b es ff c
. (Poll :> es, Applicative (Eff ff es), Free c ff)
=> Eff ff es a
-> Eff ff es b
-> Eff ff es (Maybe a, b)
cancelBy :: forall a b (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Poll :> es, Applicative (Eff ff es), Free c ff) =>
Eff ff es a -> Eff ff es b -> Eff ff es (Maybe a, b)
cancelBy = (Eff ff es b -> Eff ff es a -> Eff ff es (Maybe a, b))
-> Eff ff es a -> Eff ff es b -> Eff ff es (Maybe a, b)
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((Eff ff es b -> Eff ff es a -> Eff ff es (Maybe a, b))
-> Eff ff es a -> Eff ff es b -> Eff ff es (Maybe a, b))
-> (Eff ff es b -> Eff ff es a -> Eff ff es (Maybe a, b))
-> Eff ff es a
-> Eff ff es b
-> Eff ff es (Maybe a, b)
forall a b. (a -> b) -> a -> b
$ (b -> Maybe a -> Eff ff es (Either (Maybe a, b) b))
-> Eff ff es b -> Eff ff es a -> Eff ff es (Maybe a, b)
forall a b r (f :: * -> *) (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Free c ff, f ~ Eff ff es, Poll :> es) =>
(a -> Maybe b -> f (Either r a)) -> f a -> f b -> f r
poldl ((b -> Maybe a -> Eff ff es (Either (Maybe a, b) b))
-> Eff ff es b -> Eff ff es a -> Eff ff es (Maybe a, b))
-> (b -> Maybe a -> Eff ff es (Either (Maybe a, b) b))
-> Eff ff es b
-> Eff ff es a
-> Eff ff es (Maybe a, b)
forall a b. (a -> b) -> a -> b
$ ((b, Maybe a) -> Eff ff es (Either (Maybe a, b) b))
-> b -> Maybe a -> Eff ff es (Either (Maybe a, b) b)
forall a b c. ((a, b) -> c) -> a -> b -> c
curry (((b, Maybe a) -> Eff ff es (Either (Maybe a, b) b))
-> b -> Maybe a -> Eff ff es (Either (Maybe a, b) b))
-> ((b, Maybe a) -> Eff ff es (Either (Maybe a, b) b))
-> b
-> Maybe a
-> Eff ff es (Either (Maybe a, b) b)
forall a b. (a -> b) -> a -> b
$ Either (Maybe a, b) b -> Eff ff es (Either (Maybe a, b) b)
forall a. a -> Eff ff es a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either (Maybe a, b) b -> Eff ff es (Either (Maybe a, b) b))
-> ((b, Maybe a) -> Either (Maybe a, b) b)
-> (b, Maybe a)
-> Eff ff es (Either (Maybe a, b) b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Maybe a, b) -> Either (Maybe a, b) b
forall a b. a -> Either a b
Left ((Maybe a, b) -> Either (Maybe a, b) b)
-> ((b, Maybe a) -> (Maybe a, b))
-> (b, Maybe a)
-> Either (Maybe a, b) b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (b, Maybe a) -> (Maybe a, b)
forall a b. (a, b) -> (b, a)
swap
{-# INLINE cancelBy #-}
data For (t :: Type -> Type) :: Effect where
For :: t (f a) -> For t f (t a)
makeEffectH_ ''For
makeHFunctor' ''For \(t :< _) -> [t|Functor $t|]
forToParallel
:: forall t a es ff c
. (Parallel :> es, Traversable t, Applicative (Eff ff es), Free c ff)
=> For t (Eff ff es) a
-> Eff ff es a
forToParallel :: forall (t :: * -> *) a (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Parallel :> es, Traversable t, Applicative (Eff ff es),
Free c ff) =>
For t (Eff ff es) a -> Eff ff es a
forToParallel (For t (Eff ff es a)
iters) = Concurrently ff es a -> Eff ff es a
forall (ff :: Effect) (es :: [Effect]) a.
Concurrently ff es a -> Eff ff es a
runConcurrently (Concurrently ff es a -> Eff ff es a)
-> Concurrently ff es a -> Eff ff es a
forall a b. (a -> b) -> a -> b
$ (Eff ff es a -> Concurrently ff es a)
-> t (Eff ff es a) -> Concurrently ff es (t a)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> t a -> f (t b)
traverse Eff ff es a -> Concurrently ff es a
forall (ff :: Effect) (es :: [Effect]) a.
Eff ff es a -> Concurrently ff es a
Concurrently t (Eff ff es a)
iters
{-# INLINE forToParallel #-}
runConcurrentIO
:: forall a es ff c
. (UnliftIO :> es, Emb IO :> es, forall es'. Monad (Eff ff es'), Free c ff)
=> Eff ff (Parallel ': Race ': Poll ': Halt ': es) a
-> Eff ff es a
runConcurrentIO :: forall a (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(UnliftIO :> es, Emb IO :> es,
forall (es' :: [Effect]). Monad (Eff ff es'), Free c ff) =>
Eff ff (Parallel : Race : Poll : Halt : es) a -> Eff ff es a
runConcurrentIO = Eff ff (Halt : es) a -> Eff ff es a
forall a (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Emb IO :> es, Monad (Eff ff es), Free c ff) =>
Eff ff (Halt : es) a -> Eff ff es a
runHaltIO (Eff ff (Halt : es) a -> Eff ff es a)
-> (Eff ff (Parallel : Race : Poll : Halt : es) a
-> Eff ff (Halt : es) a)
-> Eff ff (Parallel : Race : Poll : Halt : es) a
-> Eff ff es a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Eff ff (Poll : Halt : es) a -> Eff ff (Halt : es) a
forall a (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Emb IO :> es, UnliftIO :> es, Monad (Eff ff es), Free c ff) =>
Eff ff (Poll : es) a -> Eff ff es a
runPollIO (Eff ff (Poll : Halt : es) a -> Eff ff (Halt : es) a)
-> (Eff ff (Parallel : Race : Poll : Halt : es) a
-> Eff ff (Poll : Halt : es) a)
-> Eff ff (Parallel : Race : Poll : Halt : es) a
-> Eff ff (Halt : es) a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Eff ff (Race : Poll : Halt : es) a -> Eff ff (Poll : Halt : es) a
forall a (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Emb IO :> es, UnliftIO :> es, Monad (Eff ff es), Free c ff) =>
Eff ff (Race : es) a -> Eff ff es a
runRaceIO (Eff ff (Race : Poll : Halt : es) a -> Eff ff (Poll : Halt : es) a)
-> (Eff ff (Parallel : Race : Poll : Halt : es) a
-> Eff ff (Race : Poll : Halt : es) a)
-> Eff ff (Parallel : Race : Poll : Halt : es) a
-> Eff ff (Poll : Halt : es) a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Eff ff (Parallel : Race : Poll : Halt : es) a
-> Eff ff (Race : Poll : Halt : es) a
forall a (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(UnliftIO :> es, Emb IO :> es, Monad (Eff ff es), Free c ff) =>
Eff ff (Parallel : es) a -> Eff ff es a
runParallelIO
{-# INLINE runConcurrentIO #-}
runParallelIO
:: forall a es ff c
. (UnliftIO :> es, Emb IO :> es, Monad (Eff ff es), Free c ff)
=> Eff ff (Parallel ': es) a
-> Eff ff es a
runParallelIO :: forall a (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(UnliftIO :> es, Emb IO :> es, Monad (Eff ff es), Free c ff) =>
Eff ff (Parallel : es) a -> Eff ff es a
runParallelIO = (Parallel ~~> Eff ff es) -> Eff ff (Parallel : es) a -> Eff ff es a
forall (e :: Effect) (es :: [Effect]) (ff :: Effect) a
(c :: (* -> *) -> Constraint).
(KnownOrder e, Free c ff) =>
(e ~~> Eff ff es) -> Eff ff (e : es) a -> Eff ff es a
interpret Parallel (Eff ff es) x -> Eff ff es x
Parallel ~~> Eff ff es
forall (m :: * -> *). MonadUnliftIO m => Parallel ~~> m
parallelToIO
{-# INLINE runParallelIO #-}
parallelToIO :: (MonadUnliftIO m) => Parallel ~~> m
parallelToIO :: forall (m :: * -> *). MonadUnliftIO m => Parallel ~~> m
parallelToIO (LiftP2 a -> b -> x
f m a
a m b
b) =
((forall a. m a -> IO a) -> IO x) -> m x
forall b. ((forall a. m a -> IO a) -> IO b) -> m b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO \forall a. m a -> IO a
run -> do
TMVar a
var <- IO (TMVar a)
forall (m :: * -> *) a. MonadIO m => m (TMVar a)
newEmptyTMVarIO
((forall a. IO a -> IO a) -> IO x) -> IO x
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> m a) -> m b) -> m b
mask \forall a. IO a -> IO a
restore -> do
ThreadId
t <- IO () -> IO ThreadId
forall (m :: * -> *). MonadUnliftIO m => m () -> m ThreadId
forkIO do
a
x <- IO a -> IO a
forall a. IO a -> IO a
restore (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ m a -> IO a
forall a. m a -> IO a
run m a
a
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar a -> a -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar a
var a
x
b
y <- IO b -> IO b
forall a. IO a -> IO a
restore (IO b -> IO b) -> IO b -> IO b
forall a b. (a -> b) -> a -> b
$ m b -> IO b
forall a. m a -> IO a
run m b
b
STM x -> IO x
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically do
a
x <- TMVar a -> STM a
forall a. TMVar a -> STM a
readTMVar TMVar a
var
x -> STM x
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (x -> STM x) -> x -> STM x
forall a b. (a -> b) -> a -> b
$ a -> b -> x
f a
x b
y
IO x -> IO () -> IO x
forall a b. IO a -> IO b -> IO a
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* IO () -> IO ()
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m a
uninterruptibleMask_ (ThreadId -> IO ()
forall (m :: * -> *). MonadIO m => ThreadId -> m ()
killThread ThreadId
t)
{-# INLINE parallelToIO #-}
runPollIO
:: forall a es ff c
. (Emb IO :> es, UnliftIO :> es, Monad (Eff ff es), Free c ff)
=> Eff ff (Poll ': es) a
-> Eff ff es a
runPollIO :: forall a (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Emb IO :> es, UnliftIO :> es, Monad (Eff ff es), Free c ff) =>
Eff ff (Poll : es) a -> Eff ff es a
runPollIO = (Poll ~~> Eff ff es) -> Eff ff (Poll : es) a -> Eff ff es a
forall (e :: Effect) (es :: [Effect]) (ff :: Effect) a
(c :: (* -> *) -> Constraint).
(KnownOrder e, Free c ff) =>
(e ~~> Eff ff es) -> Eff ff (e : es) a -> Eff ff es a
interpret Poll (Eff ff es) x -> Eff ff es x
Poll ~~> Eff ff es
forall (m :: * -> *). MonadUnliftIO m => Poll ~~> m
pollToIO
{-# INLINE runPollIO #-}
runRaceIO
:: forall a es ff c
. (Emb IO :> es, UnliftIO :> es, Monad (Eff ff es), Free c ff)
=> Eff ff (Race ': es) a
-> Eff ff es a
runRaceIO :: forall a (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Emb IO :> es, UnliftIO :> es, Monad (Eff ff es), Free c ff) =>
Eff ff (Race : es) a -> Eff ff es a
runRaceIO = (Race ~~> Eff ff es) -> Eff ff (Race : es) a -> Eff ff es a
forall (e :: Effect) (es :: [Effect]) (ff :: Effect) a
(c :: (* -> *) -> Constraint).
(KnownOrder e, Free c ff) =>
(e ~~> Eff ff es) -> Eff ff (e : es) a -> Eff ff es a
interpret Race (Eff ff es) x -> Eff ff es x
Race ~~> Eff ff es
forall (m :: * -> *). MonadUnliftIO m => Race ~~> m
raceToIO
{-# INLINE runRaceIO #-}
runHaltIO
:: forall a es ff c
. (Emb IO :> es, Monad (Eff ff es), Free c ff)
=> Eff ff (Halt ': es) a
-> Eff ff es a
runHaltIO :: forall a (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Emb IO :> es, Monad (Eff ff es), Free c ff) =>
Eff ff (Halt : es) a -> Eff ff es a
runHaltIO = (Halt ~~> Eff ff es) -> Eff ff (Halt : es) a -> Eff ff es a
forall (e :: Effect) (es :: [Effect]) (ff :: Effect) a
(c :: (* -> *) -> Constraint).
(KnownOrder e, Free c ff) =>
(e ~~> Eff ff es) -> Eff ff (e : es) a -> Eff ff es a
interpret Halt (Eff ff es) x -> Eff ff es x
Halt ~~> Eff ff es
forall (m :: * -> *). MonadIO m => Halt ~~> m
haltToIO
{-# INLINE runHaltIO #-}
raceToIO :: (MonadUnliftIO m) => Race ~~> m
raceToIO :: forall (m :: * -> *). MonadUnliftIO m => Race ~~> m
raceToIO (Race m x
a m x
b) =
((forall a. m a -> IO a) -> IO x) -> m x
forall b. ((forall a. m a -> IO a) -> IO b) -> m b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO \forall a. m a -> IO a
run -> do
TMVar x
var <- IO (TMVar x)
forall (m :: * -> *) a. MonadIO m => m (TMVar a)
newEmptyTMVarIO
((forall a. IO a -> IO a) -> IO x) -> IO x
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> m a) -> m b) -> m b
mask \forall a. IO a -> IO a
restore -> do
let runThread :: m x -> IO ThreadId
runThread m x
m = IO () -> IO ThreadId
forall (m :: * -> *). MonadUnliftIO m => m () -> m ThreadId
forkIO do
x
x <- IO x -> IO x
forall a. IO a -> IO a
restore (IO x -> IO x) -> IO x -> IO x
forall a b. (a -> b) -> a -> b
$ m x -> IO x
forall a. m a -> IO a
run m x
m
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar x -> x -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar x
var x
x
ThreadId
t1 <- m x -> IO ThreadId
runThread m x
a
ThreadId
t2 <- m x -> IO ThreadId
runThread m x
b
STM x -> IO x
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TMVar x -> STM x
forall a. TMVar a -> STM a
readTMVar TMVar x
var)
IO x -> IO () -> IO x
forall a b. IO a -> IO b -> IO a
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* IO () -> IO ()
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m a
uninterruptibleMask_ (ThreadId -> IO ()
forall (m :: * -> *). MonadIO m => ThreadId -> m ()
killThread ThreadId
t1 IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> ThreadId -> IO ()
forall (m :: * -> *). MonadIO m => ThreadId -> m ()
killThread ThreadId
t2)
{-# INLINE raceToIO #-}
pollToIO :: (MonadUnliftIO m) => Poll ~~> m
pollToIO :: forall (m :: * -> *). MonadUnliftIO m => Poll ~~> m
pollToIO (Poldl a -> Maybe b -> m (Either x a)
f m a
a m b
b) =
((forall a. m a -> IO a) -> IO x) -> m x
forall b. ((forall a. m a -> IO a) -> IO b) -> m b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO \forall a. m a -> IO a
run -> do
TMVar b
var <- IO (TMVar b)
forall (m :: * -> *) a. MonadIO m => m (TMVar a)
newEmptyTMVarIO
((forall a. IO a -> IO a) -> IO x) -> IO x
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> m a) -> m b) -> m b
mask \forall a. IO a -> IO a
restore -> do
ThreadId
t <- IO () -> IO ThreadId
forall (m :: * -> *). MonadUnliftIO m => m () -> m ThreadId
forkIO do
b
x <- IO b -> IO b
forall a. IO a -> IO a
restore (IO b -> IO b) -> IO b -> IO b
forall a b. (a -> b) -> a -> b
$ m b -> IO b
forall a. m a -> IO a
run m b
b
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar b -> b -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar b
var b
x
IO a -> IO a
forall a. IO a -> IO a
restore (m a -> IO a
forall a. m a -> IO a
run m a
a) IO a -> (a -> IO x) -> IO x
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ((a -> IO x) -> a -> IO x) -> a -> IO x
forall a. (a -> a) -> a
fix \a -> IO x
next a
acc -> do
Maybe b
poll <- STM (Maybe b) -> IO (Maybe b)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM (Maybe b) -> IO (Maybe b)) -> STM (Maybe b) -> IO (Maybe b)
forall a b. (a -> b) -> a -> b
$ TMVar b -> STM (Maybe b)
forall a. TMVar a -> STM (Maybe a)
tryReadTMVar TMVar b
var
IO (Either x a) -> IO (Either x a)
forall a. IO a -> IO a
restore (m (Either x a) -> IO (Either x a)
forall a. m a -> IO a
run (m (Either x a) -> IO (Either x a))
-> m (Either x a) -> IO (Either x a)
forall a b. (a -> b) -> a -> b
$ a -> Maybe b -> m (Either x a)
f a
acc Maybe b
poll) IO (Either x a) -> (Either x a -> IO x) -> IO x
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left x
r -> do
IO () -> IO ()
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m a
uninterruptibleMask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ThreadId -> IO ()
forall (m :: * -> *). MonadIO m => ThreadId -> m ()
killThread ThreadId
t
x -> IO x
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure x
r
Right a
acc' -> a -> IO x
next a
acc'
{-# INLINE pollToIO #-}
haltToIO :: (MonadIO m) => Halt ~~> m
haltToIO :: forall (m :: * -> *). MonadIO m => Halt ~~> m
haltToIO Halt m x
Halt = IO x -> m x
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO x -> m x) -> IO x -> m x
forall a b. (a -> b) -> a -> b
$ IO () -> IO x
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO x) -> IO () -> IO x
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
forall (m :: * -> *). MonadIO m => Int -> m ()
threadDelay Int
forall a. Bounded a => a
maxBound
{-# INLINE haltToIO #-}
runParallelAsSequential
:: forall a es ff c
. (Applicative (Eff ff es), Free c ff)
=> Eff ff (Parallel ': es) a
-> Eff ff es a
runParallelAsSequential :: forall a (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Applicative (Eff ff es), Free c ff) =>
Eff ff (Parallel : es) a -> Eff ff es a
runParallelAsSequential = (Parallel ~~> Eff ff es) -> Eff ff (Parallel : es) a -> Eff ff es a
forall (e :: Effect) (es :: [Effect]) (ff :: Effect) a
(c :: (* -> *) -> Constraint).
(KnownOrder e, Free c ff) =>
(e ~~> Eff ff es) -> Eff ff (e : es) a -> Eff ff es a
interpret Parallel (Eff ff es) x -> Eff ff es x
Parallel ~~> Eff ff es
forall (ff :: Effect) (es :: [Effect]).
Applicative (Eff ff es) =>
Parallel ~~> Eff ff es
parallelToSequential
{-# INLINE runParallelAsSequential #-}
parallelToSequential :: (Applicative (Eff ff es)) => Parallel ~~> Eff ff es
parallelToSequential :: forall (ff :: Effect) (es :: [Effect]).
Applicative (Eff ff es) =>
Parallel ~~> Eff ff es
parallelToSequential (LiftP2 a -> b -> x
f Eff ff es a
a Eff ff es b
b) = (a -> b -> x) -> Eff ff es a -> Eff ff es b -> Eff ff es x
forall a b c.
(a -> b -> c) -> Eff ff es a -> Eff ff es b -> Eff ff es c
forall (f :: * -> *) a b c.
Applicative f =>
(a -> b -> c) -> f a -> f b -> f c
liftA2 a -> b -> x
f Eff ff es a
a Eff ff es b
b
{-# INLINE parallelToSequential #-}
runForAsParallel
:: forall t a es ff c
. (Parallel :> es, Traversable t, Applicative (Eff ff es), Free c ff)
=> Eff ff (For t ': es) a
-> Eff ff es a
runForAsParallel :: forall (t :: * -> *) a (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Parallel :> es, Traversable t, Applicative (Eff ff es),
Free c ff) =>
Eff ff (For t : es) a -> Eff ff es a
runForAsParallel = (For t ~~> Eff ff es) -> Eff ff (For t : es) a -> Eff ff es a
forall (e :: Effect) (es :: [Effect]) (ff :: Effect) a
(c :: (* -> *) -> Constraint).
(KnownOrder e, Free c ff) =>
(e ~~> Eff ff es) -> Eff ff (e : es) a -> Eff ff es a
interpret For t (Eff ff es) x -> Eff ff es x
For t ~~> Eff ff es
forall (t :: * -> *) a (es :: [Effect]) (ff :: Effect)
(c :: (* -> *) -> Constraint).
(Parallel :> es, Traversable t, Applicative (Eff ff es),
Free c ff) =>
For t (Eff ff es) a -> Eff ff es a
forToParallel
{-# INLINE runForAsParallel #-}