{-# LANGUAGE CPP #-}
-- |
-- Module      : Streamly.Internal.Data.Stream.Exception
-- Copyright   : (c) 2020 Composewell Technologies and Contributors
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.Exception
    (
    -- * Resources
      before
    , afterIO
    , afterUnsafe
    , finallyIO
    , finallyIO'
    , finallyIO''
    , finallyUnsafe
    , gbracket_
    , gbracket
    , bracketUnsafe
    , bracketIO3
    , bracketIO
    , bracketIO'
    , bracketIO''

    , withAcquireIO
    , withAcquireIO'

    -- * Exceptions
    , onException
    , ghandle
    , handle
    )
where

#include "inline.hs"

import Control.Monad.IO.Class (MonadIO(..))
import Control.Exception (Exception, SomeException, mask_)
import Control.Monad.Catch (MonadCatch)
import Data.IORef (newIORef)
import GHC.Exts (inline)
import Streamly.Internal.Control.Exception
    (AcquireIO(..), acquire, allocator, releaser)
import Streamly.Internal.Data.IOFinalizer
    (newIOFinalizer, runIOFinalizer, clearingIOFinalizer)

import qualified Control.Monad.Catch as MC
import qualified Data.IntMap.Strict as Map

import Streamly.Internal.Data.Stream.Type

#include "DocTestDataStream.hs"

data GbracketState s1 s2 v
    = GBracketInit
    | GBracketNormal s1 v
    | GBracketException s2

-- | Like 'gbracket' but with following differences:
--
-- * alloc action @m c@ runs with async exceptions enabled
-- * cleanup action @c -> m d@ won't run if the stream is garbage collected
--   after partial evaluation.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
--
{-# INLINE_NORMAL gbracket_ #-}
gbracket_
    :: Monad m
    => m c                                  -- ^ before
    -> (c -> m d)                           -- ^ after, on normal stop
    -> (c -> e -> Stream m b -> m (Stream m b)) -- ^ on exception
    -> (forall s. m s -> m (Either e s))    -- ^ try (exception handling)
    -> (c -> Stream m b)                    -- ^ stream generator
    -> Stream m b
gbracket_ :: forall (m :: * -> *) c d e b.
Monad m =>
m c
-> (c -> m d)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket_ m c
bef c -> m d
aft c -> e -> Stream m b -> m (Stream m b)
onExc forall s. m s -> m (Either e s)
ftry c -> Stream m b
action =
    (State StreamK m b
 -> GbracketState (Stream m b) (Stream m b) c
 -> m (Step (GbracketState (Stream m b) (Stream m b) c) b))
-> GbracketState (Stream m b) (Stream m b) c -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b
-> GbracketState (Stream m b) (Stream m b) c
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
step GbracketState (Stream m b) (Stream m b) c
forall s1 s2 v. GbracketState s1 s2 v
GBracketInit

    where

    {-# INLINE_LATE step #-}
    step :: State StreamK m b
-> GbracketState (Stream m b) (Stream m b) c
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
step State StreamK m b
_ GbracketState (Stream m b) (Stream m b) c
GBracketInit = do
        c
r <- m c
bef
        Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketState (Stream m b) (Stream m b) c) b
 -> m (Step (GbracketState (Stream m b) (Stream m b) c) b))
-> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a b. (a -> b) -> a -> b
$ GbracketState (Stream m b) (Stream m b) c
-> Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. s -> Step s a
Skip (GbracketState (Stream m b) (Stream m b) c
 -> Step (GbracketState (Stream m b) (Stream m b) c) b)
-> GbracketState (Stream m b) (Stream m b) c
-> Step (GbracketState (Stream m b) (Stream m b) c) b
forall a b. (a -> b) -> a -> b
$ Stream m b -> c -> GbracketState (Stream m b) (Stream m b) c
forall s1 s2 v. s1 -> v -> GbracketState s1 s2 v
GBracketNormal (c -> Stream m b
action c
r) c
r

    step State StreamK m b
gst (GBracketNormal (UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st) c
v) = do
        Either e (Step s b)
res <- m (Step s b) -> m (Either e (Step s b))
forall s. m s -> m (Either e s)
ftry (m (Step s b) -> m (Either e (Step s b)))
-> m (Step s b) -> m (Either e (Step s b))
forall a b. (a -> b) -> a -> b
$ State StreamK m b -> s -> m (Step s b)
step1 State StreamK m b
gst s
st
        case Either e (Step s b)
res of
            Right Step s b
r -> case Step s b
r of
                Yield b
x s
s ->
                    Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketState (Stream m b) (Stream m b) c) b
 -> m (Step (GbracketState (Stream m b) (Stream m b) c) b))
-> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a b. (a -> b) -> a -> b
$ b
-> GbracketState (Stream m b) (Stream m b) c
-> Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. a -> s -> Step s a
Yield b
x (Stream m b -> c -> GbracketState (Stream m b) (Stream m b) c
forall s1 s2 v. s1 -> v -> GbracketState s1 s2 v
GBracketNormal ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s) c
v)
                Skip s
s -> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketState (Stream m b) (Stream m b) c) b
 -> m (Step (GbracketState (Stream m b) (Stream m b) c) b))
-> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a b. (a -> b) -> a -> b
$ GbracketState (Stream m b) (Stream m b) c
-> Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. s -> Step s a
Skip (Stream m b -> c -> GbracketState (Stream m b) (Stream m b) c
forall s1 s2 v. s1 -> v -> GbracketState s1 s2 v
GBracketNormal ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s) c
v)
                Step s b
Stop -> c -> m d
aft c
v m d
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. Step s a
Stop
            -- XXX Do not handle async exceptions, just rethrow them.
            Left e
e -> do
                Stream m b
strm <- c -> e -> Stream m b -> m (Stream m b)
onExc c
v e
e ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st)
                Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketState (Stream m b) (Stream m b) c) b
 -> m (Step (GbracketState (Stream m b) (Stream m b) c) b))
-> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a b. (a -> b) -> a -> b
$ GbracketState (Stream m b) (Stream m b) c
-> Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. s -> Step s a
Skip (Stream m b -> GbracketState (Stream m b) (Stream m b) c
forall s1 s2 v. s2 -> GbracketState s1 s2 v
GBracketException Stream m b
strm)
    step State StreamK m b
gst (GBracketException (UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st)) = do
        Step s b
res <- State StreamK m b -> s -> m (Step s b)
step1 State StreamK m b
gst s
st
        case Step s b
res of
            Yield b
x s
s -> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketState (Stream m b) (Stream m b) c) b
 -> m (Step (GbracketState (Stream m b) (Stream m b) c) b))
-> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a b. (a -> b) -> a -> b
$ b
-> GbracketState (Stream m b) (Stream m b) c
-> Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. a -> s -> Step s a
Yield b
x (Stream m b -> GbracketState (Stream m b) (Stream m b) c
forall s1 s2 v. s2 -> GbracketState s1 s2 v
GBracketException ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s))
            Skip s
s    -> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketState (Stream m b) (Stream m b) c) b
 -> m (Step (GbracketState (Stream m b) (Stream m b) c) b))
-> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a b. (a -> b) -> a -> b
$ GbracketState (Stream m b) (Stream m b) c
-> Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. s -> Step s a
Skip (Stream m b -> GbracketState (Stream m b) (Stream m b) c
forall s1 s2 v. s2 -> GbracketState s1 s2 v
GBracketException ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s))
            Step s b
Stop      -> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. Step s a
Stop

data GbracketIOState s1 s2 v wref
    = GBracketIOInit
    | GBracketIONormal s1 v wref
    | GBracketIOException s2

