module UnagiBounded(unagiBoundedMain) where

import Control.Concurrent.Chan.Unagi.Bounded
import qualified Control.Concurrent.Chan.Unagi.Bounded.Internal as UI
import Control.Monad
import qualified Data.Primitive as P
import Data.IORef

import Control.Concurrent(forkIO,threadDelay)
import Control.Concurrent.MVar
import Control.Exception
import Data.Atomics.Counter.Fat
import Control.Applicative

import Data.Maybe(isNothing)

import Prelude

unagiBoundedMain :: IO ()
unagiBoundedMain = do
    putStrLn "==================="
    putStrLn "Testing Unagi.Bounded details:"
    -- ------
    putStr "Smoke test at different starting offsets, spanning overflow... "
    forM_ [1,2,4,1024,  3,5,513] $ \segLen-> do
        -- ------
        mapM_ (overflowSanity segLen) $ 
            [ (maxBound - segLen - 1) .. maxBound] 
            ++ [minBound .. (minBound + segLen + 1)]
        -- ------
        newChanSanity segLen
    putStrLn "OK"
    -- ------
    putStr "Correct first write and read... "
    mapM_ correctFirstWriteRead [ maxBound - 5, maxBound - 4, maxBound - 3, maxBound, minBound, 0]
    putStrLn "OK"
    -- ------
    let tries = 10000
    putStrLn $ "Checking for deadlocks from killed Unagi reader in a fancy way, x"++show tries
    checkDeadlocksReaderUnagiBounded tries
    -- ------
    putStr "Test bounds blocking... "
    forM_ [(1::Int),2,4,8] $ \n->
        testBoundsBlocking (2^n)
    putStrLn "OK"
    -- ------
    putStr "Test behavior of tryWriteChan... "
    forM_ [(1::Int),2,4,8] $ \n-> do
        tryWriteChanSmoke (2^n)
        tryWriteChanConcurrent (2^n)
    putStrLn "OK"
    -- ------
    putStrLn "Testing Unagi.Bounded components:"
    -- ------
    putStr "    Checkpoint tests... "
    checkpointTest1 100000000
    checkpointTest2 100000000
    putStrLn "OK"


