{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE CPP #-}
import Test.Hspec
import Test.Hspec.QuickCheck (prop)
import Test.QuickCheck.Monadic (assert, monadicIO, run)

import Control.Exception (IOException)
import qualified Data.Conduit as C
import qualified Data.Conduit.Lift as C
import qualified Data.Conduit.Internal as CI
import qualified Data.Conduit.List as CL
import Control.Monad.Trans.Resource as C (runExceptionT, runResourceT)
import Data.Maybe   (fromMaybe,catMaybes,fromJust)
import qualified Data.List as DL
import Control.Monad.ST (runST)
import Data.Monoid
import qualified Data.ByteString as S
import qualified Data.ByteString.Char8 as S8
import qualified Data.IORef as I
import qualified Data.ByteString.Lazy as L
import Data.ByteString.Lazy.Char8 ()
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Lazy.Encoding as TLE
import Control.Monad.Trans.Resource (runExceptionT, runExceptionT_, allocate, resourceForkIO)
import Control.Concurrent (threadDelay, killThread)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Writer (execWriter, tell, runWriterT)
import Control.Monad.Trans.State (evalStateT, get, put, modify)
import Control.Monad.Trans.Maybe (MaybeT (..))
import Control.Applicative (pure, (<$>), (<*>))
import Data.Functor.Identity (Identity,runIdentity)
import Control.Monad (forever, void)
import Data.Void (Void)
import qualified Control.Concurrent.MVar as M
import Control.Monad.Error (catchError, throwError, Error)
import qualified Data.Map as Map
import Control.Arrow (first)
import qualified Data.Conduit.Extra.ZipConduitSpec as ZipConduit

(@=?) :: (Eq a, Show a) => a -> a -> IO ()
(@=?) = flip shouldBe

-- Quickcheck property for testing equivalence of list processing
-- functions and their conduit counterparts
equivToList :: Eq b => ([a] -> [b]) -> CI.Conduit a Identity b -> [a] -> Bool
equivToList f conduit xs =
  f xs == runIdentity (CL.sourceList xs C.$$ conduit C.=$= CL.consume)


main :: IO ()
main = hspec $ do
    describe "data loss rules" $ do
        it "consumes the source to quickly" $ do
            x <- runResourceT $ CL.sourceList [1..10 :: Int] C.$$ do
                  strings <- CL.map show C.=$ CL.take 5
                  liftIO $ putStr $ unlines strings
                  CL.fold (+) 0
            40 `shouldBe` x

        it "correctly consumes a chunked resource" $ do
            x <- runResourceT $ (CL.sourceList [1..5 :: Int] `mappend` CL.sourceList [6..10]) C.$$ do
                strings <- CL.map show C.=$ CL.take 5
                liftIO $ putStr $ unlines strings
                CL.fold (+) 0
            40 `shouldBe` x

    describe "filter" $ do
        it "even" $ do
            x <- runResourceT $ CL.sourceList [1..10] C.$$ CL.filter even C.=$ CL.consume
            x `shouldBe` filter even [1..10 :: Int]

    prop "concat" $ equivToList (concat :: [[Int]]->[Int]) CL.concat

    describe "mapFoldable" $ do
        prop "list" $
            equivToList (concatMap (:[]) :: [Int]->[Int]) (CL.mapFoldable  (:[]))
        let f x = if odd x then Just x else Nothing
        prop "Maybe" $
            equivToList (catMaybes . map f :: [Int]->[Int]) (CL.mapFoldable f)

    prop "scanl" $ equivToList (tail . scanl (+) 0 :: [Int]->[Int]) (CL.scanl (\a s -> (a+s,a+s)) 0)

    -- mapFoldableM and scanlM are fully polymorphic in type of monad
    -- so it suffice to check only with Identity.
    describe "mapFoldableM" $ do
        prop "list" $
            equivToList (concatMap (:[]) :: [Int]->[Int]) (CL.mapFoldableM (return . (:[])))
        let f x = if odd x then Just x else Nothing
        prop "Maybe" $
            equivToList (catMaybes . map f :: [Int]->[Int]) (CL.mapFoldableM (return . f))

    prop "scanl" $ equivToList (tail . scanl (+) 0 :: [Int]->[Int]) (CL.scanlM (\a s -> return (a+s,a+s)) 0)

    describe "ResourceT" $ do
        it "resourceForkIO" $ do
            counter <- I.newIORef 0
            let w = allocate
                        (I.atomicModifyIORef counter $ \i ->
                            (i + 1, ()))
                        (const $ I.atomicModifyIORef counter $ \i ->
                            (i - 1, ()))
            runResourceT $ do
                _ <- w
                _ <- resourceForkIO $ return ()
                _ <- resourceForkIO $ return ()
                sequence_ $ replicate 1000 $ do
                    tid <- resourceForkIO $ return ()
                    liftIO $ killThread tid
                _ <- resourceForkIO $ return ()
                _ <- resourceForkIO $ return ()
                return ()

            -- give enough of a chance to the cleanup code to finish
            threadDelay 1000
            res <- I.readIORef counter
            res `shouldBe` (0 :: Int)

    describe "sum" $ do
        it "works for 1..10" $ do
            x <- runResourceT $ CL.sourceList [1..10] C.$$ CL.fold (+) (0 :: Int)
            x `shouldBe` sum [1..10]
        prop "is idempotent" $ \list ->
            (runST $ CL.sourceList list C.$$ CL.fold (+) (0 :: Int))
            == sum list

    describe "foldMap" $ do
        it "sums 1..10" $ do
            Sum x <- CL.sourceList [1..(10 :: Int)] C.$$ CL.foldMap Sum
            x `shouldBe` sum [1..10]

        it "preserves order" $ do
            x <- CL.sourceList [[4],[2],[3],[1]] C.$$ CL.foldMap (++[(9 :: Int)])
            x `shouldBe` [4,9,2,9,3,9,1,9]

    describe "foldMapM" $ do
        it "sums 1..10" $ do
            Sum x <- CL.sourceList [1..(10 :: Int)] C.$$ CL.foldMapM (return . Sum)
            x `shouldBe` sum [1..10]

        it "preserves order" $ do
            x <- CL.sourceList [[4],[2],[3],[1]] C.$$ CL.foldMapM (return . (++[(9 :: Int)]))
            x `shouldBe` [4,9,2,9,3,9,1,9]

    describe "unfold" $ do
        it "works" $ do
            let f 0 = Nothing
                f i = Just (show i, i - 1)
                seed = 10 :: Int
            x <- CL.unfold f seed C.$$ CL.consume
            let y = DL.unfoldr f seed
            x `shouldBe` y

    describe "Monoid instance for Source" $ do
        it "mappend" $ do
            x <- runResourceT $ (CL.sourceList [1..5 :: Int] `mappend` CL.sourceList [6..10]) C.$$ CL.fold (+) 0
            x `shouldBe` sum [1..10]
        it "mconcat" $ do
            x <- runResourceT $ mconcat
                [ CL.sourceList [1..5 :: Int]
                , CL.sourceList [6..10]
                , CL.sourceList [11..20]
                ] C.$$ CL.fold (+) 0
            x `shouldBe` sum [1..20]

    describe "zipping" $ do
        it "zipping two small lists" $ do
            res <- runResourceT $ CI.zipSources (CL.sourceList [1..10]) (CL.sourceList [11..12]) C.$$ CL.consume
            res @=? zip [1..10 :: Int] [11..12 :: Int]

    describe "zipping sinks" $ do
        it "take all" $ do
            res <- runResourceT $ CL.sourceList [1..10] C.$$ CI.zipSinks CL.consume CL.consume
            res @=? ([1..10 :: Int], [1..10 :: Int])
        it "take fewer on left" $ do
            res <- runResourceT $ CL.sourceList [1..10] C.$$ CI.zipSinks (CL.take 4) CL.consume
            res @=? ([1..4 :: Int], [1..10 :: Int])
        it "take fewer on right" $ do
            res <- runResourceT $ CL.sourceList [1..10] C.$$ CI.zipSinks CL.consume (CL.take 4)
            res @=? ([1..10 :: Int], [1..4 :: Int])

    describe "Monad instance for Sink" $ do
        it "binding" $ do
            x <- runResourceT $ CL.sourceList [1..10] C.$$ do
                _ <- CL.take 5
                CL.fold (+) (0 :: Int)
            x `shouldBe` sum [6..10]

    describe "Applicative instance for Sink" $ do
        it "<$> and <*>" $ do
            x <- runResourceT $ CL.sourceList [1..10] C.$$
                (+) <$> pure 5 <*> CL.fold (+) (0 :: Int)
            x `shouldBe` sum [1..10] + 5

    describe "resumable sources" $ do
        it "simple" $ do
            (x, y, z) <- runResourceT $ do
                let src1 = CL.sourceList [1..10 :: Int]
                (src2, x) <- src1 C.$$+ CL.take 5
                (src3, y) <- src2 C.$$++ CL.fold (+) 0
                z <- src3 C.$$+- CL.consume
                return (x, y, z)
            x `shouldBe` [1..5] :: IO ()
            y `shouldBe` sum [6..10]
            z `shouldBe` []

    describe "conduits" $ do
        it "map, left" $ do
            x <- runResourceT $
                CL.sourceList [1..10]
                    C.$= CL.map (* 2)
                    C.$$ CL.fold (+) 0
            x `shouldBe` 2 * sum [1..10 :: Int]

        it "map, left >+>" $ do
            x <- runResourceT $
                CI.ConduitM
                    (CI.unConduitM (CL.sourceList [1..10])
                    CI.>+> CI.injectLeftovers (CI.unConduitM $ CL.map (* 2)))
                    C.$$ CL.fold (+) 0
            x `shouldBe` 2 * sum [1..10 :: Int]

        it "map, right" $ do
            x <- runResourceT $
                CL.sourceList [1..10]
                    C.$$ CL.map (* 2)
                    C.=$ CL.fold (+) 0
            x `shouldBe` 2 * sum [1..10 :: Int]

        it "groupBy" $ do
            let input = [1::Int, 1, 2, 3, 3, 3, 4, 5, 5]
            x <- runResourceT $ CL.sourceList input
                    C.$$ CL.groupBy (==)
                    C.=$ CL.consume
            x `shouldBe` DL.groupBy (==) input

        it "groupBy (nondup begin/end)" $ do
            let input = [1::Int, 2, 3, 3, 3, 4, 5]
            x <- runResourceT $ CL.sourceList input
                    C.$$ CL.groupBy (==)
                    C.=$ CL.consume
            x `shouldBe` DL.groupBy (==) input

        it "mapMaybe" $ do
            let input = [Just (1::Int), Nothing, Just 2, Nothing, Just 3]
            x <- runResourceT $ CL.sourceList input
                    C.$$ CL.mapMaybe ((+2) <$>)
                    C.=$ CL.consume
            x `shouldBe` [3, 4, 5]

        it "mapMaybeM" $ do
            let input = [Just (1::Int), Nothing, Just 2, Nothing, Just 3]
            x <- runResourceT $ CL.sourceList input
                    C.$$ CL.mapMaybeM (return . ((+2) <$>))
                    C.=$ CL.consume
            x `shouldBe` [3, 4, 5]

        it "catMaybes" $ do
            let input = [Just (1::Int), Nothing, Just 2, Nothing, Just 3]
            x <- runResourceT $ CL.sourceList input
                    C.$$ CL.catMaybes
                    C.=$ CL.consume
            x `shouldBe` [1, 2, 3]

        it "concatMap" $ do
            let input = [1, 11, 21]
            x <- runResourceT $ CL.sourceList input
                    C.$$ CL.concatMap (\i -> enumFromTo i (i + 9))
                    C.=$ CL.fold (+) (0 :: Int)
            x `shouldBe` sum [1..30]

        it "bind together" $ do
            let conduit = CL.map (+ 5) C.=$= CL.map (* 2)
            x <- runResourceT $ CL.sourceList [1..10] C.$= conduit C.$$ CL.fold (+) 0
            x `shouldBe` sum (map (* 2) $ map (+ 5) [1..10 :: Int])

#if !FAST
    describe "isolate" $ do
        it "bound to resumable source" $ do
            (x, y) <- runResourceT $ do
                let src1 = CL.sourceList [1..10 :: Int]
                (src2, x) <- src1 C.$= CL.isolate 5 C.$$+ CL.consume
                y <- src2 C.$$+- CL.consume
                return (x, y)
            x `shouldBe` [1..5]
            y `shouldBe` []

        it "bound to sink, non-resumable" $ do
            (x, y) <- runResourceT $ do
                CL.sourceList [1..10 :: Int] C.$$ do
                    x <- CL.isolate 5 C.=$ CL.consume
                    y <- CL.consume
                    return (x, y)
            x `shouldBe` [1..5]
            y `shouldBe` [6..10]

        it "bound to sink, resumable" $ do
            (x, y) <- runResourceT $ do
                let src1 = CL.sourceList [1..10 :: Int]
                (src2, x) <- src1 C.$$+ CL.isolate 5 C.=$ CL.consume
                y <- src2 C.$$+- CL.consume
                return (x, y)
            x `shouldBe` [1..5]
            y `shouldBe` [6..10]

        it "consumes all data" $ do
            x <- runResourceT $ CL.sourceList [1..10 :: Int] C.$$ do
                CL.isolate 5 C.=$ CL.sinkNull
                CL.consume
            x `shouldBe` [6..10]

    describe "sequence" $ do
        it "simple sink" $ do
            let sumSink = do
                    ma <- CL.head
                    case ma of
                        Nothing -> return 0
                        Just a  -> (+a) . fromMaybe 0 <$> CL.head

            res <- runResourceT $ CL.sourceList [1..11 :: Int]
                             C.$= CL.sequence sumSink
                             C.$$ CL.consume
            res `shouldBe` [3, 7, 11, 15, 19, 11]

        it "sink with unpull behaviour" $ do
            let sumSink = do
                    ma <- CL.head
                    case ma of
                        Nothing -> return 0
                        Just a  -> (+a) . fromMaybe 0 <$> CL.peek

            res <- runResourceT $ CL.sourceList [1..11 :: Int]
                             C.$= CL.sequence sumSink
                             C.$$ CL.consume
            res `shouldBe` [3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 11]

#endif

    describe "peek" $ do
        it "works" $ do
            (a, b) <- runResourceT $ CL.sourceList [1..10 :: Int] C.$$ do
                a <- CL.peek
                b <- CL.consume
                return (a, b)
            (a, b) `shouldBe` (Just 1, [1..10])

    describe "unbuffering" $ do
        it "works" $ do
            x <- runResourceT $ do
                let src1 = CL.sourceList [1..10 :: Int]
                (src2, ()) <- src1 C.$$+ CL.drop 5
                src2 C.$$+- CL.fold (+) 0
            x `shouldBe` sum [6..10]

    describe "operators" $ do
        it "only use =$=" $
            runIdentity
            (    CL.sourceList [1..10 :: Int]
              C.$$ CL.map (+ 1)
             C.=$  CL.map (subtract 1)
             C.=$  CL.mapM (return . (* 2))
             C.=$  CL.map (`div` 2)
             C.=$  CL.fold (+) 0
            ) `shouldBe` sum [1..10]
        it "only use =$" $
            runIdentity
            (    CL.sourceList [1..10 :: Int]
              C.$$ CL.map (+ 1)
              C.=$ CL.map (subtract 1)
              C.=$ CL.map (* 2)
              C.=$ CL.map (`div` 2)
              C.=$ CL.fold (+) 0
            ) `shouldBe` sum [1..10]
        it "chain" $ do
            x <-      CL.sourceList [1..10 :: Int]
                C.$=  CL.map (+ 1)
                C.$= CL.map (+ 1)
                C.$=  CL.map (+ 1)
                C.$= CL.map (subtract 3)
                C.$= CL.map (* 2)
                C.$$  CL.map (`div` 2)
                C.=$  CL.map (+ 1)
                C.=$  CL.map (+ 1)
                C.=$  CL.map (+ 1)
                C.=$  CL.map (subtract 3)
                C.=$  CL.fold (+) 0
            x `shouldBe` sum [1..10]


    describe "termination" $ do
        it "terminates early" $ do
            let src = forever $ C.yield ()
            x <- src C.$$ CL.head
            x `shouldBe` Just ()
        it "bracket" $ do
            ref <- I.newIORef (0 :: Int)
            let src = C.bracketP
                    (I.modifyIORef ref (+ 1))
                    (\() -> I.modifyIORef ref (+ 2))
                    (\() -> forever $ C.yield (1 :: Int))
            val <- C.runResourceT $ src C.$$ CL.isolate 10 C.=$ CL.fold (+) 0
            val `shouldBe` 10
            i <- I.readIORef ref
            i `shouldBe` 3
        it "bracket skipped if not needed" $ do
            ref <- I.newIORef (0 :: Int)
            let src = C.bracketP
                    (I.modifyIORef ref (+ 1))
                    (\() -> I.modifyIORef ref (+ 2))
                    (\() -> forever $ C.yield (1 :: Int))
                src' = CL.sourceList $ repeat 1
            val <- C.runResourceT $ (src' >> src) C.$$ CL.isolate 10 C.=$ CL.fold (+) 0
            val `shouldBe` 10
            i <- I.readIORef ref
            i `shouldBe` 0
        it "bracket + toPipe" $ do
            ref <- I.newIORef (0 :: Int)
            let src = C.bracketP
                    (I.modifyIORef ref (+ 1))
                    (\() -> I.modifyIORef ref (+ 2))
                    (\() -> forever $ C.yield (1 :: Int))
            val <- C.runResourceT $ src C.$$ CL.isolate 10 C.=$ CL.fold (+) 0
            val `shouldBe` 10
            i <- I.readIORef ref
            i `shouldBe` 3
        it "bracket skipped if not needed" $ do
            ref <- I.newIORef (0 :: Int)
            let src = C.bracketP
                    (I.modifyIORef ref (+ 1))
                    (\() -> I.modifyIORef ref (+ 2))
                    (\() -> forever $ C.yield (1 :: Int))
                src' = CL.sourceList $ repeat 1
            val <- C.runResourceT $ (src' >> src) C.$$ CL.isolate 10 C.=$ CL.fold (+) 0
            val `shouldBe` 10
            i <- I.readIORef ref
            i `shouldBe` 0

    describe "invariant violations" $ do
        it "leftovers without input" $ do
            ref <- I.newIORef []
            let add x = I.modifyIORef ref (x:)
                adder' = CI.NeedInput (\a -> liftIO (add a) >> adder') return
                adder = CI.ConduitM adder'
                residue x = CI.ConduitM $ CI.Leftover (CI.Done ()) x

            _ <- C.yield 1 C.$$ adder
            x <- I.readIORef ref
            x `shouldBe` [1 :: Int]
            I.writeIORef ref []

            _ <- C.yield 1 C.$$ (residue 2 >> residue 3) >> adder
            y <- I.readIORef ref
            y `shouldBe` [1, 2, 3]
            I.writeIORef ref []

            _ <- C.yield 1 C.$$ residue 2 >> (residue 3 >> adder)
            z <- I.readIORef ref
            z `shouldBe` [1, 2, 3]
            I.writeIORef ref []

    describe "sane yield/await'" $ do
        it' "yield terminates" $ do
            let is = [1..10] ++ undefined
                src [] = return ()
                src (x:xs) = C.yield x >> src xs
            x <- src is C.$$ CL.take 10
            x `shouldBe` [1..10 :: Int]
        it' "yield terminates (2)" $ do
            let is = [1..10] ++ undefined
            x <- mapM_ C.yield is C.$$ CL.take 10
            x `shouldBe` [1..10 :: Int]
        it' "yieldOr finalizer called" $ do
            iref <- I.newIORef (0 :: Int)
            let src = mapM_ (\i -> C.yieldOr i $ I.writeIORef iref i) [1..]
            src C.$$ CL.isolate 10 C.=$ CL.sinkNull
            x <- I.readIORef iref
            x `shouldBe` 10

    describe "upstream results" $ do
        it' "works" $ do
            let foldUp :: (b -> a -> b) -> b -> CI.Pipe l a Void u IO (u, b)
                foldUp f b = CI.awaitE >>= either (\u -> return (u, b)) (\a -> let b' = f b a in b' `seq` foldUp f b')
                passFold :: (b -> a -> b) -> b -> CI.Pipe l a a () IO b
                passFold f b = CI.await >>= maybe (return b) (\a -> let b' = f b a in b' `seq` CI.yield a >> passFold f b')
            (x, y) <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> passFold (+) 0 CI.>+>  foldUp (*) 1
            (x, y) `shouldBe` (sum [1..10], product [1..10])

    describe "input/output mapping" $ do
        it' "mapOutput" $ do
            x <- C.mapOutput (+ 1) (CL.sourceList [1..10 :: Int]) C.$$ CL.fold (+) 0
            x `shouldBe` sum [2..11]
        it' "mapOutputMaybe" $ do
            x <- C.mapOutputMaybe (\i -> if even i then Just i else Nothing) (CL.sourceList [1..10 :: Int]) C.$$ CL.fold (+) 0
            x `shouldBe` sum [2, 4..10]
        it' "mapInput" $ do
            xyz <- (CL.sourceList $ map show [1..10 :: Int]) C.$$ do
                (x, y) <- C.mapInput read (Just . show) $ ((do
                    x <- CL.isolate 5 C.=$ CL.fold (+) 0
                    y <- CL.peek
                    return (x :: Int, y :: Maybe Int)) :: C.Sink Int IO (Int, Maybe Int))
                z <- CL.consume
                return (x, y, concat z)

            xyz `shouldBe` (sum [1..5], Just 6, "678910")

    describe "left/right identity" $ do
        it' "left identity" $ do
            x <- CL.sourceList [1..10 :: Int] C.$$ CI.ConduitM CI.idP C.=$ CL.fold (+) 0
            y <- CL.sourceList [1..10 :: Int] C.$$ CL.fold (+) 0
            x `shouldBe` y
        it' "right identity" $ do
            x <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> (CI.injectLeftovers $ CI.unConduitM $ CL.fold (+) 0) CI.>+> CI.idP
            y <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> (CI.injectLeftovers $ CI.unConduitM $ CL.fold (+) 0)
            x `shouldBe` y

    describe "generalizing" $ do
        it' "works" $ do
            x <-     CI.runPipe
                   $ CI.sourceToPipe  (CL.sourceList [1..10 :: Int])
               CI.>+> CI.conduitToPipe (CL.map (+ 1))
               CI.>+> CI.sinkToPipe    (CL.fold (+) 0)
            x `shouldBe` sum [2..11]

    describe "withUpstream" $ do
        it' "works" $ do
            let src = mapM_ CI.yield [1..10 :: Int] >> return True
                fold f =
                    loop
                  where
                    loop accum =
                        CI.await >>= maybe (return accum) go
                      where
                        go a =
                            let accum' = f accum a
                             in accum' `seq` loop accum'
                sink = CI.withUpstream $ fold (+) 0
            res <- CI.runPipe $ src CI.>+> sink
            res `shouldBe` (True, sum [1..10])

    describe "iterate" $ do
        it' "works" $ do
            res <- CL.iterate (+ 1) (1 :: Int) C.$$ CL.isolate 10 C.=$ CL.fold (+) 0
            res `shouldBe` sum [1..10]

    describe "unwrapResumable" $ do
        it' "works" $ do
            ref <- I.newIORef (0 :: Int)
            let src0 = do
                    C.yieldOr () $ I.writeIORef ref 1
                    C.yieldOr () $ I.writeIORef ref 2
                    C.yieldOr () $ I.writeIORef ref 3
            (rsrc0, Just ()) <- src0 C.$$+ CL.head

            x0 <- I.readIORef ref
            x0 `shouldBe` 0

            (_, final) <- C.unwrapResumable rsrc0

            x1 <- I.readIORef ref
            x1 `shouldBe` 0

            final

            x2 <- I.readIORef ref
            x2 `shouldBe` 1

        it' "isn't called twice" $ do
            ref <- I.newIORef (0 :: Int)
            let src0 = do
                    C.yieldOr () $ I.writeIORef ref 1
                    C.yieldOr () $ I.writeIORef ref 2
            (rsrc0, Just ()) <- src0 C.$$+ CL.head

            x0 <- I.readIORef ref
            x0 `shouldBe` 0

            (src1, final) <- C.unwrapResumable rsrc0

            x1 <- I.readIORef ref
            x1 `shouldBe` 0

            Just () <- src1 C.$$ CL.head

            x2 <- I.readIORef ref
            x2 `shouldBe` 2

            final

            x3 <- I.readIORef ref
            x3 `shouldBe` 2

        it' "source isn't used" $ do
            ref <- I.newIORef (0 :: Int)
            let src0 = do
                    C.yieldOr () $ I.writeIORef ref 1
                    C.yieldOr () $ I.writeIORef ref 2
            (rsrc0, Just ()) <- src0 C.$$+ CL.head

            x0 <- I.readIORef ref
            x0 `shouldBe` 0

            (src1, final) <- C.unwrapResumable rsrc0

            x1 <- I.readIORef ref
            x1 `shouldBe` 0

            () <- src1 C.$$ return ()

            x2 <- I.readIORef ref
            x2 `shouldBe` 0

            final

            x3 <- I.readIORef ref
            x3 `shouldBe` 1
    describe "injectLeftovers" $ do
        it "works" $ do
            let src = mapM_ CI.yield [1..10 :: Int]
                conduit = CI.injectLeftovers $ CI.unConduitM $ C.awaitForever $ \i -> do
                    js <- CL.take 2
                    mapM_ C.leftover $ reverse js
                    C.yield i
            res <- CI.ConduitM (src CI.>+> CI.injectLeftovers conduit) C.$$ CL.consume
            res `shouldBe` [1..10]
    describe "up-upstream finalizers" $ do
        it "pipe" $ do
            let p1 = CI.await >>= maybe (return ()) CI.yield
                p2 = idMsg "p2-final"
                p3 = idMsg "p3-final"
                idMsg msg = CI.addCleanup (const $ tell [msg]) $ CI.awaitForever CI.yield
                printer = CI.awaitForever $ lift . tell . return . show
                src = mapM_ CI.yield [1 :: Int ..]
            let run' p = execWriter $ CI.runPipe $ printer CI.<+< p CI.<+< src
            run' (p1 CI.<+< (p2 CI.<+< p3)) `shouldBe` run' ((p1 CI.<+< p2) CI.<+< p3)
        it "conduit" $ do
            let p1 = C.await >>= maybe (return ()) C.yield
                p2 = idMsg "p2-final"
                p3 = idMsg "p3-final"
                idMsg msg = C.addCleanup (const $ tell [msg]) $ C.awaitForever C.yield
                printer = C.awaitForever $ lift . tell . return . show
                src = CL.sourceList [1 :: Int ..]
            let run' p = execWriter $ src C.$$ p C.=$ printer
            run' ((p3 C.=$= p2) C.=$= p1) `shouldBe` run' (p3 C.=$= (p2 C.=$= p1))
    describe "monad transformer laws" $ do
        it "transPipe" $ do
            let source = CL.sourceList $ replicate 10 ()
            let tell' x = tell [x :: Int]

            let replaceNum1 = C.awaitForever $ \() -> do
                    i <- lift get
                    lift $ (put $ i + 1) >> (get >>= lift . tell')
                    C.yield i

            let replaceNum2 = C.awaitForever $ \() -> do
                    i <- lift get
                    lift $ put $ i + 1
                    lift $ get >>= lift . tell'
                    C.yield i

            x <- runWriterT $ source C.$$ C.transPipe (`evalStateT` 1) replaceNum1 C.=$ CL.consume
            y <- runWriterT $ source C.$$ C.transPipe (`evalStateT` 1) replaceNum2 C.=$ CL.consume
            x `shouldBe` y
    describe "iterM" $ do
        prop "behavior" $ \l -> monadicIO $ do
            let counter ref = CL.iterM (const $ liftIO $ M.modifyMVar_ ref (\i -> return $! i + 1))
            v <- run $ do
                ref <- M.newMVar 0
                CL.sourceList l C.$= counter ref C.$$ CL.mapM_ (const $ return ())
                M.readMVar ref

            assert $ v == length (l :: [Int])
        prop "mapM_ equivalence" $ \l -> monadicIO $ do
            let runTest h = run $ do
                    ref <- M.newMVar (0 :: Int)
                    let f = action ref
                    s <- CL.sourceList (l :: [Int]) C.$= h f C.$$ CL.fold (+) 0
                    c <- M.readMVar ref

                    return (c, s)

                action ref = const $ liftIO $ M.modifyMVar_ ref (\i -> return $! i + 1)
            (c1, s1) <- runTest CL.iterM
            (c2, s2) <- runTest (\f -> CL.mapM (\a -> f a >>= \() -> return a))

            assert $ c1 == c2
            assert $ s1 == s2

    describe "generalizing" $ do
        it "works" $ do
            let src :: Int -> C.Source IO Int
                src i = CL.sourceList [1..i]
                sink :: C.Sink Int IO Int
                sink = CL.fold (+) 0
            res <- C.yield 10 C.$$ C.awaitForever (C.toProducer . src) C.=$ (C.toConsumer sink >>= C.yield) C.=$ C.await
            res `shouldBe` Just (sum [1..10])

    describe "passthroughSink" $ do
        it "works" $ do
            ref <- I.newIORef (-1)
            let sink = CL.fold (+) (0 :: Int)
                conduit = C.passthroughSink sink (I.writeIORef ref)
                input = [1..10]
            output <- mapM_ C.yield input C.$$ conduit C.=$ CL.consume
            output `shouldBe` input
            x <- I.readIORef ref
            x `shouldBe` sum input
        it "does nothing when downstream does nothing" $ do
            ref <- I.newIORef (-1)
            let sink = CL.fold (+) (0 :: Int)
                conduit = C.passthroughSink sink (I.writeIORef ref)
                input = [undefined]
            mapM_ C.yield input C.$$ conduit C.=$ return ()
            x <- I.readIORef ref
            x `shouldBe` (-1)

    describe "mtl instances" $ do
        it "ErrorT" $ do
            let src = flip catchError (const $ C.yield 4) $ do
                    lift $ return ()
                    C.yield 1
                    lift $ return ()
                    C.yield 2
                    lift $ return ()
                    () <- throwError DummyError
                    lift $ return ()
                    C.yield 3
                    lift $ return ()
            (src C.$$ CL.consume) `shouldBe` Right [1, 2, 4 :: Int]

    describe "finalizers" $ do
        it "promptness" $ do
            imsgs <- I.newIORef []
            let add x = liftIO $ do
                    msgs <- I.readIORef imsgs
                    I.writeIORef imsgs $ msgs ++ [x]
                src' = C.bracketP
                    (add "acquire")
                    (const $ add "release")
                    (const $ C.addCleanup (const $ add "inside") (mapM_ C.yield [1..5]))
                src = do
                    src' C.$= CL.isolate 4
                    add "computation"
                sink = CL.mapM (\x -> add (show x) >> return x) C.=$ CL.consume

            res <- C.runResourceT $ src C.$$ sink

            msgs <- I.readIORef imsgs
            -- FIXME this would be better msgs `shouldBe` words "acquire 1 2 3 4 inside release computation"
            msgs `shouldBe` words "acquire 1 2 3 4 release inside computation"

            res `shouldBe` [1..4 :: Int]

        it "left associative" $ do
            imsgs <- I.newIORef []
            let add x = liftIO $ do
                    msgs <- I.readIORef imsgs
                    I.writeIORef imsgs $ msgs ++ [x]
                p1 = C.bracketP (add "start1") (const $ add "stop1") (const $ add "inside1" >> C.yield ())
                p2 = C.bracketP (add "start2") (const $ add "stop2") (const $ add "inside2" >> C.await >>= maybe (return ()) C.yield)
                p3 = C.bracketP (add "start3") (const $ add "stop3") (const $ add "inside3" >> C.await)

            res <- C.runResourceT $ (p1 C.$= p2) C.$$ p3
            res `shouldBe` Just ()

            msgs <- I.readIORef imsgs
            msgs `shouldBe` words "start3 inside3 start2 inside2 start1 inside1 stop3 stop2 stop1"

        it "right associative" $ do
            imsgs <- I.newIORef []
            let add x = liftIO $ do
                    msgs <- I.readIORef imsgs
                    I.writeIORef imsgs $ msgs ++ [x]
                p1 = C.bracketP (add "start1") (const $ add "stop1") (const $ add "inside1" >> C.yield ())
                p2 = C.bracketP (add "start2") (const $ add "stop2") (const $ add "inside2" >> C.await >>= maybe (return ()) C.yield)
                p3 = C.bracketP (add "start3") (const $ add "stop3") (const $ add "inside3" >> C.await)

            res <- C.runResourceT $ p1 C.$$ (p2 C.=$ p3)
            res `shouldBe` Just ()

            msgs <- I.readIORef imsgs
            msgs `shouldBe` words "start3 inside3 start2 inside2 start1 inside1 stop3 stop2 stop1"

        describe "dan burton's associative tests" $ do
            let tellLn = tell . (++ "\n")
                finallyP fin = CI.addCleanup (const fin)
                printer = CI.awaitForever $ lift . tellLn . show
                idMsg msg = finallyP (tellLn msg) CI.idP
                takeP 0 = return ()
                takeP n = CI.awaitE >>= \ex -> case ex of
                  Left _u -> return ()
                  Right i -> CI.yield i >> takeP (pred n)

                testPipe p = execWriter $ runPipe $ printer <+< p <+< CI.sourceList ([1..] :: [Int])

                p1 = takeP (1 :: Int)
                p2 = idMsg "foo"
                p3 = idMsg "bar"

                (<+<) = (CI.<+<)
                runPipe = CI.runPipe

                test1L = testPipe $ (p1 <+< p2) <+< p3
                test1R = testPipe $ p1 <+< (p2 <+< p3)

                _test2L = testPipe $ (p2 <+< p1) <+< p3
                _test2R = testPipe $ p2 <+< (p1 <+< p3)

                test3L = testPipe $ (p2 <+< p3) <+< p1
                test3R = testPipe $ p2 <+< (p3 <+< p1)

                verify testL testR p1' p2' p3'
                  | testL == testR = return () :: IO ()
                  | otherwise = error $ unlines
                    [ "FAILURE"
                    , ""
                    , "(" ++ p1' ++ " <+< " ++ p2' ++ ") <+< " ++ p3'
                    , "------------------"
                    , testL
                    , ""
                    , p1' ++ " <+< (" ++ p2' ++ " <+< " ++ p3' ++ ")"
                    , "------------------"
                    , testR
                    ]

            it "test1" $ verify test1L test1R "p1" "p2" "p3"
            -- FIXME this is broken it "test2" $ verify test2L test2R "p2" "p1" "p3"
            it "test3" $ verify test3L test3R "p2" "p3" "p1"

    describe "Data.Conduit.Lift" $ do
        it "execStateC" $ do
            let sink = C.execStateLC 0 $ CL.mapM_ $ modify . (+)
                src = mapM_ C.yield [1..10 :: Int]
            res <- src C.$$ sink
            res `shouldBe` sum [1..10]

        it "execWriterC" $ do
            let sink = C.execWriterLC $ CL.mapM_ $ tell . return
                src = mapM_ C.yield [1..10 :: Int]
            res <- src C.$$ sink
            res `shouldBe` [1..10]

        it "runErrorC" $ do
            let sink = C.runErrorC $ do
                    x <- C.catchErrorC (lift $ throwError "foo") return
                    return $ x ++ "bar"
            res <- return () C.$$ sink
            res `shouldBe` Right ("foobar" :: String)

        it "runMaybeC" $ do
            let src = void $ C.runMaybeC $ do
                    C.yield 1
                    () <- lift $ MaybeT $ return Nothing
                    C.yield 2
                sink = CL.consume
            res <- src C.$$ sink
            res `shouldBe` [1 :: Int]

    describe "sequenceSources" $ do
        it "works" $ do
            let src1 = mapM_ C.yield [1, 2, 3 :: Int]
                src2 = mapM_ C.yield [3, 2, 1]
                src3 = mapM_ C.yield $ repeat 2
                srcs = C.sequenceSources $ Map.fromList
                    [ (1 :: Int, src1)
                    , (2, src2)
                    , (3, src3)
                    ]
            res <- srcs C.$$ CL.consume
            res `shouldBe`
                [ Map.fromList [(1, 1), (2, 3), (3, 2)]
                , Map.fromList [(1, 2), (2, 2), (3, 2)]
                , Map.fromList [(1, 3), (2, 1), (3, 2)]
                ]
    describe "zipSink" $ do
        it "zip equal-sized" $ do
            x <- runResourceT $
                    CL.sourceList [1..100] C.$$
                    C.sequenceSinks [ CL.fold (+) 0,
                                   (`mod` 101) <$> CL.fold (*) 1 ]
            x `shouldBe` [5050, 100 :: Integer]

        it "zip distinct sizes" $ do
            let sink = C.getZipSink $
                        (*) <$> C.ZipSink (CL.fold (+) 0)
                            <*> C.ZipSink (Data.Maybe.fromJust <$> C.await)
            x <- C.runResourceT $ CL.sourceList [100,99..1] C.$$ sink
            x `shouldBe` (505000 :: Integer)

    ZipConduit.spec

it' :: String -> IO () -> Spec
it' = it

data DummyError = DummyError
    deriving (Show, Eq)
instance Error DummyError