-- | Run the alloc action @m c@ with async exceptions disabled but keeping
-- blocking operations interruptible (see 'Control.Exception.mask').  Use the
-- output @c@ as input to @c -> Stream m b@ to generate an output stream. When
-- generating the stream use the supplied @try@ operation @forall s. m s -> m
-- (Either e s)@ to catch synchronous exceptions. If an exception occurs run
-- the exception handler @c -> e -> Stream m b -> m (Stream m b)@. Note that
-- 'gbracket' does not rethrow the exception, it has to be done by the
-- exception handler if desired.
--
-- The cleanup action @c -> m d@, runs whenever the stream ends normally, due
-- to a sync or async exception or if it gets garbage collected after a partial
-- lazy evaluation.  See 'bracket' for the semantics of the cleanup action.
--
-- 'gbracket' can express all other exception handling combinators.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
{-# INLINE_NORMAL gbracket #-}
gbracket
    :: MonadIO m
    => IO c -- ^ before
    -> (c -> IO d1) -- ^ on normal stop
    -> (c -> e -> Stream m b -> IO (Stream m b)) -- ^ on exception
    -> (c -> IO d2) -- ^ on GC without normal stop or exception
    -> (forall s. m s -> m (Either e s)) -- ^ try (exception handling)
    -> (c -> Stream m b) -- ^ stream generator
    -> Stream m b
gbracket :: forall (m :: * -> *) c d1 e b d2.
MonadIO m =>
IO c
-> (c -> IO d1)
-> (c -> e -> Stream m b -> IO (Stream m b))
-> (c -> IO d2)
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket IO c
bef c -> IO d1
aft c -> e -> Stream m b -> IO (Stream m b)
onExc c -> IO d2
onGC forall s. m s -> m (Either e s)
ftry c -> Stream m b
action =
    (State StreamK m b
 -> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
step GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref. GbracketIOState s1 s2 v wref
GBracketIOInit

    where

    -- If the stream is never evaluated the "aft" action will never be
    -- called. For that to occur we will need the user of this API to pass a
    -- weak pointer to us.
    {-# INLINE_LATE step #-}
    step :: State StreamK m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
step State StreamK m b
_ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
GBracketIOInit = do
        -- allocation of resource and installation of finalizer must be atomic
        -- with respect to async exception, otherwise we may leave a window
        -- where the resource may not be freed.
        (c
r, IOFinalizer
ref) <- IO (c, IOFinalizer) -> m (c, IOFinalizer)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (c, IOFinalizer) -> m (c, IOFinalizer))
-> IO (c, IOFinalizer) -> m (c, IOFinalizer)
forall a b. (a -> b) -> a -> b
$ IO (c, IOFinalizer) -> IO (c, IOFinalizer)
forall a. IO a -> IO a
mask_ (IO (c, IOFinalizer) -> IO (c, IOFinalizer))
-> IO (c, IOFinalizer) -> IO (c, IOFinalizer)
forall a b. (a -> b) -> a -> b
$ do
            c
r <- IO c
bef
            IOFinalizer
ref <- IO d2 -> IO IOFinalizer
forall (m :: * -> *) a. MonadIO m => IO a -> m IOFinalizer
newIOFinalizer (c -> IO d2
onGC c
r)
            (c, IOFinalizer) -> IO (c, IOFinalizer)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (c
r, IOFinalizer
ref)
        Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. s -> Step s a
Skip (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
 -> Step
      (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall a b. (a -> b) -> a -> b
$ Stream m b
-> c
-> IOFinalizer
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal (c -> Stream m b
action c
r) c
r IOFinalizer
ref

    step State StreamK m b
gst (GBracketIONormal (UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st) c
v IOFinalizer
ref) = do
        -- IMPORTANT: Note that if an async exception occurs before try or
        -- after try, in those cases the exception will not be intercepted and
        -- the cleanup handler won't run. In those cases the cleanup handler
        -- will run via GC.
        Either e (Step s b)
res <- m (Step s b) -> m (Either e (Step s b))
forall s. m s -> m (Either e s)
ftry (m (Step s b) -> m (Either e (Step s b)))
-> m (Step s b) -> m (Either e (Step s b))
forall a b. (a -> b) -> a -> b
$ State StreamK m b -> s -> m (Step s b)
step1 State StreamK m b
gst s
st
        case Either e (Step s b)
res of
            Right Step s b
r -> case Step s b
r of
                Yield b
x s
s ->
                    Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. a -> s -> Step s a
Yield b
x (Stream m b
-> c
-> IOFinalizer
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s) c
v IOFinalizer
ref)
                Skip s
s ->
                    Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. s -> Step s a
Skip (Stream m b
-> c
-> IOFinalizer
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s) c
v IOFinalizer
ref)
                Step s b
Stop ->
                    IO d1 -> m d1
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IOFinalizer -> IO d1 -> IO d1
forall (m :: * -> *) a. MonadIO m => IOFinalizer -> IO a -> m a
clearingIOFinalizer IOFinalizer
ref (c -> IO d1
aft c
v)) m d1
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. Step s a
Stop
            -- XXX Do not handle async exceptions, just rethrow them.
            Left e
e -> do
                -- Clearing of finalizer and running of exception handler must
                -- be atomic wrt async exceptions. Otherwise if we have cleared
                -- the finalizer and have not run the exception handler then we
                -- may leak the resource.
                Stream m b
stream <-
                    IO (Stream m b) -> m (Stream m b)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IOFinalizer -> IO (Stream m b) -> IO (Stream m b)
forall (m :: * -> *) a. MonadIO m => IOFinalizer -> IO a -> m a
clearingIOFinalizer IOFinalizer
ref (c -> e -> Stream m b -> IO (Stream m b)
onExc c
v e
e ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st)))
                Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. s -> Step s a
Skip (Stream m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException Stream m b
stream)
    step State StreamK m b
gst (GBracketIOException (UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st)) = do
        Step s b
res <- State StreamK m b -> s -> m (Step s b)
step1 State StreamK m b
gst s
st
        case Step s b
res of
            Yield b
x s
s ->
                Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. a -> s -> Step s a
Yield b
x (Stream m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s))
            Skip s
s    -> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. s -> Step s a
Skip (Stream m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s))
            Step s b
Stop      -> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. Step s a
Stop

-- | Run the action @m b@ before the stream yields its first element.
--
-- Same as the following but more efficient due to fusion:
--
-- >>> before action xs = Stream.concatMap (const xs) (Stream.fromEffect action)
--
{-# INLINE_NORMAL before #-}
before :: Monad m => m b -> Stream m a -> Stream m a
before :: forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
before m b
action (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = (State StreamK m a -> Maybe s -> m (Step (Maybe s) a))
-> Maybe s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> Maybe s -> m (Step (Maybe s) a)
step' Maybe s
forall a. Maybe a
Nothing

    where

    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> Maybe s -> m (Step (Maybe s) a)
step' State StreamK m a
_ Maybe s
Nothing = m b
action m b -> m (Step (Maybe s) a) -> m (Step (Maybe s) a)
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step (Maybe s) a -> m (Step (Maybe s) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe s -> Step (Maybe s) a
forall s a. s -> Step s a
Skip (s -> Maybe s
forall a. a -> Maybe a
Just s
state))

    step' State StreamK m a
gst (Just s
st) = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
res of
            Yield a
x s
s -> Step (Maybe s) a -> m (Step (Maybe s) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe s) a -> m (Step (Maybe s) a))
-> Step (Maybe s) a -> m (Step (Maybe s) a)
forall a b. (a -> b) -> a -> b
$ a -> Maybe s -> Step (Maybe s) a
forall s a. a -> s -> Step s a
Yield a
x (s -> Maybe s
forall a. a -> Maybe a
Just s
s)
            Skip s
s    -> Step (Maybe s) a -> m (Step (Maybe s) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe s) a -> m (Step (Maybe s) a))
-> Step (Maybe s) a -> m (Step (Maybe s) a)
forall a b. (a -> b) -> a -> b
$ Maybe s -> Step (Maybe s) a
forall s a. s -> Step s a
Skip (s -> Maybe s
forall a. a -> Maybe a
Just s
s)
            Step s a
