{-# LANGUAGE RecursiveDo #-}
module Multitasking.Race
(
raceTwo,
raceTwoMaybe,
raceMany,
raceManyMaybe,
timeout,
)
where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad (forever, void)
import Control.Monad.IO.Class
import Data.Foldable (for_)
import Multitasking.Communication
import Multitasking.Core
import Multitasking.Waiting
-- | Race two actions against each other and return one result.
--
-- Both actions are started as tasks under a fresh 'multitask' coordinator.
-- Each task puts its result into a shared slot, and the caller waits on that
-- slot with 'awaitSlot'.
--
-- NOTE(review): presumably only the first 'putSlot' succeeds (it returns a
-- 'Bool'), so the value returned is that of the action finishing first.
-- What happens to the other task is decided by 'multitask' -- confirm there.
raceTwo :: (MonadIO m) => IO a -> IO a -> m a
raceTwo action1 action2 = multitask $ \coordinator -> do
  winner <- newSlot
  -- Start the contenders in argument order; the task handles are not needed.
  for_ [action1, action2] $ \contender ->
    start coordinator (contender >>= putSlot winner)
  awaitSlot winner
-- | Two-action convenience wrapper around 'raceManyMaybe'.
--
-- The two actions are passed on as a two-element list, so the result is
-- exactly what 'raceManyMaybe' yields for that list.
raceTwoMaybe :: (MonadIO m) => IO (Maybe a) -> IO (Maybe a) -> m (Maybe a)
raceTwoMaybe action1 action2 = raceManyMaybe contenders
  where
    contenders = [action1, action2]
-- | Race every action in a container and return one result.
--
-- Each action is started as a task under a fresh 'multitask' coordinator and
-- puts its result into a shared slot. The caller then waits on that slot
-- with 'awaitSlot'.
--
-- NOTE(review): with an empty container no task ever fills the slot, so the
-- final 'awaitSlot' presumably never completes -- confirm how 'awaitSlot'
-- behaves in that case.
raceMany :: (MonadIO m, Traversable t) => t (IO a) -> m a
raceMany actions = multitask $ \coordinator -> do
  winner <- newSlot
  -- The task handles returned by 'start' are discarded by 'mapM_'.
  mapM_ (\contender -> start coordinator (contender >>= putSlot winner)) actions
  awaitSlot winner
-- | Race a container of actions and return the first 'Just' result, or
-- 'Nothing' when every action has finished without producing one.
--
-- Each action is started as a task under a fresh 'multitask' coordinator.
-- A counter, created from @length actions@, is decremented once per finished
-- action. A shared slot receives the value of any action that returns
-- 'Just'. The caller blocks in a single STM transaction until the slot holds
-- a value or the counter reads zero.
--
-- NOTE(review): an action that throws never decrements the counter. How the
-- exception reaches the caller is decided by 'multitask' -- confirm there.
raceManyMaybe :: (MonadIO m, Traversable t) => t (IO (Maybe a)) -> m (Maybe a)
raceManyMaybe actions = multitask $ \coordinator -> do
  counter <- newCounter (length actions)
  slot <- newSlot
  for_ actions $ \action -> start coordinator $ do
    maybeA <- action
    -- Publish the result BEFORE marking this action as finished. With the
    -- opposite order the waiter below could observe "counter is zero, slot
    -- is empty" in the gap between the two steps and return 'Nothing' even
    -- though this action was about to deliver a 'Just'.
    for_ maybeA (putSlot slot)
    -- 'void' pins the task's result to () whatever 'decrementCounter' returns.
    void $ decrementCounter counter
  atomically $ do
    -- Slot and counter are read in one transaction, so the pair is a
    -- consistent snapshot.
    maybeA <- probeSlot slot
    case maybeA of
      Just a -> pure (Just a)
      Nothing -> do
        remaining <- getCounter counter
        if remaining == 0
          then pure Nothing -- every action finished and none published
          else retry -- still running: block until slot or counter changes
-- | Run an action with an upper bound on how long to wait for it.
--
-- The action, with its result wrapped in 'Just', is raced via 'raceTwo'
-- against a 'threadDelay' of the requested duration that then yields
-- 'Nothing'. The result is whichever of the two 'raceTwo' returns.
--
-- 'durationToMicroseconds' yields a 'Word' while 'threadDelay' takes an
-- 'Int'. Durations above @maxBound :: Int@ microseconds are clamped to that
-- maximum. A plain 'fromIntegral' would wrap them to a negative 'Int', and
-- the delay would then not last for the requested time.
timeout :: (MonadIO m) => Duration -> IO a -> m (Maybe a)
timeout duration action =
  raceTwo (Just <$> action) (threadDelay maxWaitTime >> pure Nothing)
  where
    -- Delay in microseconds, guaranteed to be representable as an 'Int'.
    maxWaitTime :: Int
    maxWaitTime =
      fromIntegral (min largestDelay (durationToMicroseconds duration))

    -- Largest delay 'threadDelay' can be given; always fits in a 'Word'.
    largestDelay :: Word
    largestDelay = fromIntegral (maxBound :: Int)