module Test.Framework.Runners.ThreadPool (
executeOnPool
) where
import Control.Concurrent
( getChanContents,
newChan,
readChan,
writeChan,
writeList2Chan,
forkIO,
myThreadId,
Chan )
import Control.Monad ( forM_ )
import qualified Data.IntMap as IM
import Foreign.StablePtr ( newStablePtr )
-- | Message exchanged over the pool's channels.
--
-- The same shape is used in both directions: on the input side the payload
-- is an action still to be run, on the output side it is that action's result.
data WorkerEvent token a
    = WorkerTermination
    -- ^ On the input channel: tells one worker to stop.
    --   On the output channel: reports that one worker has stopped.
    | WorkerItem token a
    -- ^ A payload tagged with a token that identifies which input it belongs to.
-- | Run a list of IO actions on a pool of worker threads and return their
-- results in the same order as the actions were given.
--
-- Each action is tagged with its position (counting from 0) and queued on an
-- input channel that all workers read from; results come back on an output
-- channel and are put back into position order by 'reorderFrom'.
--
-- The result list is built from 'getChanContents', so it is produced lazily
-- as workers report back.
--
-- A thread count below 1 is treated as 1. Without that, no worker would be
-- started and every action would be silently dropped, yielding @[]@.
executeOnPool :: Int    -- ^ Requested number of worker threads (values below 1 are treated as 1)
              -> [IO a] -- ^ Actions to execute; they are queued left to right
              -> IO [a] -- ^ Results of the actions, in input order
executeOnPool n actions = do
    input_chan <- newChan
    output_chan <- newChan

    -- Queue every action tagged with its index, followed by exactly one
    -- termination marker per worker. The queue is filled from its own thread,
    -- so this function never has to traverse the whole of @actions@ itself.
    _ <- forkIO $ writeList2Chan input_chan $
        zipWith WorkerItem [0 ..] actions ++ replicate workers WorkerTermination

    -- Start the workers; their thread ids are not needed afterwards.
    forM_ [1 .. workers] $ \_ -> forkIO (poolWorker input_chan output_chan)

    -- NOTE(review): the stable pointer to the calling thread is created and
    -- never used or freed. Presumably it keeps this thread reachable by the
    -- garbage collector so that it is not handed a "blocked indefinitely"
    -- exception while it waits on the workers -- confirm before removing.
    _stablePtr <- myThreadId >>= newStablePtr

    -- Stop reading once every worker has reported termination, then restore
    -- input order starting from token 0.
    fmap (reorderFrom 0 . takeWhileWorkersExist workers) $ getChanContents output_chan
  where
    -- Effective pool size: never fewer than one worker.
    workers = max 1 n
-- | Body of a single worker thread.
--
-- Repeatedly takes an event from the input channel. A 'WorkerItem' has its
-- action run and the result written to the output channel under the same
-- token, after which the worker loops. A 'WorkerTermination' is echoed to the
-- output channel and ends the loop.
--
-- No exception handling is done here: if an action throws, the loop ends
-- without writing 'WorkerTermination' to the output channel.
poolWorker :: Chan (WorkerEvent token (IO a)) -> Chan (WorkerEvent token a) -> IO ()
poolWorker input_chan output_chan = loop
  where
    loop = do
        event <- readChan input_chan
        case event of
            -- Tell the consumer that this worker is done, then stop.
            WorkerTermination -> writeChan output_chan WorkerTermination
            WorkerItem token action -> do
                result <- action
                writeChan output_chan (WorkerItem token result)
                loop
-- | Collect @(token, result)@ pairs from a stream of worker events until the
-- given number of workers have all reported 'WorkerTermination'.
--
-- The worker count is checked /before/ the event list is inspected, so once
-- the last termination has been seen the remainder of the list is never
-- forced. The function also stops if the event list itself runs out.
takeWhileWorkersExist :: Int -> [WorkerEvent token a] -> [(token, a)]
takeWhileWorkersExist worker_count events
    | worker_count <= 0 = []
    | otherwise =
        case events of
            [] -> []
            -- One fewer live worker; carry on with the rest of the stream.
            WorkerTermination : rest ->
                takeWhileWorkersExist (worker_count - 1) rest
            WorkerItem token x : rest ->
                (token, x) : takeWhileWorkersExist worker_count rest
-- | Rearrange a list of token-tagged values into increasing token order,
-- starting at the given token.
--
-- Values that arrive ahead of their turn are parked in an 'IM.IntMap' until
-- the token counter catches up with them. A value is emitted as soon as its
-- token is the next one expected, so a prefix of the output can be consumed
-- without forcing the whole input.
--
-- If the input ends while values are still parked (their tokens were never
-- reached because of a gap, a duplicate, or a token below the starting point),
-- the leftovers are emitted in ascending token order rather than being
-- retried forever.
reorderFrom :: Int -> [(Int, a)] -> [a]
reorderFrom from initial_things = go from IM.empty initial_things
  where
    -- next:   the token that must be emitted next
    -- parked: values that arrived early, keyed by token
    -- things: input not yet examined
    go next parked things
        -- Serve from the buffer first; this does not force the input list.
        | Just x <- IM.lookup next parked =
            x : go (next + 1) (IM.delete next parked) things
        | otherwise =
            case things of
                -- Input exhausted: flush whatever is still parked.
                [] -> IM.elems parked
                (token, x) : rest
                    | token == next -> x : go (next + 1) parked rest
                    | otherwise -> go next (IM.insert token x parked) rest