Stop      -> Step (Maybe s) a -> m (Step (Maybe s) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (Maybe s) a
forall s a. Step s a
Stop

-- | Like 'after', with following differences:
--
-- * action @m b@ won't run if the stream is garbage collected
--   after partial evaluation.
-- * Monad @m@ does not require any other constraints.
-- * has slightly better performance than 'after'.
--
-- Same as the following, but with stream fusion:
--
-- >>> afterUnsafe action xs = xs <> Stream.nilM action
--
-- /Pre-release/
--
{-# INLINE_NORMAL afterUnsafe #-}
afterUnsafe :: Monad m => m b -> Stream m a -> Stream m a
afterUnsafe :: forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
afterUnsafe m b
action (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = (State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step' s
state

    where

    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> s -> m (Step s a)
step' State StreamK m a
gst s
st = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
res of
            Yield a
x s
s -> Step s a -> m (Step s a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s a -> m (Step s a)) -> Step s a -> m (Step s a)
forall a b. (a -> b) -> a -> b
$ a -> s -> Step s a
forall s a. a -> s -> Step s a
Yield a
x s
s
            Skip s
s    -> Step s a -> m (Step s a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s a -> m (Step s a)) -> Step s a -> m (Step s a)
forall a b. (a -> b) -> a -> b
$ s -> Step s a
forall s a. s -> Step s a
Skip s
s
            Step s a
Stop      -> m b
action m b -> m (Step s a) -> m (Step s a)
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step s a -> m (Step s a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step s a
forall s a. Step s a
Stop

-- | Run the action @IO b@ whenever the stream is evaluated to completion, or
-- if it is garbage collected after a partial lazy evaluation.
--
-- The semantics of the action @IO b@ are similar to the semantics of cleanup
-- action in 'bracketIO'.
--
-- /See also 'afterUnsafe'/
--
{-# INLINE_NORMAL afterIO #-}
afterIO :: MonadIO m
    => IO b -> Stream m a -> Stream m a
afterIO :: forall (m :: * -> *) b a.
MonadIO m =>
IO b -> Stream m a -> Stream m a
afterIO IO b
action (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = (State StreamK m a
 -> Maybe (s, IOFinalizer) -> m (Step (Maybe (s, IOFinalizer)) a))
-> Maybe (s, IOFinalizer) -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a
-> Maybe (s, IOFinalizer) -> m (Step (Maybe (s, IOFinalizer)) a)
step' Maybe (s, IOFinalizer)
forall a. Maybe a
Nothing

    where

    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a
-> Maybe (s, IOFinalizer) -> m (Step (Maybe (s, IOFinalizer)) a)
step' State StreamK m a
_ Maybe (s, IOFinalizer)
Nothing = do
        IOFinalizer
ref <- IO IOFinalizer -> m IOFinalizer
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO IOFinalizer -> m IOFinalizer)
-> IO IOFinalizer -> m IOFinalizer
forall a b. (a -> b) -> a -> b
$ IO b -> IO IOFinalizer
forall (m :: * -> *) a. MonadIO m => IO a -> m IOFinalizer
newIOFinalizer IO b
action
        Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (s, IOFinalizer)) a
 -> m (Step (Maybe (s, IOFinalizer)) a))
-> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall a b. (a -> b) -> a -> b
$ Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a
forall s a. s -> Step s a
Skip (Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a)
-> Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a
forall a b. (a -> b) -> a -> b
$ (s, IOFinalizer) -> Maybe (s, IOFinalizer)
forall a. a -> Maybe a
Just (s
state, IOFinalizer
ref)
    step' State StreamK m a
gst (Just (s
st, IOFinalizer
ref)) = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
res of
            Yield a
x s
s -> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (s, IOFinalizer)) a
 -> m (Step (Maybe (s, IOFinalizer)) a))
-> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall a b. (a -> b) -> a -> b
$ a -> Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a
forall s a. a -> s -> Step s a
Yield a
x ((s, IOFinalizer) -> Maybe (s, IOFinalizer)
forall a. a -> Maybe a
Just (s
s, IOFinalizer
ref))
            Skip s
s    -> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (s, IOFinalizer)) a
 -> m (Step (Maybe (s, IOFinalizer)) a))
-> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall a b. (a -> b) -> a -> b
$ Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a
forall s a. s -> Step s a
Skip ((s, IOFinalizer) -> Maybe (s, IOFinalizer)
forall a. a -> Maybe a
Just (s
s, IOFinalizer
ref))
            Step s a
Stop      -> do
                IOFinalizer -> m ()
forall (m :: * -> *). MonadIO m => IOFinalizer -> m ()
runIOFinalizer IOFinalizer
ref
                Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (Maybe (s, IOFinalizer)) a
forall s a. Step s a
Stop

-- XXX For high performance error checks in busy streams we may need another
-- Error constructor in step.

