module Test.Framework.Runners.ThreadPool (
        executeOnPool
    ) where

import Control.Concurrent
    ( getChanContents,
      newChan,
      readChan,
      writeChan,
      writeList2Chan,
      forkIO,
      myThreadId,
      Chan )
import Control.Monad ( forM_ )

import qualified Data.IntMap as IM

import Foreign.StablePtr ( newStablePtr )


-- | A message travelling through one of the pool's channels.  On the input
-- channel the payload @a@ is an action to run; on the output channel it is
-- the result of having run it.
data WorkerEvent token a
    = WorkerTermination
      -- ^ On the input channel: tells the worker that reads it to stop.
      -- On the output channel: reports that one worker has stopped.
    | WorkerItem token a
      -- ^ A payload tagged with the token used to restore the input order.

-- | Execute IO actions on several threads and return their results in the original
-- order.  It is guaranteed that no action from the input list is executed unless all
-- the items that precede it in the list have been executed or are executing at that
-- moment.
--
-- The result list is produced lazily from the workers' output channel, so
-- demanding an element blocks until that result is available.
--
-- NB: if the thread count is zero or negative, no workers are spawned and the
-- result is the empty list regardless of the actions supplied.
executeOnPool :: Int    -- ^ Number of threads to use
              -> [IO a] -- ^ Actions to execute: these will be scheduled left to right
              -> IO [a] -- ^ Ordered results of executing the given IO actions in parallel
executeOnPool n actions = do
    -- Prepare the channels
    input_chan <- newChan
    output_chan <- newChan

    -- Write the actions as items to the channel followed by one termination per thread
    -- that indicates they should terminate. We do this on another thread for
    -- maximum laziness (in case one of the actions we are going to run depends on the
    -- output from previous actions..)
    _ <- forkIO $ writeList2Chan input_chan
                    (zipWith WorkerItem [0..] actions ++ replicate n WorkerTermination)

    -- Spawn workers
    forM_ [1..n] (const $ forkIO $ poolWorker input_chan output_chan)

    -- Short version: make sure we do the right thing if a test blocks on dead
    -- MVars or TVars.
    -- Long version: GHC is clever enough to throw an exception (BlockedOnDeadMVar
    -- or BlockedIndefinitely) when a thread is waiting for a MVar or TVar that can't
    -- be written to. However, it doesn't know anything about the handlers for those
    -- exceptions. Therefore, when a worker runs a test that causes this exception,
    -- since the main thread is blocking on the worker, the main thread gets the
    -- exception too despite the fact that the main thread will be runnable as soon
    -- as the worker catches its own exception. The below makes sure the main thread
    -- is always reachable by the GC, which is the mechanism for finding threads
    -- that are unrunnable.
    -- See also the ticket where SimonM (semi-cryptically) explains this:
    -- http://hackage.haskell.org/trac/ghc/ticket/3291
    --
    -- NB: this actually leaks stable pointers. We could prevent this by making
    -- takeWhileWorkersExist do |unsafePerformIO newStablePtr| when returning the
    -- lazily-demanded tail of the list, but it's a bit of a pain. For now, just
    -- grit our teeth and accept the leak.
    _stablePtr <- myThreadId >>= newStablePtr

    -- Return the results generated by the worker threads lazily and in
    -- the same order as we got the inputs
    fmap (reorderFrom 0 . takeWhileWorkersExist n) $ getChanContents output_chan

-- | Worker loop.  Repeatedly reads an event from the input channel: a
-- 'WorkerItem' has its action run and the result written to the output
-- channel under the same token, after which the loop continues; a
-- 'WorkerTermination' is echoed to the output channel and the loop ends.
--
-- NB: there is no exception handler here.  If an action throws, nothing is
-- written for its token and this worker never writes its 'WorkerTermination',
-- so callers are presumably expected to supply actions that do not throw.
poolWorker :: Chan (WorkerEvent token (IO a)) -> Chan (WorkerEvent token a) -> IO ()
poolWorker input_chan output_chan = do
    -- Read an action and work out whether we should continue or stop
    action_item <- readChan input_chan
    case action_item of
        WorkerTermination -> writeChan output_chan WorkerTermination -- Must have run out of real actions to execute
        WorkerItem token action -> do
            -- Do the action then loop
            result <- action
            writeChan output_chan (WorkerItem token result)
            poolWorker input_chan output_chan

-- | Keep grabbing items out of the infinite list of worker outputs until we have
-- received word that all of the workers have shut down.  This lets us turn a possibly
-- infinite list of outputs into a certainly finite one suitable for use with reorderFrom.
--
-- The first argument is the number of workers that have not yet reported
-- termination.  It is checked /before/ the event list is inspected, so once
-- the last termination has been seen the rest of the list is never forced.
takeWhileWorkersExist :: Int -> [WorkerEvent token a] -> [(token, a)]
takeWhileWorkersExist worker_count events
  | worker_count <= 0 = []
  | otherwise         = case events of
        -- One fewer live worker; keep going with the remaining events
        (WorkerTermination : events')  -> takeWhileWorkersExist (worker_count - 1) events'
        -- A real result: emit it and keep the worker count unchanged
        (WorkerItem token x : events') -> (token, x) : takeWhileWorkersExist worker_count events'
        []                             -> []

-- | This function carefully shuffles the input list so it is in the total order
-- defined by the integers paired with the elements.  If the list is @xs@ and
-- the supplied initial integer is @n@, it is expected that:
--
-- > sort (map fst xs) == [n..n + (length xs - 1)]
--
-- This function returns items in the lazy result list as soon as it is sure
-- it has the right item for that position.
--
-- If the expectation above is violated (for example there is a gap in the
-- tokens), the items still buffered once the input is exhausted are returned
-- in ascending token order rather than searching for the missing token forever.
reorderFrom :: Int -> [(Int, a)] -> [a]
reorderFrom from initial_things = go from initial_things IM.empty False
  where
    -- Arguments: the token expected next, the remaining input, the buffer of
    -- items that arrived early, and whether the buffer is worth consulting.
    go :: Int -> [(Int, b)] -> IM.IntMap b -> Bool -> [b]
    -- Input exhausted: whatever is left in the buffer is emitted in token
    -- order.  When the precondition holds the buffer holds exactly the
    -- remaining consecutive tokens, so this is the correct tail.
    go _ [] buf _ = IM.elems buf
    go next all_things@((token, x):things) buf buf_useful
      -- If the list token matches the one we were expecting we can just take the item.
      -- Always worth checking the buffer afterwards because the expected item has changed.
      | token == next
      = x : go (next + 1) things buf True
      -- If it's worth checking the buffer, it's possible the token we need is in it.
      -- Delete the found item from the map (if we find it) to save space.
      | buf_useful
      , (Just x', buf') <- IM.updateLookupWithKey (const $ const Nothing) next buf
      = x' : go (next + 1) all_things buf' True
      -- Token didn't match, buffer unhelpful: it must be in the tail of the list.
      -- Since we've already checked the buffer, stop bothering to do so until something changes.
      | otherwise
      = go next things (IM.insert token x buf) False