-- TODO NOTE WHEN WE FACTOR THIS OUT OF UNAGI*-SPECIFIC TESTS, make function closed over sEGMENT_LENGTH as we do here.
-- NOTE: FORMERLY smoke2
--
-- w/r/w/r... spanning overflow
overflowSanity :: Int -> Int -> IO ()
overflowSanity segLen n = do
    (i,o) <- UI.newChanStarting n segLen
    let inp = [0 .. (segLen * 3)]
    mapM_ (check i o) inp
 where check i o x = do
         writeChan i x
         x' <- readChan o
         unless (x == x') $
            error $ "Smoke test failed with starting offset of: "++(show n)++"at write: "++(show x)

-- exercise power-of-two rounding, and make sure array sizes and recorded
-- bounds match up
newChanSanity :: Int -> IO ()
newChanSanity bnds = do
    (UI.InChan _ _ inCE, UI.OutChan outCE) <- newChan bnds
    check inCE
    check outCE
    checkSame inCE outCE
  where checkSame (UI.ChanEnd x y _ _ _) (UI.ChanEnd x' y' _ _ _) =
           unless (x == x' && y == y') $
             error $ "newChanSanity: "++(show (x,x',y,y'))

        check (UI.ChanEnd logBounds boundsMn1 _segSource _ _) = do
           let storedBoundsSane = (2^logBounds) == (boundsMn1 + 1)
           lengthEqualsBounds <- return True -- TODO get and check length from segSource arr, possible?
           unless (storedBoundsSane && lengthEqualsBounds) $
            error $ "newChanSanity: PLORT!"


-- Basic first write sanity checking and writer unblocking mechanics
correctFirstWriteRead :: Int -> IO ()
correctFirstWriteRead n = do
    let size = 4
    (i, o@(UI.OutChan (UI.ChanEnd _logBounds _boundsMn1 _segSource _cntr strHeadRef))) <- UI.newChanStarting n size

    writeChan i ()

    (UI.StreamHead offset0 (UI.Stream arr next)) <- readIORef strHeadRef
    cell <- P.readArray arr 0
    case cell of
         UI.Written () -> return ()
         _ -> error "Expected a Write at index 0"
    unless (n == offset0)$
        error $ "offset0 /= "++(show n)++", instead == "++(show offset0)

    -- The next segment should not be set up yet:
    noSegment <- readIORef next
    unless (isNothing noSegment) $
        error "Next segment should not be set up yet!"

    -- First read should set up next segment for unblocked writers.
    () <- readChan o `onException` putStrLn "Read of first and only value failed!"
    nextSeg <- readIORef next
    case nextSeg of
         Nothing -> error "Next should have been set up after first read!"
         Just (UI.NextByReader (UI.Stream _arr' next')) -> do
            noSegment' <- readIORef next'
            unless (isNothing noSegment') $
                error "Next of next segment should not be set up yet!"
         _ -> error "Next marked installed by writer!"



-- test for deadlocks caused by async exceptions in reader.
checkDeadlocksReaderUnagiBounded :: Int -> IO ()
checkDeadlocksReaderUnagiBounded times = do
  let run 0 normalRetries numRace = putStrLn $ "Lates: "++(show normalRetries)++", Races: "++(show numRace)
      run n normalRetries numRace
       | (normalRetries + numRace) > (times `div` 3) = error "This test is taking too long. Please retry, and if still failing send the log to me"
       | otherwise = do
         -- we'll kill the reader with our special exception half the time,
         -- expecting that we never get our race condition on those runs:
         let usingReadChanOnException = even n

         let numPreloaded = 10000
         (i,o) <- UI.newChanStarting 0 $ numPreloaded+2
         -- preload a chan with 0s
         replicateM_ numPreloaded $ writeChan i (0::Int)

         rStart <- newEmptyMVar
         saved <- newEmptyMVar -- for holding potential saved (IO a) from blocked readChanOnException
         rid <- forkIO $ (\rd-> putMVar rStart () >> (forever $ void $ rd o)) $
                    if usingReadChanOnException 
                        then flip readChanOnException ( putMVar saved )
                        else readChan
         takeMVar rStart >> threadDelay 1
         throwTo rid ThreadKilled

         -- did killing reader damage queue for reads or writes?
         writeChan i 1 `onException` ( putStrLn "Exception from first writeChan!")
         writeChan i 2 `onException` ( putStrLn "Exception from second writeChan!")
         finalRead <- readChan o `onException` ( putStrLn "Exception from final readChan!")
         
         oCnt <- readCounter $ (\(UI.OutChan(UI.ChanEnd _ _ _ cntr _))-> cntr) o
         iCnt <- readCounter $ (\(UI.InChan _ _ (UI.ChanEnd _ _ _ cntr _))-> cntr) i
         unless (iCnt == numPreloaded + 2) $ 
            error "The InChan counter doesn't match what we'd expect from numPreloaded!"

         case finalRead of
              0 -> if oCnt <= 0 -- (technically, -1 means hasn't started yet)
                      -- reader didn't have a chance to get started
                     then putStr "0" >> run n (normalRetries+1) numRace
                      -- normal run; we tested that killing a reader didn't
                      -- break chan for other readers/writers:
                     else putStr "." >> run (n-1) normalRetries numRace
              --
              -- Rare. Reader was killed after reading all pre-loaded messages
              -- but before starting what would be the blocking read:
              1 | oCnt == numPreloaded+1 -> putStr "X" >> run n normalRetries (numRace + 1)
                | otherwise -> error $ "Having read final 1, "++
                                       "Expecting a counter value of "++(show $ numPreloaded+1)++
                                       " but got: "++(show oCnt)
              2 -> do when usingReadChanOnException $ do
                        shouldBe1 <- join $ takeMVar saved
                        unless (shouldBe1 == 1) $
                          error "The handler for our readChanOnException should only have returned 1"
                      unless (oCnt == numPreloaded + 2) $
                        error $ "Having read final 2, "++
                                "Expecting a counter value of "++(show $ numPreloaded+2)++
                                " but got: "++(show oCnt)
                      putStr "+" >> run n (normalRetries + 1) numRace

              _ -> error "Fix your #$%@ test!"

  run times 0 0
  putStrLn ""

-- idempotent & non-conflicting puts/takes
checkpointTest1 :: Int -> IO ()
checkpointTest1 n = do
    x <- newEmptyMVar
    checkpt <- UI.WriterCheckpoint <$> newEmptyMVar
    -- check writerCheckin and tryWriterCheckin concurrently against idempotent unblockWriters
    void $ forkIO ((replicateM_ n $ (UI.writerCheckin checkpt >> UI.tryWriterCheckin checkpt)) >> putMVar x ())
    threadDelay 10000
    replicateM_ n $ UI.unblockWriters checkpt
    takeMVar x `onException` putStrLn "checkpointTest1: Forked writerCheckin deadlocked!"

-- No deadlocks in read:
checkpointTest2 :: Int -> IO ()
checkpointTest2 n = do
    x <- newEmptyMVar
    checkpt <- UI.WriterCheckpoint <$> newEmptyMVar
    -- check writerCheckin and tryWriterCheckin concurrently against writerCheckin
    void $ forkIO ((replicateM_ n $ (UI.writerCheckin checkpt >> UI.tryWriterCheckin checkpt)) >> putMVar x ())
    threadDelay 10000
    UI.unblockWriters checkpt
    replicateM_ n $ UI.writerCheckin checkpt
    takeMVar x `onException` putStrLn "checkpointTest2: Forked writerCheckin deadlocked!"


-- Test that our bounding works as expected
testBoundsBlocking :: Int -> IO ()
testBoundsBlocking bnds = do
    v <- newEmptyMVar
    (inC,outC) <- newChan bnds
    -- make sure none of these block.
    replicateM_ bnds $ writeChan inC ()
    -- but this blocks
    void $ forkIO $ writeChan inC () >> putMVar v ()
    threadDelay 100000
    noth <- tryTakeMVar v
    unless (noth == Nothing) $
        error "bnds+1 writeChan should have blocked!"
    -- ...until:
    readChan outC
    takeMVar v `onException` putStrLn "Read should have unblocked bnds+1 writer"
    -- Now we fork one which work together to fill remaining bnds-1 slots, without blocking:
    (replicateM_ (bnds-1) $ writeChan inC ())
        `onException` putStrLn "Writes should not have blocked in second segment"
    -- now we're at 2x bounds.
    -- This next, again, should block:
    v2 <- newEmptyMVar 
    void $ forkIO $ writeChan inC () >> putMVar v2 ()
    threadDelay 100000
    noth2 <- tryTakeMVar v2
    unless (noth2 == Nothing) $
        error "bnds*2+1 writeChan should have blocked!"
    -- and unblock as soon as (but no sooner than after bnds reads):
    replicateM_ (bnds-1) $ readChan outC
    threadDelay 100000
    noth3 <- tryTakeMVar v2
    unless (noth3 == Nothing) $
        error "bnds*2+1 writeChan should still have been blocked!"
    -- now this should unblock:
    readChan outC
    takeMVar v2 `onException` putStrLn "Read should have unblocked bnds*2+1 writer"


{- OLD AND NO-LONGER RELEVANT
-- A fork of the above, for tryWriteChan
-- TODO these coule be more thoughtful/thorough
testTryWriteChan :: Int -> IO ()
testTryWriteChan bnds = do
    (inC,outC) <- newChan bnds
    -- make sure none of these block.
    trues <- replicateM bnds $ tryWriteChan inC ()
    unless (and trues) $
        error "Some of the first writes failed!"
    -- but this ought to fail:
    success1 <- tryWriteChan inC () `onException` putStrLn "Our tryTakeMVar seems to have blocked instead of returning False!"
    when success1 $
        error "bnds+1 tryWriteChan should have failed!"
    -- ...until we read:
    readChan outC
    -- then this should succeed:
    success2 <- tryWriteChan inC () `onException` putStrLn "Our tryTakeMVar number 2 seems to have blocked instead of returning true!"
    unless success2 $
        error "bnds+1 tryWriteChan should have succeeded this time!"
    --
    -- Now we fork one which work together to fill remaining bnds-1 slots, without blocking:
    successes2 <- replicateM (bnds-1) $ tryWriteChan inC ()
        `onException` putStrLn "tryWrites should not have blocked in second segment"
    unless (and successes2) $
        error "failures seen in succcesses2!"
    -- now we're at 2x bounds.
    -- This next, again, should block:
    success3 <- tryWriteChan inC () `onException` putStrLn "success3: Our tryTakeMVar seems to have blocked instead of returning False!"
    when success3 $
        error "bnds*2+1 tryWriteChan should have failed!"
    -- and unblock as soon as (but no sooner than after bnds reads):
    replicateM_ (bnds-1) $ readChan outC
    success4 <- tryWriteChan inC () `onException` putStrLn "success4: Our tryTakeMVar seems to have blocked instead of returning False!"
    when success4 $
        error "bnds*2+1 writeChan should still have failed!"
    -- now this should unblock:
    readChan outC
    success5 <- tryWriteChan inC () `onException` putStrLn "success5: Our tryTakeMVar seems to have blocked instead of returning False!"
    unless success5 $
        error "success5: should have succeeded!"

    -- Now fork a couple writers and make sure none block:
    vs <- replicateM 2 newEmptyMVar 
    forM_ vs $ \v-> forkIO $ do
        replicateM_ (5*bnds) $
            tryWriteChan inC ()
        putMVar v ()
    forM_ vs $ \v-> 
        takeMVar v `onException` putStrLn "A tryWriteChan seems to have blocked!"
-}
tryWriteChanSmoke :: Int -> IO ()
tryWriteChanSmoke bnds = do
    (inC,outC) <- newChan bnds
    -- make sure none of these block.
    trues <- replicateM bnds $ tryWriteChan inC ()
    unless (and trues) $
        error "Some of the first writes failed!"
    -- but this ought to fail:
    success1 <- tryWriteChan inC () `onException` putStrLn "Our tryTakeMVar seems to have blocked instead of returning False!"
    when success1 $
        error "bnds+1 tryWriteChan should have failed!"
    -- ...until we read:
    readChan outC
    -- then this should succeed:
    success2 <- tryWriteChan inC () `onException` putStrLn "Our tryTakeMVar number 2 seems to have blocked instead of returning true!"
    unless success2 $
        error "bnds+1 tryWriteChan should have succeeded this time!"
    -- And the next should fail again.
    success3 <- tryWriteChan inC () `onException` putStrLn "Our tryTakeMVar number 3 seems to have blocked instead of returning true!"
    when success3 $
        error "bnds+2 tryWriteChan should have failed!"
    readChan outC
    success4 <- tryWriteChan inC () `onException` putStrLn "Our tryTakeMVar number 4 seems to have blocked instead of returning true!"
    unless success4 $
        error "bnds+3 tryWriteChan should have succeeded this time!"

-- Now do some concurrency tests, forking readers and writers that retry until
-- they can fill their quota
tryWriteChanConcurrent :: Int -> IO ()
tryWriteChanConcurrent bnds = do
    (inC,outC) <- newChan bnds
    let n = 1000000
        writer :: Int -> Int -> MVar Int -> IO ()
        writer cnt failed v
            | cnt > 0 = do
                success <- tryWriteChan inC ()
                if success 
                    then writer (cnt-1) failed v
                    else writer cnt (failed+1) v -- or yield here
            | otherwise = putMVar v failed
    v1 <- newEmptyMVar
    -- Test with truly concurrent reader and writer:
    void $ forkIO $ writer n 0 v1
    replicateM_ n $ readChan outC
    _failed0 <- takeMVar v1
    -- print _failed0

    -- And now with hopefully some concurrent writers:
    v2 <- newEmptyMVar
    void $ forkIO $ writer n 0 v1
    void $ forkIO $ writer n 0 v2
    void $ forkIO $ replicateM_ (n*2) $ readChan outC
    _failed1 <- takeMVar v1
    _failed2 <- takeMVar v2
    -- print _failed1
    -- print _failed2
    return ()