-- | Run the action @m b@ if the stream evaluation is aborted due to an
-- exception. The exception is not caught, simply rethrown.
--
-- Observes exceptions only in the stream generation, and not in stream
-- consumers.
--
-- /Inhibits stream fusion/
--
{-# INLINE_NORMAL onException #-}
onException :: MonadCatch m => m b -> Stream m a -> Stream m a
onException :: forall (m :: * -> *) b a.
MonadCatch m =>
m b -> Stream m a -> Stream m a
onException m b
action Stream m a
stream =
    m ()
-> (() -> m ())
-> (() -> SomeException -> Stream m a -> m (Stream m a))
-> (forall s. m s -> m (Either SomeException s))
-> (() -> Stream m a)
-> Stream m a
forall (m :: * -> *) c d e b.
Monad m =>
m c
-> (c -> m d)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket_
        (() -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()) -- before
        () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return      -- after
        (\()
_ (SomeException
e :: MC.SomeException) Stream m a
_ -> m b
action m b -> m (Stream m a) -> m (Stream m a)
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SomeException -> m (Stream m a)
forall e a. (HasCallStack, Exception e) => e -> m a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
MC.throwM SomeException
e)
        ((m s -> m (Either SomeException s))
-> m s -> m (Either SomeException s)
forall a. a -> a
inline m s -> m (Either SomeException s)
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try)
        (Stream m a -> () -> Stream m a
forall a b. a -> b -> a
const Stream m a
stream)

{-# INLINE_NORMAL _onException #-}
_onException :: MonadCatch m => m b -> Stream m a -> Stream m a
_onException :: forall (m :: * -> *) b a.
MonadCatch m =>
m b -> Stream m a -> Stream m a
_onException m b
action (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = (State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step' s
state

    where

    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> s -> m (Step s a)
step' State StreamK m a
gst s
st = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st m (Step s a) -> m b -> m (Step s a)
forall (m :: * -> *) a b.
(HasCallStack, MonadCatch m) =>
m a -> m b -> m a
`MC.onException` m b
action
        case Step s a
res of
            Yield a
x s
s -> Step s a -> m (Step s a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s a -> m (Step s a)) -> Step s a -> m (Step s a)
forall a b. (a -> b) -> a -> b
$ a -> s -> Step s a
forall s a. a -> s -> Step s a
Yield a
x s
s
            Skip s
s    -> Step s a -> m (Step s a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s a -> m (Step s a)) -> Step s a -> m (Step s a)
forall a b. (a -> b) -> a -> b
$ s -> Step s a
forall s a. s -> Step s a
Skip s
s
            Step s a
Stop      -> Step s a -> m (Step s a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step s a
forall s a. Step s a
Stop

-- | Like 'bracket' but with following differences:
--
-- * alloc action @m b@ runs with async exceptions enabled
-- * cleanup action @b -> m c@ won't run if the stream is garbage collected
--   after partial evaluation.
-- * has slightly better performance than 'bracketIO'.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
--
{-# INLINE_NORMAL bracketUnsafe #-}
bracketUnsafe :: MonadCatch m
    => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracketUnsafe :: forall (m :: * -> *) b c a.
MonadCatch m =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracketUnsafe m b
bef b -> m c
aft =
    m b
-> (b -> m c)
-> (b -> SomeException -> Stream m a -> m (Stream m a))
-> (forall s. m s -> m (Either SomeException s))
-> (b -> Stream m a)
-> Stream m a
forall (m :: * -> *) c d e b.
Monad m =>
m c
-> (c -> m d)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket_
        m b
bef
        b -> m c
aft
        (\b
a (SomeException
e :: SomeException) Stream m a
_ -> b -> m c
aft b
a m c -> m (Stream m a) -> m (Stream m a)
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SomeException -> m (Stream m a)
forall e a. (HasCallStack, Exception e) => e -> m a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
MC.throwM SomeException
e)
        ((m s -> m (Either SomeException s))
-> m s -> m (Either SomeException s)
forall a. a -> a
inline m s -> m (Either SomeException s)
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try)

-- For a use case of this see the "streamly-process" package. It needs to kill
-- the process in case of exception or garbage collection, but waits for the
-- process to terminate in normal cases.

-- XXX Just use bracketIO2 instead - stop and exception.

-- | Like 'bracketIO' but can use 3 separate cleanup actions depending on the
-- mode of termination:
--
-- 1. When the stream stops normally
-- 2. When the stream is garbage collected
-- 3. When the stream encounters an exception
--
-- @bracketIO3 before onStop onGC onException action@ runs @action@ using the
-- result of @before@. If the stream stops, @onStop@ action is executed, if the
-- stream is abandoned @onGC@ is executed, if the stream encounters an
-- exception @onException@ is executed.
--
-- The exception is not caught, it is rethrown.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
{-# INLINE_NORMAL bracketIO3 #-}
bracketIO3 :: (MonadIO m, MonadCatch m) =>
       IO b
    -> (b -> IO c)
    -> (b -> IO d)
    -> (b -> IO e)
    -> (b -> Stream m a)
    -> Stream m a
bracketIO3 :: forall (m :: * -> *) b c d e a.
(MonadIO m, MonadCatch m) =>
IO b
-> (b -> IO c)
-> (b -> IO d)
-> (b -> IO e)
-> (b -> Stream m a)
-> Stream m a
bracketIO3 IO b
bef b -> IO c
aft b -> IO d
onExc b -> IO e
onGC =
    IO b
-> (b -> IO c)
-> (b -> SomeException -> Stream m a -> IO (Stream m a))
-> (b -> IO e)
-> (forall s. m s -> m (Either SomeException s))
-> (b -> Stream m a)
-> Stream m a
forall (m :: * -> *) c d1 e b d2.
MonadIO m =>
IO c
-> (c -> IO d1)
-> (c -> e -> Stream m b -> IO (Stream m b))
-> (c -> IO d2)
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket
        IO b
bef
        b -> IO c
aft
        (\b
a (SomeException
e :: SomeException) Stream m a
_ -> b -> IO d
onExc b
a IO d -> IO (Stream m a) -> IO (Stream m a)
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SomeException -> IO (Stream m a)
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
MC.throwM SomeException
e)
        b -> IO e
onGC
        ((m s -> m (Either SomeException s))
-> m s -> m (Either SomeException s)
forall a. a -> a
inline m s -> m (Either SomeException s)
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try)

-- XXX Fix the early termination case not being prompt. Will require a "final"
-- function in the stream constructor.

-- Examples of cases where the stream is not fully consumed:
--
-- * a bracketed stream is folded but before the stream ends, the fold
-- terminates or encounters an exception abandoning the original stream.
-- * 'take' on a bracketed stream terminates without draining the stream
-- completely. To avoid this, bracket should be outermost combinator on a
-- stream.
-- * A synchronous exception is handled using 'handle', in that case the
-- original stream is abandoned and collected by GC.
--
-- In case of async exceptions, if the async exception occurs when we are
-- executing the stream code then it will be intercepted. After the stream
-- element is generated, control is handed over to the consumer (fold), async
-- exceptions occurring in this period are not intercepted by bracketIO, they
-- are intercepted by the fold's bracket instead. If an async exceptions occurs
-- in this part and the stream is abandoned, the cleanup handler runs on GC.

-- | The alloc action @IO b@ is executed with async exceptions disabled but keeping
-- blocking operations interruptible (see 'Control.Exception.mask').  Uses the
-- output @b@ of the IO action as input to the function @b -> Stream m a@ to
-- generate an output stream.
--
-- @b@ is usually a resource allocated under the IO monad, e.g. a file handle, that
-- requires a cleanup after use. The cleanup is done using the @b -> IO c@
-- action. bracketIO guarantees that the allocated resource is eventually (see
-- details below) cleaned up even in the face of sync or async exceptions. If
-- an exception occurs it is not caught, simply rethrown.
--
-- 'bracketIO' only guarantees that the cleanup action runs, and it runs with
-- __async exceptions enabled__. The action must ensure that it can successfully
-- cleanup the resource in the face of sync or async exceptions.
--
-- /Best case/: Cleanup happens immediately in the following cases:
--
-- * the stream is consumed completely
-- * an exception occurs in the bracketed part of the pipeline
--
-- /Worst case/: In the following cases cleanup is deferred to GC.
--
-- * the bracketed stream is partially consumed and abandoned
-- * pipeline is aborted due to an exception outside the bracket
--
-- Use Streamly.Control.Exception.'Streamly.Control.Exception.withAcquireIO'
-- for covering the entire pipeline with guaranteed cleanup at the end of
-- bracket.
--
-- Observes exceptions only in the stream generation, and not in stream
-- consumers.
--
-- /See also: 'bracketUnsafe'/
--
-- /Inhibits stream fusion/
--
{-# INLINE bracketIO #-}
bracketIO :: (MonadIO m, MonadCatch m)
    => IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
bracketIO :: forall (m :: * -> *) b c a.
(MonadIO m, MonadCatch m) =>
IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
bracketIO IO b
bef b -> IO c
aft = IO b
-> (b -> IO c)
-> (b -> IO c)
-> (b -> IO c)
-> (b -> Stream m a)
-> Stream m a
forall (m :: * -> *) b c d e a.
(MonadIO m, MonadCatch m) =>
IO b
-> (b -> IO c)
-> (b -> IO d)
-> (b -> IO e)
-> (b -> Stream m a)
-> Stream m a
bracketIO3 IO b
bef b -> IO c
aft b -> IO c
aft b -> IO c
aft

-- If you are recovering from exceptions using 'handle' then you should use
-- bracketIO'' which releases the resource promptly on exception before the
-- exception handler generates another stream. But for better performance
-- bracketIO' may be better and leave the resource to be freed by GC.
--
-- XXX If we want to recover from exceptions then we should probably have an
-- integrated combinator combining handling with bracketIO'' otherwise we will
-- have multiple layers of "try" which will not be good for perf.

data GbracketIO'State s ref release
    = GBracketIO'Init
    | GBracketIO'Normal s ref release

-- | Like 'bracketIO' but requires an 'Streamly.Control.Exception.AcquireIO' reference in the underlying monad
-- of the stream, and guarantees that all resources are freed before the
-- scope of the monad level resource manager
-- (Streamly.Control.Exception.'Streamly.Control.Exception.withAcquireIO')
-- ends. Where fusion matters, this combinator can be much faster than 'bracketIO' as it
-- allows stream fusion.
--
-- /Best case/: Cleanup happens immediately if the stream is consumed
-- completely.
--
-- /Worst case/: In the following cases cleanup is guaranteed to occur at the
-- end of the monad level bracket. However, if a GC occurs then cleanup will
-- occur even earlier than that.
--
-- * the bracketed stream is partially consumed and abandoned
-- * pipeline is aborted due to an exception
--
-- __This is the recommended default bracket operation.__
--
-- Note: You can use 'Streamly.Control.Exception.acquire' directly, instead of using this combinator, if
-- you don’t need to release the resource when the stream ends. However, if
-- you're using the stream inside another stream (like with concatMap), you
-- usually do want to release it at the end of the stream.
--
-- /Allows stream fusion/
--
{-# INLINE bracketIO' #-}
bracketIO' :: MonadIO m
    => AcquireIO -> IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
bracketIO' :: forall (m :: * -> *) b c a.
MonadIO m =>
AcquireIO -> IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
bracketIO' AcquireIO
bracket IO b
alloc b -> IO c
free b -> Stream m a
action =
    (State StreamK m a
 -> GbracketIO'State (Stream m a) IOFinalizer (IO ())
 -> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a))
-> GbracketIO'State (Stream m a) IOFinalizer (IO ()) -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a
-> GbracketIO'State (Stream m a) IOFinalizer (IO ())
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
step GbracketIO'State (Stream m a) IOFinalizer (IO ())
forall s ref release. GbracketIO'State s ref release
GBracketIO'Init

    where

    -- In nested stream cases, where the inner stream is abandoned due to early
    -- termination or due to exception handling, we use GC based cleanup as
    -- fallback because the monad level cleanup may not occur in deterministic
    -- amount of time, but GC may. Users can also implement backpressure
    -- themselves e.g. if the number of open fds is greater than n then perform
    -- GC until it comes down.
    {-# INLINE_LATE step #-}
    step :: State StreamK m a
-> GbracketIO'State (Stream m a) IOFinalizer (IO ())
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
step State StreamK m a
_ GbracketIO'State (Stream m a) IOFinalizer (IO ())
GBracketIO'Init = do
        (b
r, IOFinalizer
ref, IO ()
release) <- IO (b, IOFinalizer, IO ()) -> m (b, IOFinalizer, IO ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (b, IOFinalizer, IO ()) -> m (b, IOFinalizer, IO ()))
-> IO (b, IOFinalizer, IO ()) -> m (b, IOFinalizer, IO ())
forall a b. (a -> b) -> a -> b
$ IO (b, IOFinalizer, IO ()) -> IO (b, IOFinalizer, IO ())
forall a. IO a -> IO a
mask_ (IO (b, IOFinalizer, IO ()) -> IO (b, IOFinalizer, IO ()))
-> IO (b, IOFinalizer, IO ()) -> IO (b, IOFinalizer, IO ())
forall a b. (a -> b) -> a -> b
$ do
            (b
r, IO ()
release) <- IO (b, IO ()) -> IO (b, IO ())
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (b, IO ()) -> IO (b, IO ())) -> IO (b, IO ()) -> IO (b, IO ())
forall a b. (a -> b) -> a -> b
$ AcquireIO -> IO b -> (b -> IO c) -> IO (b, IO ())
forall b c. AcquireIO -> IO b -> (b -> IO c) -> IO (b, IO ())
acquire AcquireIO
bracket IO b
alloc b -> IO c
free
            IOFinalizer
ref <- IO () -> IO IOFinalizer
forall (m :: * -> *) a. MonadIO m => IO a -> m IOFinalizer
newIOFinalizer IO ()
release
            (b, IOFinalizer, IO ()) -> IO (b, IOFinalizer, IO ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (b
r, IOFinalizer
ref, IO ()
release)
        Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
 -> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a))
-> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a b. (a -> b) -> a -> b
$ GbracketIO'State (Stream m a) IOFinalizer (IO ())
-> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
forall s a. s -> Step s a
Skip (GbracketIO'State (Stream m a) IOFinalizer (IO ())
 -> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
-> GbracketIO'State (Stream m a) IOFinalizer (IO ())
-> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
forall a b. (a -> b) -> a -> b
$ Stream m a
-> IOFinalizer
-> IO ()
-> GbracketIO'State (Stream m a) IOFinalizer (IO ())
forall s ref release.
s -> ref -> release -> GbracketIO'State s ref release
GBracketIO'Normal (b -> Stream m a
action b
r) IOFinalizer
ref IO ()
release

    step State StreamK m a
gst (GBracketIO'Normal (UnStream State StreamK m a -> s -> m (Step s a)
step1 s
st) IOFinalizer
ref IO ()
release) = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
        case Step s a
res of
            Yield a
x s
s ->
                Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
 -> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a))
-> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a b. (a -> b) -> a -> b
$ a
-> GbracketIO'State (Stream m a) IOFinalizer (IO ())
-> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
forall s a. a -> s -> Step s a
Yield a
x (Stream m a
-> IOFinalizer
-> IO ()
-> GbracketIO'State (Stream m a) IOFinalizer (IO ())
forall s ref release.
s -> ref -> release -> GbracketIO'State s ref release
GBracketIO'Normal ((State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step1 s
s) IOFinalizer
ref IO ()
release)
            Skip s
s ->
                Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
 -> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a))
-> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a b. (a -> b) -> a -> b
$ GbracketIO'State (Stream m a) IOFinalizer (IO ())
-> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
forall s a. s -> Step s a
Skip (Stream m a
-> IOFinalizer
-> IO ()
-> GbracketIO'State (Stream m a) IOFinalizer (IO ())
forall s ref release.
s -> ref -> release -> GbracketIO'State s ref release
GBracketIO'Normal ((State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step1 s
s) IOFinalizer
ref IO ()
release)
            Step s a
