streamly-core
Copyright: (c) 2019 Composewell Technologies
License: BSD3
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Control.Exception

Description

Additional Control.Exception utilities.

Verify

verify :: Bool -> a -> a

Like assert, but it is never removed by the compiler; it is always present in production code.

Pre-release
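
The contrast with assert can be illustrated with a small base-only sketch. Note that verifySketch below is a hypothetical stand-in written for this illustration, not the library's definition of verify; the error message and the choice of error as the failure mechanism are assumptions.

```haskell
import Control.Exception (ErrorCall, evaluate, try)

-- Hypothetical stand-in for verify (an assumption, not the library code):
-- the check is ordinary code, so no compiler flag can strip it out.
verifySketch :: Bool -> a -> a
verifySketch True x = x
verifySketch False _ = error "verifySketch: check failed"

-- Guard a division with an always-on check.
safeDiv :: Int -> Int -> Int
safeDiv n d = verifySketch (d /= 0) (n `div` d)

main :: IO ()
main = do
    print (safeDiv 10 2)
    -- Force the failing case and catch the resulting ErrorCall.
    r <- try (evaluate (safeDiv 1 0)) :: IO (Either ErrorCall Int)
    putStrLn (either (const "check failed") show r)
```

By contrast, an assert from Control.Exception may be compiled away (for example when optimization is enabled), so the same check would silently disappear.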

verifyM :: Applicative f => Bool -> f ()

Resource Management

Exception safe, thread safe resource management operations, similar to but more powerful than the bracket and finally operations available in the base package.

These operations support allocation and free only in the IO monad, hence the IO suffix.

newtype AcquireIO

AcquireIO is used to acquire a resource safely such that it is automatically released if not released manually.

See withAcquireIO.

Constructors

AcquireIO (forall b c. Priority -> IO b -> (b -> IO c) -> IO (b, IO ())) 

data Priority

Resources with Priority1 are freed before resources with Priority2. Priority was introduced mainly to handle concurrency channels: a channel must be freed first, so that all of its workers are cleaned up before the resources allocated by those workers are freed. Otherwise the resources could be freed while workers are still using them, and the workers may misbehave.

Constructors

Priority1 
Priority2 

Instances

Show Priority

Defined in Streamly.Internal.Control.Exception

allocator :: MonadIO m => IORef (Int, IntMap (IO ()), IntMap (IO ())) -> Priority -> IO a -> (a -> IO b) -> m (a, m ())

Internal.

releaser :: MonadIO m => IORef (a, IntMap (IO b), IntMap (IO b)) -> m ()

We ensure that all async workers for concurrent streams are stopped before we release the resources, so that nothing can use a resource after it has been freed.

The only other possibility is a user-issued forkIO that we do not track. That would be a programming error, however, and any such thread could misbehave if we freed the resources from under it.

We use GC-based hooks in 'Stream.bracketIO'', so there could be async threads spawned by GC releasing resources concurrently with us. For that reason we need to make sure that the "release" in the bracket end action is executed only once in that case.

Internal.
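
The "executed only once" requirement can be sketched with base alone. This is an illustrative technique, not the library's implementation: once is a hypothetical helper that swaps the stored action for a no-op atomically, so only the first caller (from any thread) gets the real action. Unlike the real releaser, the sketch does not mask asynchronous exceptions.

```haskell
import Control.Monad (join)
import Data.IORef (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef)

-- Hypothetical helper: wrap an action so that it runs at most once.
-- The stored action is atomically replaced by a no-op and the old
-- value is returned, so concurrent callers cannot both obtain it.
once :: IO () -> IO (IO ())
once act = do
    ref <- newIORef act
    pure $ join (atomicModifyIORef' ref (\a -> (pure (), a)))

main :: IO ()
main = do
    count <- newIORef (0 :: Int)
    release <- once (modifyIORef' count (+ 1))
    release
    release -- second call finds the no-op
    readIORef count >>= print -- prints 1
```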

withAcquireIO :: (MonadIO m, MonadMask m) => (AcquireIO -> m a) -> m a

withAcquireIO action runs the given action, providing it with an AcquireIO reference called ref as its argument. ref is used for resource acquisition or hook registration within the scope of action. An acquire ref alloc free call can be used within action any number of times to acquire resources that are automatically freed when the scope of action ends or if an exception occurs at any time. alloc is a function supplied by the user to allocate a resource, and free is supplied to free the allocated resource. acquire returns (resource, release): the acquired resource and a release action to release it.

acquire allocates a resource in an exception safe manner and sets up its automatic release on exception or when the scope of action ends. The release function returned by acquire can be used to free the resource manually at any time. release is guaranteed to free the resource once and only once even if it is called concurrently or multiple times.

Here is an example to allocate resources that are guaranteed to be released automatically, and can be released manually as well:

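>>> import Control.Concurrent (threadDelay)
>>> import Control.Monad (when)
>>> import Data.Function ((&))
>>> import System.IO (IOMode(..), hClose, openFile)
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Internal.Control.Exception as Exception

>>> :{
close x h = do
 putStrLn $ "closing: " ++ x
 hClose h
:}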
>>> :{
action ref =
     Stream.fromList ["file1", "file2"]
   & Stream.mapM
       (\x -> do
           (h, release) <- Exception.acquire ref (openFile x ReadMode) (close x)
           -- use h here
           threadDelay 1000000
           when (x == "file1") $ do
               putStrLn $ "Manually releasing: " ++ x
               release
           return x
       )
   & Stream.trace print
   & Stream.fold Fold.drain
:}
>>> run = Exception.withAcquireIO action

In the above code, you should see the "closing:" message for both the files, and only once for each file. Even if you interrupt the program with CTRL-C you should still see the "closing:" message for the files opened before the interrupt. Make sure you create "file1" and "file2" before running this code snippet.

Cleanup is guaranteed to happen as soon as the scope of action finishes or if an exception occurs.

Here is an example for just registering hooks to be called eventually:

>>> :{
action ref =
     Stream.fromList ["file1", "file2"]
   & Stream.mapM
       (\x -> do
           Exception.register ref $ putStrLn $ "saw: " ++ x
           threadDelay 1000000
           return x
       )
   & Stream.trace print
   & Stream.fold Fold.drain
:}
>>> run = Exception.withAcquireIO action

In the above code, even if you interrupt the program with CTRL-C you should still see the "saw:" message for the elements seen before the interrupt.

The registered hooks are guaranteed to be invoked as soon as the scope of action finishes or if an exception occurs.

This function provides functionality similar to the bracket function available in the base library. However, it is more powerful as any number of resources can be allocated and released within the scope of action.
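
To illustrate the "any number of resources" point, here is a minimal sketch in plain IO, assuming only the signatures documented on this page. The string "resources" and the messages are hypothetical stand-ins for real handles; with bracket, the same loop would need one nested bracket per resource.

```haskell
import Control.Monad (forM)
import Streamly.Internal.Control.Exception (acquire, withAcquireIO)

-- Acquire a run-time-determined number of resources in one scope.
-- Each "resource" is just a String (a stand-in for a real handle).
acquireAll :: Int -> IO [String]
acquireAll n = withAcquireIO $ \ref ->
    forM [1 .. n] $ \i -> do
        (r, _release) <-
            acquire ref
                (pure ("res" ++ show i))          -- alloc
                (\x -> putStrLn ("free " ++ x))   -- free
        pure r

main :: IO ()
main = acquireAll 3 >>= print
```

Every free action registered inside the scope should have run by the time acquireAll returns, since none of the release actions is called manually.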

Exception safe, thread safe.

acquireWith :: Priority -> AcquireIO -> IO b -> (b -> IO c) -> IO (b, IO ())

Like acquire but allows specifying a priority for releasing the resource. Priority1 resources are released before Priority2 resources. This allows us to express a dependency between resource releases.
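
A minimal sketch of priority-ordered release, assuming the signatures documented on this page. The two string "resources" are hypothetical stand-ins: one plays the role of a worker channel, the other a buffer that the workers use.

```haskell
import Streamly.Internal.Control.Exception
    (Priority(..), acquireWith, withAcquireIO)

main :: IO ()
main = withAcquireIO $ \ref -> do
    -- The buffer is acquired first but marked Priority2, so it should
    -- outlive the channel that depends on it.
    (buf, _) <-
        acquireWith Priority2 ref
            (pure "buffer")
            (\r -> putStrLn ("freeing " ++ r))
    -- The channel is marked Priority1, so it should be freed first.
    (chan, _) <-
        acquireWith Priority1 ref
            (pure "channel")
            (\r -> putStrLn ("freeing " ++ r))
    putStrLn ("using " ++ chan ++ " and " ++ buf)
```

Per the description above, the "freeing channel" message is expected before "freeing buffer" when the scope ends, regardless of acquisition order.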

acquire :: AcquireIO -> IO b -> (b -> IO c) -> IO (b, IO ())

acquire ref alloc free is used in bracket-style safe resource allocation functions, where alloc is a function supplied by the user to allocate a resource and free is supplied to free it. acquire returns a tuple (resource, release), where resource is the allocated resource and release is an action that can be called later to release the resource. Both alloc and free are invoked with asynchronous exceptions masked. You can use allowInterrupt from the base package to allow interrupts if required.

The release action can be called multiple times or even concurrently from multiple threads, but it will release the resource only once. If release is never called by the programmer it will be automatically called at the end of the bracket scope.
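
The once-only behaviour can be checked with a small sketch, assuming the signatures documented on this page. countFrees is a hypothetical helper that counts how many times the free action runs when release is called twice and the scope then ends.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import Streamly.Internal.Control.Exception (acquire, withAcquireIO)

-- Count invocations of the free action for a single dummy resource.
countFrees :: IO Int
countFrees = do
    freed <- newIORef (0 :: Int)
    withAcquireIO $ \ref -> do
        (_, release) <-
            acquire ref (pure ()) (\_ -> modifyIORef' freed (+ 1))
        release -- manual release
        release -- repeated call; should not free again
    -- The scope has ended, so automatic cleanup has also had its chance.
    readIORef freed

main :: IO ()
main = countFrees >>= print
```

If the documented guarantee holds, the count is 1: neither the repeated manual call nor the automatic cleanup at the end of the scope frees the resource a second time.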

acquire_ :: AcquireIO -> IO b -> (b -> IO c) -> IO b

Like acquire but does not return a release action. The resource can only be freed automatically.

registerWith :: Priority -> AcquireIO -> IO () -> IO ()

Like register but specifies a Priority for calling the hook.

register :: AcquireIO -> IO () -> IO ()

Register a hook to be executed at the end of a bracket.

hook :: AcquireIO -> IO () -> IO (IO ())

Like register but returns a hook release function as well. When the returned hook release function is called, the hook is invoked and removed. If the returned function is never called by the programmer then it is automatically invoked at the end of the bracket. The hook is invoked once and only once.
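
The difference between register and hook can be sketched as follows, assuming the signatures documented on this page. The messages are hypothetical; the point is that the hook created with hook can be fired early, while the one created with register waits for the end of the scope.

```haskell
import Streamly.Internal.Control.Exception (hook, register, withAcquireIO)

main :: IO ()
main = withAcquireIO $ \ref -> do
    -- Runs only when the scope ends.
    register ref (putStrLn "registered hook ran")
    -- Can be run early via the returned action.
    runNow <- hook ref (putStrLn "manual hook ran")
    putStrLn "body start"
    runNow -- invokes and removes the hook
    putStrLn "body end"
```

Based on the description above, "manual hook ran" should appear between "body start" and "body end" and should not be repeated at the end of the scope, while "registered hook ran" should appear after "body end".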