{-# LANGUAGE FlexibleContexts #-}
module Distribution.Client.JobControl
( JobControl
, newSerialJobControl
, newParallelJobControl
, newSemaphoreJobControl
, spawnJob
, collectJob
, remainingJobs
, cancelJobs
, cleanupJobControl
, jobControlSemaphore
, JobLimit
, newJobLimit
, withJobLimit
, Lock
, newLock
, criticalSection
) where
import Distribution.Client.Compat.Prelude
import Prelude ()
import Control.Concurrent (forkIO, forkIOWithUnmask, threadDelay)
import Control.Concurrent.MVar
import Control.Concurrent.STM (STM, TVar, atomically, modifyTVar', newTVarIO, readTVar)
import Control.Concurrent.STM.TChan
import Control.Exception (bracket_, mask_, try)
import Control.Monad (forever, replicateM_)
import Distribution.Client.Compat.Semaphore
import Distribution.Compat.Stack
import Distribution.Simple.Utils
import System.Semaphore
-- | A record of operations for submitting jobs and gathering their results.
-- Jobs are actions in the monad @m@ that each produce an @a@. Packaging the
-- operations as a record lets the constructors in this module provide
-- different execution strategies behind one interface.
data JobControl m a = JobControl
  { spawnJob :: m a -> m ()
  -- ^ Submit a job.
  , collectJob :: m a
  -- ^ Obtain the result of one submitted job.
  , remainingJobs :: m Bool
  -- ^ 'True' while there are jobs that have been submitted but neither
  -- collected nor cancelled.
  , cancelJobs :: m ()
  -- ^ Discard jobs that are still waiting in the queue. Query
  -- 'remainingJobs' afterwards to see whether any jobs are left over.
  , cleanupJobControl :: m ()
  -- ^ Release whatever resources the implementation created (a no-op for the
  -- implementations that hold none).
  , jobControlSemaphore :: Maybe SemaphoreName
  -- ^ Name of the semaphore backing this job control, if it has one.
  }
-- | Create a 'JobControl' that executes jobs one at a time on the thread
-- that calls 'collectJob'.
--
-- 'spawnJob' only enqueues the action; the action is actually run by the
-- 'collectJob' call that dequeues it. Jobs therefore run in submission order,
-- and an exception thrown by a job propagates out of 'collectJob'.
newSerialJobControl :: IO (JobControl IO a)
newSerialJobControl = do
  qVar <- newTChanIO
  return
    JobControl
      { spawnJob = spawn qVar
      , collectJob = collect qVar
      , remainingJobs = remaining qVar
      , cancelJobs = cancel qVar
      , cleanupJobControl = return ()
      , jobControlSemaphore = Nothing
      }
  where
    -- Queue the job without running it.
    spawn :: TChan (IO a) -> IO a -> IO ()
    spawn qVar job = atomically $ writeTChan qVar job

    -- Dequeue the oldest job (blocking while the queue is empty) and run it
    -- right here.
    collect :: TChan (IO a) -> IO a
    collect qVar =
      join $ atomically $ readTChan qVar

    -- Jobs remain exactly when the queue is non-empty.
    remaining :: TChan (IO a) -> IO Bool
    remaining qVar = fmap not $ atomically $ isEmptyTChan qVar

    -- Drop every queued job; none of them has started yet.
    cancel :: TChan (IO a) -> IO ()
    cancel qVar = void $ atomically $ readAllTChan qVar
-- | Create a 'JobControl' backed by a fixed pool of worker threads.
--
-- The argument is the number of workers to fork. It must lie in the range
-- 1..1000; any other value is rejected with 'error'.
--
-- Each worker loops forever: it takes a job from the input queue, runs it
-- under 'try', and writes the resulting 'Either' to the output queue.
-- 'collectJob' rethrows a job's exception in the collecting thread.
newParallelJobControl :: WithCallStack (Int -> IO (JobControl IO a))
newParallelJobControl n
  | n < 1 || n > 1000 =
      error $ "newParallelJobControl: not a sensible number of jobs: " ++ show n
newParallelJobControl maxJobLimit = do
  inqVar <- newTChanIO
  outqVar <- newTChanIO
  -- Number of jobs spawned but not yet collected or cancelled.
  countVar <- newTVarIO 0
  replicateM_ maxJobLimit $
    forkIO $
      worker inqVar outqVar
  return
    JobControl
      { spawnJob = spawn inqVar countVar
      , collectJob = collect outqVar countVar
      , remainingJobs = remaining countVar
      , cancelJobs = cancel inqVar countVar
      , cleanupJobControl = return ()
      , jobControlSemaphore = Nothing
      }
  where
    -- Body of one pool thread: run queued jobs forever, capturing any
    -- exception so it can be reported through the output queue.
    worker :: TChan (IO a) -> TChan (Either SomeException a) -> IO ()
    worker inqVar outqVar =
      forever $ do
        job <- atomically $ readTChan inqVar
        res <- try job
        atomically $ writeTChan outqVar res

    -- Count the job and enqueue it in a single transaction.
    spawn :: TChan (IO a) -> TVar Int -> IO a -> IO ()
    spawn inqVar countVar job =
      atomically $ do
        modifyTVar' countVar (+ 1)
        writeTChan inqVar job

    -- Take one finished result. The decrement and the read share one
    -- transaction, so the counter only changes once a result is available.
    collect :: TChan (Either SomeException a) -> TVar Int -> IO a
    collect outqVar countVar = do
      res <- atomically $ do
        modifyTVar' countVar (subtract 1)
        readTChan outqVar
      either throwIO return res

    -- Jobs remain while the outstanding-job counter is non-zero.
    remaining :: TVar Int -> IO Bool
    remaining countVar = fmap (/= 0) $ atomically $ readTVar countVar

    -- Drain the jobs still sitting in the input queue and deduct them from
    -- the counter. Jobs a worker has already dequeued are not affected.
    cancel :: TChan (IO a) -> TVar Int -> IO ()
    cancel inqVar countVar =
      atomically $ do
        xs <- readAllTChan inqVar
        modifyTVar' countVar (subtract (length xs))
-- | Remove and return everything currently in the channel, oldest element
-- first. Never blocks: an empty channel yields @[]@.
readAllTChan :: TChan a -> STM [a]
readAllTChan qvar = go []
  where
    -- Accumulate in reverse (cheap cons) and flip once the channel runs dry.
    go xs = do
      mx <- tryReadTChan qvar
      case mx of
        Nothing -> return (reverse xs)
        Just x -> go (x : xs)
-- | Create a 'JobControl' whose parallelism is limited by a freshly created
-- semaphore rather than by a fixed thread pool.
--
-- The 'Int' argument is the number of semaphore slots. It must lie in the
-- range 1..1000; any other value is rejected with 'error'. The semaphore's
-- name is reported via 'notice' and exposed through 'jobControlSemaphore';
-- 'cleanupJobControl' destroys the semaphore.
--
-- A single dispatcher thread takes jobs from the input queue. For each job
-- it waits on the semaphore and then forks a dedicated thread, which runs
-- the job, releases the semaphore slot and writes the result to the output
-- queue.
newSemaphoreJobControl :: WithCallStack (Verbosity -> Int -> IO (JobControl IO a))
newSemaphoreJobControl _ n
  | n < 1 || n > 1000 =
      error $ "newSemaphoreJobControl: not a sensible number of jobs: " ++ show n
newSemaphoreJobControl verbosity maxJobLimit = do
  sem <- freshSemaphore "cabal_semaphore" maxJobLimit
  notice verbosity $
    "Created semaphore called "
      ++ getSemaphoreName (semaphoreName sem)
      ++ " with "
      ++ show maxJobLimit
      ++ " slots."
  outqVar <- newTChanIO
  inqVar <- newTChanIO
  -- Number of jobs spawned but not yet collected or cancelled.
  countVar <- newTVarIO 0
  void (forkIO (worker sem inqVar outqVar))
  return
    JobControl
      { spawnJob = spawn inqVar countVar
      , collectJob = collect outqVar countVar
      , remainingJobs = remaining countVar
      , cancelJobs = cancel inqVar countVar
      , cleanupJobControl = destroySemaphore sem
      , jobControlSemaphore = Just (semaphoreName sem)
      }
  where
    -- Dispatcher loop: one forked thread per job, gated by the semaphore.
    worker :: Semaphore -> TChan (IO a) -> TChan (Either SomeException a) -> IO ()
    worker sem inqVar outqVar =
      forever $ do
        job <- atomically $ readTChan inqVar
        -- Acquire the slot and fork the thread that will give it back with
        -- asynchronous exceptions masked; only the job itself runs unmasked.
        mask_ $ do
          waitOnSemaphore sem
          void $ forkIOWithUnmask $ \unmask -> do
            res <- try (unmask job)
            releaseSemaphore sem 1
            atomically $ writeTChan outqVar res
        -- Pause 250000 microseconds before dispatching the next job.
        -- NOTE(review): presumably this gives the job just started a chance
        -- to claim further semaphore slots for itself -- confirm.
        threadDelay 250000

    -- Count the job and enqueue it in a single transaction.
    spawn :: TChan (IO a) -> TVar Int -> IO a -> IO ()
    spawn inqVar countVar job =
      atomically $ do
        modifyTVar' countVar (+ 1)
        writeTChan inqVar job

    -- Take one finished result. The decrement and the read share one
    -- transaction, so the counter only changes once a result is available.
    collect :: TChan (Either SomeException a) -> TVar Int -> IO a
    collect outqVar countVar = do
      res <- atomically $ do
        modifyTVar' countVar (subtract 1)
        readTChan outqVar
      either throwIO return res

    -- Jobs remain while the outstanding-job counter is non-zero.
    remaining :: TVar Int -> IO Bool
    remaining countVar = fmap (/= 0) $ atomically $ readTVar countVar

    -- Drain the jobs still sitting in the input queue and deduct them from
    -- the counter. Jobs the dispatcher has already dequeued are not affected.
    cancel :: TChan (IO a) -> TVar Int -> IO ()
    cancel inqVar countVar =
      atomically $ do
        xs <- readAllTChan inqVar
        modifyTVar' countVar (subtract (length xs))
-- | A limit on how many actions may run at once under 'withJobLimit'.
-- A thin wrapper around a 'QSem'; the constructor is not exported.
newtype JobLimit = JobLimit QSem
-- | Create a 'JobLimit' around a new 'QSem' made with @newQSem n@.
newJobLimit :: Int -> IO JobLimit
newJobLimit n =
  fmap JobLimit (newQSem n)
-- | Run an action while holding one unit of the 'JobLimit'.
--
-- The underlying semaphore is waited on before the action and signalled
-- afterwards. 'bracket_' ensures the signal happens even if the action
-- throws.
withJobLimit :: JobLimit -> IO a -> IO a
withJobLimit (JobLimit sem) =
  bracket_ (waitQSem sem) (signalQSem sem)
-- | A mutual-exclusion lock represented by an @'MVar' ()@.
newtype Lock = Lock (MVar ())
-- | Create a new 'Lock'. Its 'MVar' starts out full, so the first
-- 'criticalSection' can take it without waiting.
newLock :: IO Lock
newLock = fmap Lock $ newMVar ()
-- | Run an action while holding the 'Lock'.
--
-- The lock's 'MVar' is taken before the action and refilled afterwards.
-- 'bracket_' ensures the refill happens even if the action throws.
criticalSection :: Lock -> IO a -> IO a
criticalSection (Lock lck) act = bracket_ (takeMVar lck) (putMVar lck ()) act