Stop ->
                IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IOFinalizer -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IOFinalizer -> IO a -> m a
clearingIOFinalizer IOFinalizer
ref IO ()
release) m ()
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
forall s a. Step s a
Stop

-- | Like bracketIO, the only difference is that there is a guarantee that the
-- resources will be freed at the end of the monad level bracket
-- ('Streamly.Control.Exception.AcquireIO').
--
-- /Best case/: Cleanup happens immediately in the following cases:
--
-- * the stream is consumed completely
-- * an exception occurs in the bracketed part of the pipeline
--
-- /Worst case/: In the following cases cleanup is guaranteed to occur at the
-- end of the monad level bracket. However, if a GC occurs before that then
-- cleanup will occur early.
--
-- * the bracketed stream is partially consumed and abandoned
-- * pipeline is aborted due to an exception outside the bracket
--
-- Note: Instead of using this combinator you can directly use
-- 'Streamly.Control.Exception.acquire'
-- if you do not care about releasing the resource at the end of the stream
-- and if you are not recovering from an exception using 'handle'. You may want
-- to care about releasing the resource at the end of a stream if you are using
-- it in a nested manner (e.g. in concatMap).
--
-- /Inhibits stream fusion/
--
{-# INLINE bracketIO'' #-}
bracketIO'' :: (MonadIO m, MonadCatch m)
    => AcquireIO -> IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
bracketIO'' :: forall (m :: * -> *) b c a.
(MonadIO m, MonadCatch m) =>
AcquireIO -> IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
bracketIO'' AcquireIO
bracket IO b
alloc b -> IO c
free b -> Stream m a
action =
    (State StreamK m a
 -> GbracketIO'State (Stream m a) IOFinalizer (IO ())
 -> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a))
-> GbracketIO'State (Stream m a) IOFinalizer (IO ()) -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a
-> GbracketIO'State (Stream m a) IOFinalizer (IO ())
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
step GbracketIO'State (Stream m a) IOFinalizer (IO ())
forall s ref release. GbracketIO'State s ref release
GBracketIO'Init

    where

    {-# INLINE_LATE step #-}
    step :: State StreamK m a
-> GbracketIO'State (Stream m a) IOFinalizer (IO ())
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
step State StreamK m a
_ GbracketIO'State (Stream m a) IOFinalizer (IO ())
GBracketIO'Init = do
        (b
r, IOFinalizer
ref, IO ()
release) <- IO (b, IOFinalizer, IO ()) -> m (b, IOFinalizer, IO ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (b, IOFinalizer, IO ()) -> m (b, IOFinalizer, IO ()))
-> IO (b, IOFinalizer, IO ()) -> m (b, IOFinalizer, IO ())
forall a b. (a -> b) -> a -> b
$ IO (b, IOFinalizer, IO ()) -> IO (b, IOFinalizer, IO ())
forall a. IO a -> IO a
mask_ (IO (b, IOFinalizer, IO ()) -> IO (b, IOFinalizer, IO ()))
-> IO (b, IOFinalizer, IO ()) -> IO (b, IOFinalizer, IO ())
forall a b. (a -> b) -> a -> b
$ do
            (b
r, IO ()
release) <- IO (b, IO ()) -> IO (b, IO ())
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (b, IO ()) -> IO (b, IO ())) -> IO (b, IO ()) -> IO (b, IO ())
forall a b. (a -> b) -> a -> b
$ AcquireIO -> IO b -> (b -> IO c) -> IO (b, IO ())
forall b c. AcquireIO -> IO b -> (b -> IO c) -> IO (b, IO ())
acquire AcquireIO
bracket IO b
alloc b -> IO c
free
            IOFinalizer
ref <- IO () -> IO IOFinalizer
forall (m :: * -> *) a. MonadIO m => IO a -> m IOFinalizer
newIOFinalizer IO ()
release
            (b, IOFinalizer, IO ()) -> IO (b, IOFinalizer, IO ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (b
r, IOFinalizer
ref, IO ()
release)
        Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
 -> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a))
-> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a b. (a -> b) -> a -> b
$ GbracketIO'State (Stream m a) IOFinalizer (IO ())
-> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
forall s a. s -> Step s a
Skip (GbracketIO'State (Stream m a) IOFinalizer (IO ())
 -> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
-> GbracketIO'State (Stream m a) IOFinalizer (IO ())
-> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
forall a b. (a -> b) -> a -> b
$ Stream m a
-> IOFinalizer
-> IO ()
-> GbracketIO'State (Stream m a) IOFinalizer (IO ())
forall s ref release.
s -> ref -> release -> GbracketIO'State s ref release
GBracketIO'Normal (b -> Stream m a
action b
r) IOFinalizer
ref IO ()
release

    step State StreamK m a
