{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE QuantifiedConstraints #-}
{-# LANGUAGE UndecidableInstances #-}

-- SPDX-License-Identifier: MPL-2.0

{- |
Copyright   :  (c) 2024-2025 Sayo contributors
License     :  MPL-2.0 (see the file LICENSE)
Maintainer  :  ymdfield@outlook.jp

Effects for parallel computations.
-}
module Data.Effect.Concurrent.Parallel where

#if ( __GLASGOW_HASKELL__ < 906 )
import Control.Applicative (liftA2)
#endif
import Control.Applicative (Alternative (empty, (<|>)))
import Control.Monad (forever)
import Data.Effect (Emb, UnliftIO)
import Data.Function (fix)
import Data.Tuple (swap)
import UnliftIO (
    MonadIO,
    MonadUnliftIO,
    atomically,
    finally,
    liftIO,
    mask,
    newEmptyTMVarIO,
    putTMVar,
    readTMVar,
    throwIO,
    tryAny,
    tryReadTMVar,
    uninterruptibleMask_,
    withRunInIO,
 )
import UnliftIO.Concurrent (forkIO, killThread, threadDelay)

-- | An `Applicative`-based effect for executing computations in parallel.
--
-- The single operation mirrors the shape of 'liftA2': two actions in the
-- carrier @f@ plus a pure function that combines their results.
data Parallel :: Effect where
    -- | Executes two actions in parallel and blocks until both are complete.
    -- Finally, aggregates the execution results with the specified function.
    LiftP2
        :: (a -> b -> c)
        -- ^ A function that aggregates the two execution results.
        -> f a
        -- ^ The first action to be executed in parallel.
        -> f b
        -- ^ The second action to be executed in parallel.
        -> Parallel f c

-- | An effect that blocks a computation indefinitely.
--
-- The operation is polymorphic in its result type @a@ and takes no arguments.
data Halt :: Effect where
    -- | Blocks a computation indefinitely.
    Halt :: Halt f a

{- |
An effect that adopts the result of the computation that finishes first among
two computations and cancels the other.

Both computations must produce the same result type.
-}
data Race :: Effect where
    -- | Adopts the result of the computation that finishes first among two
    --   computations and cancels the other.
    Race :: f a -> f a -> Race f a

-- Template Haskell splices for the effects declared above.
-- NOTE(review): the operation functions used further down in this module
-- (@halt@, @liftP2@, @race@) are not defined by hand anywhere in this file, so
-- they are presumably generated by these splices; confirm against the
-- definitions of 'makeEffectF' / 'makeEffectsH'.
makeEffectF ''Halt
makeEffectsH [''Parallel, ''Race]

{- |
A wrapper that allows using the `Parallel` effect in the form of `Applicative` /
`Alternative` instances.

Wrap an action with the @Concurrently@ constructor, combine wrapped actions
through the instances, and unwrap the result with 'runConcurrently'.
-}
newtype Concurrently ff es a = Concurrently {runConcurrently :: Eff ff es a}

-- Derived 'Functor': mapping over a 'Concurrently' maps over the wrapped
-- 'Eff' action, so it is available whenever @Eff ff es@ is a 'Functor'.
deriving instance (Functor (Eff ff es)) => Functor (Concurrently ff es)

{- |
'liftA2' (and therefore '<*>') on 'Concurrently' is dispatched as a `Parallel`
operation via @liftP2@ instead of using the sequential 'Applicative' of
@Eff ff es@. 'pure' simply wraps the underlying 'pure'.
-}
instance
    (Parallel :> es, Applicative (Eff ff es), Free c ff)
    => Applicative (Concurrently ff es)
    where
    pure = Concurrently . pure
    {-# INLINE pure #-}

    liftA2 f (Concurrently a) (Concurrently b) = Concurrently $ liftP2 f a b
    {-# INLINE liftA2 #-}

{- |
'empty' is the `Halt` operation (a computation that never produces a result)
and '<|>' is the `Race` operation between the two wrapped actions.
-}
instance
    (Race :> es, Halt :> es, Parallel :> es, Applicative (Eff ff es), Free c ff)
    => Alternative (Concurrently ff es)
    where
    empty = Concurrently halt
    {-# INLINE empty #-}

    (Concurrently a) <|> (Concurrently b) = Concurrently $ race a b
    {-# INLINE (<|>) #-}

{- |
Executes three actions in parallel and blocks until all are complete.
Finally, aggregates the execution results based on the specified function.

Implemented as two nested @liftP2@ operations: the first two actions are
combined into a partially applied function, which is then applied to the
result of the third action.
-}
liftP3
    :: forall a b c d es ff con
     . (Parallel :> es, Free con ff)
    => (a -> b -> c -> d)
    -- ^ A function that aggregates the three execution results.
    -> Eff ff es a
    -- ^ The first action to be executed in parallel.
    -> Eff ff es b
    -- ^ The second action to be executed in parallel.
    -> Eff ff es c
    -- ^ The third action to be executed in parallel.
    -> Eff ff es d
liftP3 f a b = liftP2 ($) (liftP2 f a b)
{-# INLINE liftP3 #-}

-- | An effect that realizes polling and cancellation of actions running in parallel.
data Poll :: Effect where
    -- | Performs polling on an action running in parallel in the form of a fold.
    --
    -- First, the parallel execution of two actions begins.
    --
    -- When the execution of the first action completes, the second action is
    -- polled at that point, and the first action's result together with the
    -- poll result ('Nothing' if the second action has not finished yet) is
    -- passed to the folding function. If the function returns `Left`, the
    -- folding terminates and that value becomes the final result; if the
    -- second action is not yet complete, it is canceled. If the function
    -- returns `Right`, the folding continues with the new accumulator, and
    -- the same process repeats.
    Poldl
        :: (a -> Maybe b -> f (Either r a))
        -- ^ A function for folding.
        -> f a
        -- ^ The first action to be executed in parallel.
        -> f b
        -- ^ The second action to be executed in parallel; the target of polling.
        -> Poll f r

-- Template Haskell splice for 'Poll'.
-- NOTE(review): @poldl@, used by 'cancels' and 'cancelBy', is not defined by
-- hand in this file and is presumably generated here; confirm.
makeEffectH ''Poll

{- |
Executes two actions in parallel. If the first action completes before the
second, the second action is canceled.

The result pairs the first action's value with 'Just' the second action's
value if it had already finished when polled, or 'Nothing' otherwise. The fold
passed to @poldl@ stops at the first poll (it always returns 'Left').
-}
cancels
    :: forall a b es ff c
     . (Poll :> es, Applicative (Eff ff es), Free c ff)
    => Eff ff es a
    -- ^ The action that controls the cancellation.
    -> Eff ff es b
    -- ^ The action to be canceled.
    -> Eff ff es (a, Maybe b)
cancels = poldl $ curry $ pure . Left
{-# INLINE cancels #-}

{- |
Executes two actions in parallel. If the second action completes before the
first, the first action is canceled.

This is 'cancels' with the roles of the two arguments exchanged: the arguments
are flipped before being handed to @poldl@ and the resulting pair is swapped
back, so the possibly-canceled action's result comes first.
-}
cancelBy
    :: forall a b es ff c
     . (Poll :> es, Applicative (Eff ff es), Free c ff)
    => Eff ff es a
    -- ^ The action to be canceled.
    -> Eff ff es b
    -- ^ The action that controls the cancellation.
    -> Eff ff es (Maybe a, b)
cancelBy = flip $ poldl $ curry $ pure . Left . swap
{-# INLINE cancelBy #-}

-- | An effect for parallel computations based on a `Traversable` container @t@.
data For (t :: Type -> Type) :: Effect where
    -- | Executes in parallel the actions stored within a `Traversable` container @t@.
    --
    -- The result container has the same shape type @t@ as the input container.
    For :: t (f a) -> For t f (t a)

-- Template Haskell splices for 'For'. The second splice passes a quotation of
-- the constraint @Functor t@ (built from the effect's first type parameter).
-- NOTE(review): presumably this is the context required for the generated
-- higher-order functor instance; confirm against @makeHFunctor'@.
makeEffectH_ ''For
makeHFunctor' ''For \(t :< _) -> [t|Functor $t|]

{- |
Converts the `Traversable` container-based parallel computation effect `For`
into the `Applicative`-based parallel computation effect `Parallel`.

Every action in the container is wrapped in 'Concurrently' and the container
is traversed with that applicative, so the elements are combined through
`Parallel` operations.
-}
forToParallel
    :: forall t a es ff c
     . (Parallel :> es, Traversable t, Applicative (Eff ff es), Free c ff)
    => For t (Eff ff es) a
    -> Eff ff es a
forToParallel (For iters) = runConcurrently $ traverse Concurrently iters
{-# INLINE forToParallel #-}

{- |
Interprets the `Parallel`, `Race`, `Poll` and `Halt` effects in one go.

This is the composition of 'runParallelIO', 'runRaceIO', 'runPollIO' and
'runHaltIO', applied in that order (outermost effect first).
-}
runConcurrentIO
    :: forall a es ff c
     . (UnliftIO :> es, Emb IO :> es, forall es'. Monad (Eff ff es'), Free c ff)
    => Eff ff (Parallel ': Race ': Poll ': Halt ': es) a
    -> Eff ff es a
runConcurrentIO = runHaltIO . runPollIO . runRaceIO . runParallelIO
{-# INLINE runConcurrentIO #-}

-- | Interprets the `Parallel` effect using the handler 'parallelToIO'.
runParallelIO
    :: forall a es ff c
     . (UnliftIO :> es, Emb IO :> es, Monad (Eff ff es), Free c ff)
    => Eff ff (Parallel ': es) a
    -> Eff ff es a
runParallelIO = interpret parallelToIO
{-# INLINE runParallelIO #-}

{- |
Handler for `Parallel` on top of any 'MonadUnliftIO' monad.

The first action runs in a freshly forked thread while the second runs in the
calling thread; the two results are then combined with the given function.

Exception behaviour:

* The forked thread is always killed when this handler exits, whether it
  returns normally, the second action throws, or the calling thread receives
  an asynchronous exception while waiting.
* A synchronous exception raised by the first (forked) action is captured and
  rethrown in the calling thread once the second action has finished, instead
  of being lost and leaving the caller blocked on an empty 'TMVar'.
-}
parallelToIO :: (MonadUnliftIO m) => Parallel ~~> m
parallelToIO (LiftP2 f a b) =
    withRunInIO \run -> do
        var <- newEmptyTMVarIO
        -- Fork under 'mask' so that no asynchronous exception can arrive
        -- between creating the thread and installing the cleanup below.
        mask \restore -> do
            t <- forkIO do
                -- 'tryAny' only captures synchronous exceptions, so the
                -- 'killThread' issued by the cleanup still terminates the thread.
                r <- tryAny $ restore $ run a
                atomically $ putTMVar var r

            let waitBoth = do
                    y <- restore $ run b
                    x <- atomically (readTMVar var) >>= either throwIO pure
                    pure $ f x y

            waitBoth `finally` uninterruptibleMask_ (killThread t)
{-# INLINE parallelToIO #-}

-- | Interprets the `Poll` effect using the handler 'pollToIO'.
runPollIO
    :: forall a es ff c
     . (Emb IO :> es, UnliftIO :> es, Monad (Eff ff es), Free c ff)
    => Eff ff (Poll ': es) a
    -> Eff ff es a
runPollIO = interpret pollToIO
{-# INLINE runPollIO #-}

-- | Interprets the `Race` effect using the handler 'raceToIO'.
runRaceIO
    :: forall a es ff c
     . (Emb IO :> es, UnliftIO :> es, Monad (Eff ff es), Free c ff)
    => Eff ff (Race ': es) a
    -> Eff ff es a
runRaceIO = interpret raceToIO
{-# INLINE runRaceIO #-}

-- | Interprets the `Halt` effect using the handler 'haltToIO'.
runHaltIO
    :: forall a es ff c
     . (Emb IO :> es, Monad (Eff ff es), Free c ff)
    => Eff ff (Halt ': es) a
    -> Eff ff es a
runHaltIO = interpret haltToIO
{-# INLINE runHaltIO #-}

{- |
Handler for `Race` on top of any 'MonadUnliftIO' monad.

Each of the two actions runs in its own forked thread and writes its result
into a shared 'TMVar'; the calling thread returns whichever result is written
first. Both worker threads are killed when the handler exits, including when
the calling thread is interrupted by an asynchronous exception while waiting.

A worker that throws simply terminates without writing a result, so the other
worker can still win the race.
-}
raceToIO :: (MonadUnliftIO m) => Race ~~> m
raceToIO (Race a b) =
    withRunInIO \run -> do
        var <- newEmptyTMVarIO
        -- Fork under 'mask' so that both thread ids are known before any
        -- asynchronous exception can be delivered to the calling thread.
        mask \restore -> do
            let runThread m = forkIO do
                    x <- restore $ run m
                    atomically $ putTMVar var x

            t1 <- runThread a
            t2 <- runThread b

            -- Blocking on the 'TMVar' is interruptible even under 'mask';
            -- 'finally' guarantees the workers are cleaned up in that case too.
            atomically (readTMVar var)
                `finally` uninterruptibleMask_ (killThread t1 *> killThread t2)
{-# INLINE raceToIO #-}

{- |
Handler for `Poll` on top of any 'MonadUnliftIO' monad.

The polled action runs in a forked thread and stores its result in a 'TMVar'.
The first action runs in the calling thread; once it completes, the fold
function is applied repeatedly to the current accumulator and a non-blocking
read of that 'TMVar' ('Nothing' while the polled action is still running)
until it returns 'Left'.

The forked thread is killed when the handler exits, both on normal
termination of the fold and when the first action or the fold function throws.
-}
pollToIO :: (MonadUnliftIO m) => Poll ~~> m
pollToIO (Poldl f a b) =
    withRunInIO \run -> do
        var <- newEmptyTMVarIO
        -- Fork under 'mask' so the cleanup below is installed before any
        -- asynchronous exception can reach the calling thread.
        mask \restore -> do
            t <- forkIO do
                x <- restore $ run b
                atomically $ putTMVar var x

            -- One fold step: poll without blocking, run the user function,
            -- then either stop with the final result or continue.
            let step = fix \next acc -> do
                    poll <- atomically $ tryReadTMVar var
                    restore (run $ f acc poll) >>= either pure next

            (restore (run a) >>= step)
                `finally` uninterruptibleMask_ (killThread t)
{-# INLINE pollToIO #-}

{- |
Handler for `Halt` on top of any 'MonadIO' monad.

Blocks by repeating @threadDelay maxBound@ forever, so the action never
returns a value.
-}
haltToIO :: (MonadIO m) => Halt ~~> m
haltToIO Halt = liftIO $ forever $ threadDelay maxBound
{-# INLINE haltToIO #-}

{- |
Interprets the `Parallel` effect without any concurrency, using the handler
'parallelToSequential'.
-}
runParallelAsSequential
    :: forall a es ff c
     . (Applicative (Eff ff es), Free c ff)
    => Eff ff (Parallel ': es) a
    -> Eff ff es a
runParallelAsSequential = interpret parallelToSequential
{-# INLINE runParallelAsSequential #-}

{- |
Handler for `Parallel` that combines the two actions with the ordinary
'liftA2' of @Eff ff es@ rather than forking any thread.
-}
parallelToSequential :: (Applicative (Eff ff es)) => Parallel ~~> Eff ff es
parallelToSequential (LiftP2 f a b) = liftA2 f a b
{-# INLINE parallelToSequential #-}

{- |
Interprets the `For` effect in terms of the `Parallel` effect, using the
handler 'forToParallel'.
-}
runForAsParallel
    :: forall t a es ff c
     . (Parallel :> es, Traversable t, Applicative (Eff ff es), Free c ff)
    => Eff ff (For t ': es) a
    -> Eff ff es a
runForAsParallel = interpret forToParallel
{-# INLINE runForAsParallel #-}