gst (GBracketIO'Normal (UnStream State StreamK m a -> s -> m (Step s a)
step1 s
st) IOFinalizer
ref IO ()
release) = do
        -- If an async exception occurs before try or after try, in those cases
        -- the exception will not be intercepted here. In those cases the
        -- release action will run via AcquireIO release hook.
        Either SomeException (Step s a)
res <- m (Step s a) -> m (Either SomeException (Step s a))
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try (m (Step s a) -> m (Either SomeException (Step s a)))
-> m (Step s a) -> m (Either SomeException (Step s a))
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
        case Either SomeException (Step s a)
res of
            Right Step s a
r ->
                case Step s a
r of
                    Yield a
x s
s ->
                        Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return
                            (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
 -> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a))
-> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a b. (a -> b) -> a -> b
$ a
-> GbracketIO'State (Stream m a) IOFinalizer (IO ())
-> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
forall s a. a -> s -> Step s a
Yield a
x (Stream m a
-> IOFinalizer
-> IO ()
-> GbracketIO'State (Stream m a) IOFinalizer (IO ())
forall s ref release.
s -> ref -> release -> GbracketIO'State s ref release
GBracketIO'Normal ((State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step1 s
s) IOFinalizer
ref IO ()
release)
                    Skip s
s ->
                        Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return
                            (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
 -> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a))
-> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a b. (a -> b) -> a -> b
$ GbracketIO'State (Stream m a) IOFinalizer (IO ())
-> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
forall s a. s -> Step s a
Skip (Stream m a
-> IOFinalizer
-> IO ()
-> GbracketIO'State (Stream m a) IOFinalizer (IO ())
forall s ref release.
s -> ref -> release -> GbracketIO'State s ref release
GBracketIO'Normal ((State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step1 s
s) IOFinalizer
ref IO ()
release)
                    Step s a
Stop ->
                        IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IOFinalizer -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IOFinalizer -> IO a -> m a
clearingIOFinalizer IOFinalizer
ref IO ()
release) m ()
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a
forall s a. Step s a
Stop
            Left (SomeException
e :: SomeException) ->
                IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IOFinalizer -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IOFinalizer -> IO a -> m a
clearingIOFinalizer IOFinalizer
ref IO ()
release) m ()
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SomeException
-> m (Step (GbracketIO'State (Stream m a) IOFinalizer (IO ())) a)
forall e a. (HasCallStack, Exception e) => e -> m a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
MC.throwM SomeException
e

-- | Like finallyIO, based on bracketIO' semantics.
{-# INLINE finallyIO' #-}
finallyIO' :: MonadIO m => AcquireIO -> IO b -> Stream m a -> Stream m a
finallyIO' :: forall (m :: * -> *) b a.
MonadIO m =>
AcquireIO -> IO b -> Stream m a -> Stream m a
finallyIO' AcquireIO
bracket IO b
free Stream m a
stream =
    AcquireIO
-> IO () -> (() -> IO b) -> (() -> Stream m a) -> Stream m a
forall (m :: * -> *) b c a.
MonadIO m =>
AcquireIO -> IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
bracketIO' AcquireIO
bracket (() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()) (IO b -> () -> IO b
forall a b. a -> b -> a
const IO b
free) (Stream m a -> () -> Stream m a
forall a b. a -> b -> a
const Stream m a
stream)

-- | Like finallyIO, based on bracketIO'' semantics.
{-# INLINE finallyIO'' #-}
finallyIO'' :: (MonadIO m, MonadCatch m) =>
    AcquireIO -> IO b -> Stream m a -> Stream m a
finallyIO'' :: forall (m :: * -> *) b a.
(MonadIO m, MonadCatch m) =>
AcquireIO -> IO b -> Stream m a -> Stream m a
finallyIO'' AcquireIO
bracket IO b
free Stream m a
stream =
    AcquireIO
-> IO () -> (() -> IO b) -> (() -> Stream m a) -> Stream m a
forall (m :: * -> *) b c a.
(MonadIO m, MonadCatch m) =>
AcquireIO -> IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
bracketIO'' AcquireIO
bracket (() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()) (IO b -> () -> IO b
forall a b. a -> b -> a
const IO b
free) (Stream m a -> () -> Stream m a
forall a b. a -> b -> a
const Stream m a
stream)

-- | Like 'bracketIO' but with on-demand allocations and manual release
-- facility.
--
-- Here is an example:
--
-- >>> :{
-- close x h = do
--  putStrLn $ "closing: " ++ x
--  hClose h
-- :}
--
-- >>> :{
-- generate ref =
--      Stream.fromList ["file1", "file2"]
--    & Stream.mapM
--        (\x -> do
--            (h, release) <- Exception.acquire ref (openFile x ReadMode) (close x)
--            -- use h here
--            threadDelay 1000000
--            when (x == "file1") $ do
--                putStrLn $ "Manually releasing: " ++ x
--                release
--            return x
--        )
--    & Stream.trace print
-- :}
--
-- >>> :{
-- run =
--     Stream.withAcquireIO generate
--         & Stream.fold Fold.drain
-- :}
--
-- In the above code, you should see the \"closing:\" message for both the
-- files, and only once for each file. Make sure you create "file1" and "file2"
-- before running it.
--
-- Here is an example for just registering hooks to be called eventually:
--
-- >>> :{
-- generate ref =
--      Stream.fromList ["file1", "file2"]
--    & Stream.mapM
--        (\x -> do
--            Exception.register ref $ putStrLn $ "saw: " ++ x
--            threadDelay 1000000
--            return x
--        )
--    & Stream.trace print
-- :}
--
-- >>> :{
-- run =
--     Stream.withAcquireIO generate
--         & Stream.fold Fold.drain
-- :}
--
-- In the above code, even if you interrupt the program with CTRL-C you should
-- still see the "saw:" message for the elements seen before the interrupt.
--
-- See 'bracketIO' documentation for the caveats related to partially consumed
-- streams and async exceptions.
--
-- Use monad level bracket Streamly.Control.Exception.'Streamly..Control.Exception.withAcquireIO'
-- for guaranteed cleanup in the entire pipeline, however, monad level bracket does not provide
-- an automatic cleanup at the end of the stream; you can only release
-- resources manually or via automatic cleanup at the end of the monad bracket.
-- The end of stream cleanup is useful especially in nested streams where we
-- want to cleanup at the end of every inner stream instead of waiting for the
-- outer stream to end for cleaning up to happen.
--
{-# INLINE withAcquireIO #-}
withAcquireIO :: (MonadIO m, MonadCatch m) =>
    (AcquireIO -> Stream m a) -> Stream m a
withAcquireIO :: forall (m :: * -> *) a.
(MonadIO m, MonadCatch m) =>
(AcquireIO -> Stream m a) -> Stream m a
withAcquireIO AcquireIO -> Stream m a
action = do
    IO (IORef (Int, IntMap (IO ()), IntMap (IO ())), AcquireIO)
-> ((IORef (Int, IntMap (IO ()), IntMap (IO ())), AcquireIO)
    -> IO ())
-> ((IORef (Int, IntMap (IO ()), IntMap (IO ())), AcquireIO)
    -> Stream m a)
-> Stream m a
forall (m :: * -> *) b c a.
(MonadIO m, MonadCatch m) =>
IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
bracketIO IO (IORef (Int, IntMap (IO ()), IntMap (IO ())), AcquireIO)
bef (IORef (Int, IntMap (IO ()), IntMap (IO ())) -> IO ()
forall (m :: * -> *) a b.
MonadIO m =>
IORef (a, IntMap (IO b), IntMap (IO b)) -> m ()
releaser (IORef (Int, IntMap (IO ()), IntMap (IO ())) -> IO ())
-> ((IORef (Int, IntMap (IO ()), IntMap (IO ())), AcquireIO)
    -> IORef (Int, IntMap (IO ()), IntMap (IO ())))
-> (IORef (Int, IntMap (IO ()), IntMap (IO ())), AcquireIO)
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (IORef (Int, IntMap (IO ()), IntMap (IO ())), AcquireIO)
-> IORef (Int, IntMap (IO ()), IntMap (IO ()))
forall a b. (a, b) -> a
fst) (\(IORef (Int, IntMap (IO ()), IntMap (IO ()))
_, AcquireIO
alloc) -> AcquireIO -> Stream m a
action AcquireIO
alloc)

    where

    bef :: IO (IORef (Int, IntMap (IO ()), IntMap (IO ())), AcquireIO)
bef = do
        -- Assuming 64-bit int counter will never overflow
        IORef (Int, IntMap (IO ()), IntMap (IO ()))
ref <- IO (IORef (Int, IntMap (IO ()), IntMap (IO ())))
-> IO (IORef (Int, IntMap (IO ()), IntMap (IO ())))
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Int, IntMap (IO ()), IntMap (IO ())))
 -> IO (IORef (Int, IntMap (IO ()), IntMap (IO ()))))
-> IO (IORef (Int, IntMap (IO ()), IntMap (IO ())))
-> IO (IORef (Int, IntMap (IO ()), IntMap (IO ())))
forall a b. (a -> b) -> a -> b
$ (Int, IntMap (IO ()), IntMap (IO ()))
-> IO (IORef (Int, IntMap (IO ()), IntMap (IO ())))
forall a. a -> IO (IORef a)
newIORef (Int
0 :: Int, IntMap (IO ())
forall a. IntMap a
Map.empty, IntMap (IO ())
forall a. IntMap a
Map.empty)
        (IORef (Int, IntMap (IO ()), IntMap (IO ())), AcquireIO)
-> IO (IORef (Int, IntMap (IO ()), IntMap (IO ())), AcquireIO)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (IORef (Int, IntMap (IO ()), IntMap (IO ()))
ref, (forall b c. Priority -> IO b -> (b -> IO c) -> IO (b, IO ()))
-> AcquireIO
AcquireIO (IORef (Int, IntMap (IO ()), IntMap (IO ()))
-> Priority -> IO b -> (b -> IO c) -> IO (b, IO ())
forall (m :: * -> *) a b.
MonadIO m =>
IORef (Int, IntMap (IO ()), IntMap (IO ()))
-> Priority -> IO a -> (a -> IO b) -> m (a, m ())
allocator IORef (Int, IntMap (IO ()), IntMap (IO ()))
ref))

-- | We can also combine the stream local 'withAcquireIO' with the global monad
-- level bracket
-- Streamly.Internal.Control.Exception.'Streamly.Internal.Control.Exception.withAcquireIO'.
-- The release actions returned by the local allocator can be registered to be
-- called by the monad level bracket. This way we can guarantee that in the
-- worst case release actions happen at the end of bracket and do not depend on
-- GC. This is the most powerful way of allocating resources on-demand with
-- manual release inside a stream. If required a custom combinator can be
-- written to register the local allocator's release in the global allocator
-- automatically.
--
-- /Unimplemented/
{-# INLINE withAcquireIO' #-}
withAcquireIO' :: -- (MonadIO m, MonadCatch m) =>
    AcquireIO -> (AcquireIO -> Stream m a) -> Stream m a
withAcquireIO' :: forall (m :: * -> *) a.
AcquireIO -> (AcquireIO -> Stream m a) -> Stream m a
withAcquireIO' AcquireIO
_globalAlloc AcquireIO -> Stream m a
_action = Stream m a
forall a. HasCallStack => a
undefined

data BracketState s v = BracketInit | BracketRun s v

-- | Alternate (custom) implementation of 'bracket'.
--
{-# INLINE_NORMAL _bracket #-}
_bracket :: MonadCatch m
    => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
_bracket :: forall (m :: * -> *) b c a.
MonadCatch m =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
_bracket m b
bef b -> m c
aft b -> Stream m a
bet = (State StreamK m a
 -> BracketState (Stream m a) b
 -> m (Step (BracketState (Stream m a) b) a))
-> BracketState (Stream m a) b -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a
-> BracketState (Stream m a) b
-> m (Step (BracketState (Stream m a) b) a)
step' BracketState (Stream m a) b
forall s v. BracketState s v
BracketInit

    where

    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a
-> BracketState (Stream m a) b
-> m (Step (BracketState (Stream m a) b) a)
step' State StreamK m a
_ BracketState (Stream m a) b
BracketInit = m b
bef m b
-> (b -> m (Step (BracketState (Stream m a) b) a))
-> m (Step (BracketState (Stream m a) b) a)
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \b
x -> Step (BracketState (Stream m a) b) a
-> m (Step (BracketState (Stream m a) b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BracketState (Stream m a) b -> Step (BracketState (Stream m a) b) a
forall s a. s -> Step s a
Skip (Stream m a -> b -> BracketState (Stream m a) b
forall s v. s -> v -> BracketState s v
BracketRun (b -> Stream m a
bet b
x) b
x))

    -- NOTE: It is important to use UnStream instead of the Stream pattern
    -- here, otherwise we get huge perf degradation, see note in concatMap.
    step' State StreamK m a
gst (BracketRun (UnStream State StreamK m a -> s -> m (Step s a)
step s
state) b
v) = do
        -- res <- step gst state `MC.onException` aft v
        Either SomeException (Step s a)
res <- (m (Step s a) -> m (Either SomeException (Step s a)))
-> m (Step s a) -> m (Either SomeException (Step s a))
forall a. a -> a
inline m (Step s a) -> m (Either SomeException (Step s a))
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try (m (Step s a) -> m (Either SomeException (Step s a)))
-> m (Step s a) -> m (Either SomeException (Step s a))
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
state
        case Either SomeException (Step s a)
res of
            Left (SomeException
e :: SomeException) -> b -> m c
aft b
v m c -> m Any -> m Any
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SomeException -> m Any
forall e a. (HasCallStack, Exception e) => e -> m a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
MC.throwM SomeException
e m Any
-> m (Step (BracketState (Stream m a) b) a)
-> m (Step (BracketState (Stream m a) b) a)
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step (BracketState (Stream m a) b) a
-> m (Step (BracketState (Stream m a) b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (BracketState (Stream m a) b) a
forall s a. Step s a
Stop
            Right Step s a
r -> case Step s a
r of
                Yield a
x s
s -> Step (BracketState (Stream m a) b) a
-> m (Step (BracketState (Stream m a) b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (BracketState (Stream m a) b) a
 -> m (Step (BracketState (Stream m a) b) a))
-> Step (BracketState (Stream m a) b) a
-> m (Step (BracketState (Stream m a) b) a)
forall a b. (a -> b) -> a -> b
$ a
-> BracketState (Stream m a) b
-> Step (BracketState (Stream m a) b) a
forall s a. a -> s -> Step s a
Yield a
x (Stream m a -> b -> BracketState (Stream m a) b
forall s v. s -> v -> BracketState s v
BracketRun ((State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step s
s) b
v)
                Skip s
s    -> Step (BracketState (Stream m a) b) a
-> m (Step (BracketState (Stream m a) b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (BracketState (Stream m a) b) a
 -> m (Step (BracketState (Stream m a) b) a))
-> Step (BracketState (Stream m a) b) a
-> m (Step (BracketState (Stream m a) b) a)
forall a b. (a -> b) -> a -> b
$ BracketState (Stream m a) b -> Step (BracketState (Stream m a) b) a
forall s a. s -> Step s a
Skip (Stream m a -> b -> BracketState (Stream m a) b
forall s v. s -> v -> BracketState s v
BracketRun ((State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step s
s) b
v)
                Step s a
Stop      -> b -> m c
aft b
v m c
-> m (Step (BracketState (Stream m a) b) a)
-> m (Step (BracketState (Stream m a) b) a)
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step (BracketState (Stream m a) b) a
-> m (Step (BracketState (Stream m a) b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (BracketState (Stream m a) b) a
forall s a. Step s a
Stop

-- | Like 'finally' with following differences:
--
-- * action @m b@ won't run if the stream is garbage collected
--   after partial evaluation.
-- * has slightly better performance than 'finallyIO'.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
--
{-# INLINE finallyUnsafe #-}
finallyUnsafe :: MonadCatch m => m b -> Stream m a -> Stream m a
finallyUnsafe :: forall (m :: * -> *) b a.
MonadCatch m =>
m b -> Stream m a -> Stream m a
finallyUnsafe m b
action Stream m a
xs = m () -> (() -> m b) -> (() -> Stream m a) -> Stream m a
forall (m :: * -> *) b c a.
MonadCatch m =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracketUnsafe (() -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()) (m b -> () -> m b
forall a b. a -> b -> a
const m b
action) (Stream m a -> () -> Stream m a
forall a b. a -> b -> a
const Stream m a
xs)

-- | Run the action @IO b@ whenever the stream stream stops normally, aborts
-- due to an exception or if it is garbage collected after a partial lazy
-- evaluation.
--
-- The semantics of running the action @IO b@ are similar to the cleanup action
-- semantics described in 'bracketIO'.
--
-- >>> finallyIO release stream = Stream.bracketIO (return ()) (const release) (const stream)
--
-- See also finallyIO' for stricter resource release guarantees.
--
-- /See also 'finallyUnsafe'/
--
-- /Inhibits stream fusion/
--
{-# INLINE finallyIO #-}
finallyIO :: (MonadIO m, MonadCatch m) => IO b -> Stream m a -> Stream m a
finallyIO :: forall (m :: * -> *) b a.
(MonadIO m, MonadCatch m) =>
IO b -> Stream m a -> Stream m a
finallyIO IO b
action Stream m a
xs = IO ()
-> (() -> IO b)
-> (() -> IO b)
-> (() -> IO b)
-> (() -> Stream m a)
-> Stream m a
forall (m :: * -> *) b c d e a.
(MonadIO m, MonadCatch m) =>
IO b
-> (b -> IO c)
-> (b -> IO d)
-> (b -> IO e)
-> (b -> Stream m a)
-> Stream m a
bracketIO3 (() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()) () -> IO b
forall {p}. p -> IO b
act () -> IO b
forall {p}. p -> IO b
act () -> IO b
forall {p}. p -> IO b
act (Stream m a -> () -> Stream m a
forall a b. a -> b -> a
const Stream m a
xs)
    where act :: p -> IO b
act p
_ = IO b
action

-- | Like 'handle' but the exception handler is also provided with the stream
-- that generated the exception as input. The exception handler can thus
-- re-evaluate the stream to retry the action that failed. The exception
-- handler can again call 'ghandle' on it to retry the action multiple times.
--
-- This is highly experimental. In a stream of actions we can map the stream
-- with a retry combinator to retry each action on failure.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
--
{-# INLINE_NORMAL ghandle #-}
ghandle :: (MonadCatch m, Exception e)
    => (e -> Stream m a -> m (Stream m a)) -> Stream m a -> Stream m a
ghandle :: forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> Stream m a -> m (Stream m a)) -> Stream m a -> Stream m a
ghandle e -> Stream m a -> m (Stream m a)
f Stream m a
stream =
    m ()
-> (() -> m ())
-> (() -> e -> Stream m a -> m (Stream m a))
-> (forall s. m s -> m (Either e s))
-> (() -> Stream m a)
-> Stream m a
forall (m :: * -> *) c d e b.
Monad m =>
m c
-> (c -> m d)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket_ (() -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()) () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ((e -> Stream m a -> m (Stream m a))
-> () -> e -> Stream m a -> m (Stream m a)
forall a b. a -> b -> a
const e -> Stream m a -> m (Stream m a)
f) ((m s -> m (Either e s)) -> m s -> m (Either e s)
forall a. a -> a
inline m s -> m (Either e s)
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try) (Stream m a -> () -> Stream m a
forall a b. a -> b -> a
const Stream m a
stream)

-- | When evaluating a stream if an exception occurs, stream evaluation aborts
-- and the specified exception handler is run with the exception as argument.
-- The exception is caught and handled unless the handler decides to rethrow
-- it. Note that exception handling is not applied to the stream returned by
-- the exception handler.
--
-- Observes exceptions only in the stream generation, and not in stream
-- consumers.
--
-- /Inhibits stream fusion/
--
{-# INLINE_NORMAL handle #-}
handle :: (MonadCatch m, Exception e)
    => (e -> m (Stream m a)) -> Stream m a -> Stream m a
handle :: forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> m (Stream m a)) -> Stream m a -> Stream m a
handle e -> m (Stream m a)
f Stream m a
stream =
    m ()
-> (() -> m ())
-> (() -> e -> Stream m a -> m (Stream m a))
-> (forall s. m s -> m (Either e s))
-> (() -> Stream m a)
-> Stream m a
forall (m :: * -> *) c d e b.
Monad m =>
m c
-> (c -> m d)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket_ (() -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()) () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (\()
_ e
e Stream m a
_ -> e -> m (Stream m a)
f e
e) ((m s -> m (Either e s)) -> m s -> m (Either e s)
forall a. a -> a
inline m s -> m (Either e s)
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try) (Stream m a -> () -> Stream m a
forall a b. a -> b -> a
const Stream m a
stream)

-- | Alternate (custom) implementation of 'handle'.
--
{-# INLINE_NORMAL _handle #-}
_handle :: (MonadCatch m, Exception e)
    => (e -> Stream m a) -> Stream m a -> Stream m a
_handle :: forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> Stream m a) -> Stream m a -> Stream m a
_handle e -> Stream m a
f (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = (State StreamK m a
 -> Either s (Stream m a) -> m (Step (Either s (Stream m a)) a))
-> Either s (Stream m a) -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a
-> Either s (Stream m a) -> m (Step (Either s (Stream m a)) a)
step' (s -> Either s (Stream m a)
forall a b. a -> Either a b
Left s
state)

    where

    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a
-> Either s (Stream m a) -> m (Step (Either s (Stream m a)) a)
step' State StreamK m a
gst (Left s
st) = do
        Either e (Step s a)
res <- (m (Step s a) -> m (Either e (Step s a)))
-> m (Step s a) -> m (Either e (Step s a))
forall a. a -> a
inline m (Step s a) -> m (Either e (Step s a))
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try (m (Step s a) -> m (Either e (Step s a)))
-> m (Step s a) -> m (Either e (Step s a))
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Either e (Step s a)
res of
            Left e
e -> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m a)) a
 -> m (Step (Either s (Stream m a)) a))
-> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ Either s (Stream m a) -> Step (Either s (Stream m a)) a
forall s a. s -> Step s a
Skip (Either s (Stream m a) -> Step (Either s (Stream m a)) a)
-> Either s (Stream m a) -> Step (Either s (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Either s (Stream m a)
forall a b. b -> Either a b
Right (e -> Stream m a
f e
e)
            Right Step s a
r -> case Step s a
r of
                Yield a
x s
s -> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m a)) a
 -> m (Step (Either s (Stream m a)) a))
-> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ a -> Either s (Stream m a) -> Step (Either s (Stream m a)) a
forall s a. a -> s -> Step s a
Yield a
x (s -> Either s (Stream m a)
forall a b. a -> Either a b
Left s
s)
                Skip s
s    -> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m a)) a
 -> m (Step (Either s (Stream m a)) a))
-> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ Either s (Stream m a) -> Step (Either s (Stream m a)) a
forall s a. s -> Step s a
Skip (s -> Either s (Stream m a)
forall a b. a -> Either a b
Left s
s)
                Step s a
Stop      -> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (Either s (Stream m a)) a
forall s a. Step s a
Stop

    step' State StreamK m a
gst (Right (UnStream State StreamK m a -> s -> m (Step s a)
step1 s
st)) = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
        case Step s a
res of
            Yield a
x s
s -> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m a)) a
 -> m (Step (Either s (Stream m a)) a))
-> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ a -> Either s (Stream m a) -> Step (Either s (Stream m a)) a
forall s a. a -> s -> Step s a
Yield a
x (Stream m a -> Either s (Stream m a)
forall a b. b -> Either a b
Right ((State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step1 s
s))
            Skip s
s    -> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m a)) a
 -> m (Step (Either s (Stream m a)) a))
-> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ Either s (Stream m a) -> Step (Either s (Stream m a)) a
forall s a. s -> Step s a
Skip (Stream m a -> Either s (Stream m a)
forall a b. b -> Either a b
Right ((State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step1 s
s))
            Step s a
Stop      -> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (Either s (Stream m a)) a
forall s a. Step s a
Stop