polysemy-conc-0.14.1.0: Polysemy effects for concurrency
Safe Haskell: Safe-Inferred
Language: GHC2021

Polysemy.Conc

Introduction

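This library provides an assortment of tools for concurrency-related tasks: queues, MVar abstractions, locks, semaphores, gates, racing and timeouts, event channels, exception handling, masking, and monitoring.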

Queues

data Queue d :: Effect Source #

Abstracts queues like TBQueue.

For documentation on the constructors, see the module Polysemy.Conc.Effect.Queue.

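{-# LANGUAGE LambdaCase #-}

import Polysemy (Member, Sem)
import Polysemy.Conc (Queue, QueueResult)
import qualified Polysemy.Conc.Data.QueueResult as QueueResult
import qualified Polysemy.Conc.Effect.Queue as Queue

prog :: Member (Queue Int) r => Sem r (QueueResult Int)
prog = do
  -- The literals are annotated so this also compiles without polysemy-plugin.
  Queue.write (5 :: Int)
  Queue.write (10 :: Int)
  -- Read the first element, then add it to the second one.
  Queue.read >>= \case
    QueueResult.Success i -> fmap (i +) <$> Queue.read
    r -> pure r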

data QueueResult d Source #

Encodes failure reasons for queues.

For documentation on the constructors, see the module Polysemy.Conc.Data.QueueResult.

import qualified Polysemy.Conc.Data.QueueResult as QueueResult

Instances

Functor QueueResult Source # 

Defined in Polysemy.Conc.Data.QueueResult

Methods

fmap :: (a -> b) -> QueueResult a -> QueueResult b #

(<$) :: a -> QueueResult b -> QueueResult a #

Monoid d => Monoid (QueueResult d) Source # 

Defined in Polysemy.Conc.Data.QueueResult

Semigroup d => Semigroup (QueueResult d) Source # 

Defined in Polysemy.Conc.Data.QueueResult

Generic (QueueResult d) Source # 

Defined in Polysemy.Conc.Data.QueueResult

Associated Types

type Rep (QueueResult d) :: Type -> Type #

Methods

from :: QueueResult d -> Rep (QueueResult d) x #

to :: Rep (QueueResult d) x -> QueueResult d #

Show d => Show (QueueResult d) Source # 

Defined in Polysemy.Conc.Data.QueueResult

Eq d => Eq (QueueResult d) Source # 

Defined in Polysemy.Conc.Data.QueueResult

Ord d => Ord (QueueResult d) Source # 

Defined in Polysemy.Conc.Data.QueueResult

type Rep (QueueResult d) Source # 

Defined in Polysemy.Conc.Data.QueueResult

type Rep (QueueResult d) = D1 ('MetaData "QueueResult" "Polysemy.Conc.Data.QueueResult" "polysemy-conc-0.14.1.0-5GXcfqzrZMn5wiB51wOodp" 'False) (C1 ('MetaCons "Success" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 d)) :+: (C1 ('MetaCons "NotAvailable" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Closed" 'PrefixI 'False) (U1 :: Type -> Type)))

Interpreters

interpretQueueTBM Source #

Arguments

:: forall d r. Members [Resource, Race, Embed IO] r 
=> Int

Buffer size

-> InterpreterFor (Queue d) r 

Interpret Queue with a TBMQueue.
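
A minimal sketch of wiring this interpreter into a program, assuming the polysemy and polysemy-conc packages are installed and that runConc (documented under Other Combinators) runs the remaining stack. The names sumTwo and demoQueue and the buffer size 64 are made up for illustration, and the literals are annotated so the sketch does not depend on polysemy-plugin.

```haskell
{-# LANGUAGE DataKinds, FlexibleContexts, LambdaCase, TypeApplications #-}

import Polysemy (Member, Sem)
import Polysemy.Conc (Queue, QueueResult, interpretQueueTBM, runConc)
import qualified Polysemy.Conc.Data.QueueResult as QueueResult
import qualified Polysemy.Conc.Effect.Queue as Queue

-- Hypothetical program: write two numbers, read them back and add them.
sumTwo :: Member (Queue Int) r => Sem r (QueueResult Int)
sumTwo = do
  Queue.write (5 :: Int)
  Queue.write (10 :: Int)
  Queue.read >>= \case
    QueueResult.Success i -> fmap (i +) <$> Queue.read
    r -> pure r

-- Interpret the queue with a buffer of 64 elements (an arbitrary choice)
-- and run the rest of the stack in IO.
-- The type application fixes the element type of the interpreted queue.
demoQueue :: IO (QueueResult Int)
demoQueue = runConc (interpretQueueTBM @Int 64 sumTwo)
```

Since both writes happen before the first read and the buffer is larger than two, neither call blocks in this sketch.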

interpretQueueTB Source #

Arguments

:: forall d r. Members [Race, Embed IO] r 
=> Natural

Buffer size

-> InterpreterFor (Queue d) r 

Interpret Queue with a TBQueue.

interpretQueueListReadOnlyAtomicWith :: forall d r. Member (AtomicState [d]) r => InterpreterFor (Queue d) r Source #

Reinterpret Queue as AtomicState with a list that cannot be written to. Useful for testing.

interpretQueueListReadOnlyStateWith :: forall d r. Member (State [d]) r => InterpreterFor (Queue d) r Source #

Reinterpret Queue as State with a list that cannot be written to. Useful for testing.

Combinators

loop :: Member (Queue d) r => (d -> Sem r ()) -> Sem r () Source #

Read from a Queue repeatedly until it is closed.

When an element is received, call action and recurse.

loopOr :: Member (Queue d) r => Sem r Bool -> (d -> Sem r Bool) -> Sem r () Source #

Read from a Queue repeatedly until it is closed.

When an element is received, call action (the second argument) and recurse if it returns True. When no element is available, evaluate na (the first argument) and recurse if it returns True.

MVars

An MVar is abstracted as Sync since it can be used to synchronize threads.

data Sync d :: Effect Source #

Abstracts an MVar.

For documentation on the constructors, see the module Polysemy.Conc.Effect.Sync.

import Polysemy (Member, Sem)
import Polysemy.Conc (Sync)
import qualified Polysemy.Conc.Effect.Sync as Sync

prog :: Member (Sync Int) r => Sem r Int
prog = do
  -- Discard the result of the non-blocking put.
  _ <- Sync.putTry (5 :: Int)
  Sync.takeBlock

data SyncRead (d :: Type) :: Effect Source #

An interface to a shared variable (MVar) that can only be read.

type ScopedSync a = Scoped_ (Sync a) Source #

Convenience alias.

Interpreters

interpretSync :: forall d r. Members [Race, Embed IO] r => InterpreterFor (Sync d) r Source #

Interpret Sync with an empty MVar.

interpretSyncAs :: forall d r. Members [Race, Embed IO] r => d -> InterpreterFor (Sync d) r Source #

Interpret Sync with an MVar containing the specified value.
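
The two interpreters above can be compared in a short sketch, assuming polysemy and polysemy-conc are installed and using runConc (documented under Other Combinators) to run the stack. The names putThenTake, demoEmpty and demoPrefilled and the values 5 and 7 are illustrative only.

```haskell
{-# LANGUAGE DataKinds, FlexibleContexts, TypeApplications #-}

import Polysemy (Member, Sem)
import Polysemy.Conc (Sync, interpretSync, interpretSyncAs, runConc)
import qualified Polysemy.Conc.Effect.Sync as Sync

-- Hypothetical program: fill the variable, then take the value out again.
putThenTake :: Member (Sync Int) r => Sem r Int
putThenTake = do
  _ <- Sync.putTry (5 :: Int)
  Sync.takeBlock

-- Starts with an empty variable, so the program has to put a value first.
demoEmpty :: IO Int
demoEmpty = runConc (interpretSync @Int putThenTake)

-- Starts with a variable that already contains 7, so taking succeeds at once.
demoPrefilled :: IO Int
demoPrefilled = runConc (interpretSyncAs (7 :: Int) Sync.takeBlock)
```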

withSync :: forall d r. Member (ScopedSync d) r => InterpreterFor (Sync d) r Source #

Run an action with a locally scoped Sync variable.

This avoids a dependency on Embed IO in application logic while still allowing the variable to be scoped.

interpretScopedSync :: forall d r. Members [Resource, Race, Embed IO] r => InterpreterFor (Scoped_ (Sync d)) r Source #

Interpret Sync for locally scoped use with an empty MVar.

interpretScopedSyncAs :: forall d r. Members [Resource, Race, Embed IO] r => d -> InterpreterFor (Scoped_ (Sync d)) r Source #

Interpret Sync for locally scoped use with an MVar containing the specified value.

syncRead :: forall d r. Member (Sync d) r => InterpreterFor (SyncRead d) r Source #

Run SyncRead in terms of Sync.

Lock

data Lock :: Effect Source #

An exclusive lock or mutex, protecting a region from concurrent access.

lock :: forall r a. Member Lock r => Sem r a -> Sem r a Source #

Run an action once the lock is available, blocking until then.

lockOr :: forall r a. Member Lock r => Sem r a -> Sem r a -> Sem r a Source #

Run the second action if the lock is available, or the first action otherwise.

lockOrSkip :: forall r a. Member Lock r => Sem r a -> Sem r (Maybe a) Source #

Run an action if the lock is available, skip and return Nothing otherwise.

lockOrSkip_ :: forall r a. Member Lock r => Sem r a -> Sem r () Source #

Run an action if the lock is available, skip otherwise. Return ().

Interpreters

interpretLockReentrant :: Members [Resource, Race, Mask, Embed IO] r => InterpreterFor Lock r Source #

Interpret Lock as a reentrant lock, allowing nested calls to lock unless called from a different thread (for example, when async was called in a higher-order action passed to lock).

interpretLockPermissive :: InterpreterFor Lock r Source #

Interpret Lock by executing all actions unconditionally.
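
A small sketch of both interpreters, assuming polysemy and polysemy-conc are installed. The names demoPermissive and demoReentrant are hypothetical. The permissive interpreter has no constraints, so it can be run purely; the reentrant one is run with runConc, which is assumed to provide the effects it requires.

```haskell
{-# LANGUAGE DataKinds, FlexibleContexts #-}

import Polysemy (run)
import Polysemy.Conc
  (interpretLockPermissive, interpretLockReentrant, lock, lockOrSkip, runConc)

-- With the permissive interpreter the protected action always runs,
-- so lockOrSkip is expected to return Just.
demoPermissive :: Maybe Int
demoPermissive = run (interpretLockPermissive (lockOrSkip (pure (1 :: Int))))

-- Nested calls to lock on the same thread do not block with the
-- reentrant interpreter.
demoReentrant :: IO Int
demoReentrant = runConc (interpretLockReentrant (lock (lock (pure (2 :: Int)))))
```

The permissive variant is mainly useful in tests, where exclusive access does not matter.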

Semaphores

data Semaphore :: Effect Source #

This effect abstracts over the concept of a quantity semaphore, a concurrency primitive that contains a number of slots that can be acquired and released.

Interpreters

Gate

data Gate :: Effect Source #

A single-use synchronization point that blocks all consumers who called gate until signal is called.

The constructors are exported from Polysemy.Conc.Gate.

type Gates = Scoped_ Gate Source #

Convenience alias for scoped Gate.

Interpreters

interpretGate :: forall r. Member (Embed IO) r => InterpreterFor Gate r Source #

Interpret Gate with an MVar.

Racing

Racing works like this:

prog =
 Polysemy.Conc.race (httpRequest "hackage.haskell.org") (readFile "/path/to/file") >>= \case
   Left _ -> putStrLn "hackage was faster"
   Right _ -> putStrLn "file was faster"

When the first thunk finishes, the other will be killed.

data Race :: Effect Source #

Abstract the concept of running two programs concurrently, aborting the other when one terminates. Timeout is a simpler variant, where one thread just sleeps for a given interval.

race :: forall a b r. Member Race r => Sem r a -> Sem r b -> Sem r (Either a b) Source #

Run both programs concurrently, returning the result of the faster one.

race_ :: Member Race r => Sem r a -> Sem r a -> Sem r a Source #

Specialization of race for the case where both actions return the same type, obviating the need for Either.

timeout :: forall a b u r. TimeUnit u => Member Race r => Sem r a -> u -> Sem r b -> Sem r (Either a b) Source #

Run the first action if the second action doesn't finish within the specified interval.

timeout_ :: TimeUnit u => Member Race r => Sem r a -> u -> Sem r a -> Sem r a Source #

Specialization of timeout for the case where the main action returns the same type as the fallback, obviating the need for Either.

timeoutAs :: TimeUnit u => Member Race r => a -> u -> Sem r b -> Sem r (Either a b) Source #

Version of timeout that takes a pure fallback value.

timeoutAs_ :: TimeUnit u => Member Race r => a -> u -> Sem r a -> Sem r a Source #

Specialization of timeoutAs for the case where the main action returns the same type as the fallback, obviating the need for Either.

timeoutU :: TimeUnit u => Member Race r => u -> Sem r () -> Sem r () Source #

Specialization of timeout for unit actions.

timeoutMaybe :: TimeUnit u => Member Race r => u -> Sem r a -> Sem r (Maybe a) Source #

Variant of timeout that returns Maybe.
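
As a rough illustration, the following sketch runs one action that exceeds its time limit and one that finishes in time. It assumes that the TimeUnit types MilliSeconds and Seconds come from the Polysemy.Time module of the polysemy-time package, and that runConc (documented under Other Combinators) interprets Race. The names demoTimedOut and demoInTime and the durations are made up.

```haskell
{-# LANGUAGE DataKinds, FlexibleContexts #-}

import Control.Concurrent (threadDelay)
import Polysemy (embed)
import Polysemy.Conc (runConc, timeoutMaybe)
import Polysemy.Time (MilliSeconds (MilliSeconds), Seconds (Seconds))

-- The action sleeps for two seconds but is only given 50 milliseconds.
demoTimedOut :: IO (Maybe Int)
demoTimedOut =
  runConc (timeoutMaybe (MilliSeconds 50) (embed (threadDelay 2000000) >> pure 1))

-- The action returns immediately, well within the five second limit.
demoInTime :: IO (Maybe Int)
demoInTime = runConc (timeoutMaybe (Seconds 5) (pure 1))
```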

timeoutStop :: TimeUnit u => Members [Race, Stop err] r => err -> u -> Sem r a -> Sem r a Source #

Variant of timeout that calls Stop with the supplied error when the action times out.

retrying Source #

Arguments

:: forall e w u t d r a. TimeUnit w 
=> TimeUnit u 
=> Members [Race, Time t d] r 
=> w

The timeout after which the attempt is abandoned.

-> u

The waiting interval between two tries.

-> Sem r (Either e a) 
-> Sem r (Maybe a) 

Run an action repeatedly until it returns Right or the timeout has been exceeded.

retryingWithError Source #

Arguments

:: forall e w u t d r a. TimeUnit w 
=> TimeUnit u 
=> Members [Race, Time t d, Embed IO] r 
=> w

The timeout after which the attempt is abandoned.

-> u

The waiting interval between two tries.

-> Sem r (Either e a) 
-> Sem r (Maybe (Either e a)) 

Run an action repeatedly until it returns Right or the timeout has been exceeded.

If the action failed at least once, the last error will be returned in case of timeout.

Interpreters

interpretRace :: Member (Final IO) r => InterpreterFor Race r Source #

Interpret Race in terms of race and timeout. Since this has to pass higher-order thunks as IO arguments, it is interpreted in terms of Final IO.

Event Channels

data Events (e :: Type) :: Effect Source #

An event publisher that can be consumed from multiple threads.

data Consume (e :: Type) :: Effect Source #

Consume events emitted by Events.

publish :: forall e r. Member (Events e) r => e -> Sem r () Source #

Publish one event.

consume :: forall e r. Member (Consume e) r => Sem r e Source #

Consume one event emitted by Events.

subscribe :: forall e r. Member (Scoped_ (Consume e)) r => InterpreterFor (Consume e) r Source #

Create a new scope for Events, causing the nested program to get its own copy of the event stream. To be used with interpretEventsChan.

subscribeGated :: forall e r. Members [EventConsumer e, Gate] r => InterpreterFor (Consume e) r Source #

Create a new scope for Events, causing the nested program to get its own copy of the event stream.

Calls signal before running the argument to ensure that subscribe has finished creating a channel, for use with asynchronous execution.

subscribeAsync :: forall e r a. Members [EventConsumer e, Scoped_ Gate, Resource, Race, Async] r => Sem (Consume e : r) () -> Sem r a -> Sem r a Source #

Create a new scope for Events, causing the nested program to get its own copy of the event stream.

Executes in a new thread, ensuring that the main thread blocks until subscribe has finished creating a channel.

subscribeWhile :: forall e r. Member (EventConsumer e) r => (e -> Sem r Bool) -> Sem r () Source #

Pull repeatedly from the Events channel, passing the event to the supplied callback. Stop when the action returns False.

subscribeWhileGated :: forall e r. Members [EventConsumer e, Gate] r => (e -> Sem r Bool) -> Sem r () Source #

Pull repeatedly from the Events channel, passing the event to the supplied callback. Stop when the action returns False.

Signals the caller that the channel was successfully subscribed to using the Gate effect.

subscribeWhileAsync :: forall e r a. Members [EventConsumer e, Gates, Resource, Race, Async] r => (e -> Sem (Consume e : r) Bool) -> Sem r a -> Sem r a Source #

Start a new thread that pulls repeatedly from the Events channel, passing the event to the supplied callback and stopping when the callback returns False.

subscribeLoop :: forall e r. Member (EventConsumer e) r => (e -> Sem r ()) -> Sem r () Source #

Pull repeatedly from the Events channel, passing the event to the supplied callback.

subscribeLoopGated :: forall e r. Members [EventConsumer e, Gate] r => (e -> Sem r ()) -> Sem r () Source #

Pull repeatedly from the Events channel, passing the event to the supplied callback.

Signals the caller that the channel was successfully subscribed to using the Gate effect.

subscribeLoopAsync :: forall e r a. Members [EventConsumer e, Gates, Resource, Race, Async] r => (e -> Sem (Consume e : r) ()) -> Sem r a -> Sem r a Source #

Start a new thread that pulls repeatedly from the Events channel, passing the event to the supplied callback.

subscribeFind :: forall e r. Member (EventConsumer e) r => (e -> Sem (Consume e : r) Bool) -> Sem r e Source #

Block until a value matching the predicate has been published to the Events channel.

subscribeFirstJust :: forall e a r. Member (EventConsumer e) r => (e -> Sem (Consume e : r) (Maybe a)) -> Sem r a Source #

Return the first value published to the Events channel for which the function produces Just.

subscribeElem :: forall e r. Eq e => Member (EventConsumer e) r => e -> Sem r () Source #

Block until the specified value has been published to the Events channel.

consumeWhile :: Member (Consume e) r => (e -> Sem r Bool) -> Sem r () Source #

Pull repeatedly from Consume, passing the event to the supplied callback. Stop when the action returns False.

consumeLoop :: Member (Consume e) r => (e -> Sem r ()) -> Sem r () Source #

Pull repeatedly from Consume, passing the event to the supplied callback.

consumeFind :: forall e r. Member (Consume e) r => (e -> Sem r Bool) -> Sem r e Source #

Block until a value matching the predicate has been returned by Consume.

consumeFirstJust :: forall e a r. Member (Consume e) r => (e -> Sem r (Maybe a)) -> Sem r a Source #

Return the first value returned by Consume for which the function produces Just.

consumeElem :: forall e r. Eq e => Member (Consume e) r => e -> Sem r () Source #

Block until the specified value has been returned by Consume.

type EventConsumer e = Scoped_ (Consume e) Source #

Convenience alias for the consumer effect.

Interpreters

interpretEventsChan :: forall e r. Members [Resource, Race, Async, Embed IO] r => InterpretersFor [Events e, EventConsumer e] r Source #

Interpret Events and Consume together by connecting them to the two ends of an unagi channel. Consume is only interpreted in a Scoped manner, ensuring that a new duplicate of the channel is created so that all consumers see all events (from the moment they are connected).

This should be used in conjunction with subscribe:

interpretEventsChan do
  async $ subscribe do
    putStrLn =<< consume
  publish "hello"

Whenever subscribe creates a new scope, this interpreter calls dupChan and passes the duplicate to interpretConsumeChan.

Exceptions

data Critical :: Effect Source #

An effect that catches exceptions.

Provides the exact functionality of fromExceptionSem, but pushes the dependency on Final IO to the interpreter, and makes it optional.

Interpreters

Masking

type Mask = Scoped_ RestoreMask Source #

The scoped masking effect.

type UninterruptibleMask = Scoped_ RestoreMask Source #

The scoped uninterruptible masking effect.

mask :: Member Mask r => InterpreterFor RestoreMask r Source #

Mark a region as masked. Uses the Scoped_ pattern.

uninterruptibleMask :: Member UninterruptibleMask r => InterpreterFor RestoreMask r Source #

Mark a region as uninterruptibly masked. Uses the Scoped_ pattern.

restore :: forall r a. Member RestoreMask r => Sem r a -> Sem r a Source #

Restore the previous masking state. Can only be called inside of an action passed to mask or uninterruptibleMask.

data Restoration Source #

Resource type for the scoped Mask effect, wrapping the restore callback passed in by mask.

Interpreters

interpretMaskPure :: InterpreterFor Mask r Source #

Interpret Mask by sequencing the action without masking.
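
A minimal sketch of how mask, restore and this interpreter fit together, assuming polysemy and polysemy-conc are installed. The name demoMask is hypothetical. Since the pure interpreter needs no IO, the program can be run with Polysemy's run.

```haskell
{-# LANGUAGE DataKinds, FlexibleContexts #-}

import Polysemy (run)
import Polysemy.Conc (interpretMaskPure, mask, restore)

-- mask opens the scoped region, restore marks the part in which the
-- previous masking state would be reinstated. With the pure interpreter
-- both only sequence the wrapped action.
demoMask :: Int
demoMask = run (interpretMaskPure (mask (restore (pure (1 :: Int)))))
```

This is convenient for testing code that uses Mask without involving real asynchronous exceptions.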

interpretUninterruptibleMaskPure :: InterpreterFor UninterruptibleMask r Source #

Interpret UninterruptibleMask by sequencing the action without masking.

Monitoring

data Monitor (action :: Type) :: Effect Source #

Mark a region as being subject to intervention by a monitoring program. This can mean that a thread is repeatedly checking a condition and cancelling this region when it is unmet. A use case could be checking whether a remote service is available, or whether the system was suspended and resumed. This should be used in a Scoped_ context, like withMonitor.

monitor :: forall action r a. Member (Monitor action) r => Sem r a -> Sem r a Source #

Mark a region as being subject to intervention by a monitoring program.

withMonitor :: forall action r. Member (ScopedMonitor action) r => InterpreterFor (Monitor action) r Source #

Start a region that can contain monitor-intervention regions.

restart :: Member (ScopedMonitor Restart) r => InterpreterFor (Monitor Restart) r Source #

Variant of withMonitor that uses the Restart strategy.

data Restart Source #

Marker type for the restarting action for Monitor.

Instances

Show Restart Source # 

Defined in Polysemy.Conc.Effect.Monitor

Eq Restart Source # 

Defined in Polysemy.Conc.Effect.Monitor

Methods

(==) :: Restart -> Restart -> Bool #

(/=) :: Restart -> Restart -> Bool #

type RestartingMonitor = ScopedMonitor Restart Source #

Monitor specialized to the Restart action.

type ScopedMonitor (action :: Type) = Scoped_ (Monitor action) Source #

Convenience alias for a Scoped_ Monitor.

Interpreters

interpretMonitorRestart :: forall t d r. Members [Time t d, Resource, Async, Race, Final IO] r => MonitorCheck r -> InterpreterFor RestartingMonitor r Source #

Interpret Scoped Monitor with the Restart strategy. This takes a check action that may put an MVar when the scoped region should be restarted. The check is executed in a loop, with an interval given in MonitorCheck.

monitorClockSkew :: forall t d diff r. Torsor t diff => TimeUnit diff => Members [AtomicState (Maybe t), Time t d, Embed IO] r => ClockSkewConfig -> MonitorCheck r Source #

Check for Monitor that checks every interval whether the difference between the current time and the time at the last check is larger than interval + tolerance. Can be used to detect that the operating system suspended and resumed.

clockSkewConfig :: TimeUnit u1 => TimeUnit u2 => u1 -> u2 -> ClockSkewConfig Source #

Smart constructor for ClockSkewConfig that takes arbitrary TimeUnits.

Other Combinators

type ConcStack = [UninterruptibleMask, Mask, Gates, Race, Async, Resource, Embed IO, Final IO] Source #

A default basic stack with Final for polysemy-conc.

runConc :: Sem ConcStack a -> IO a Source #

Interprets UninterruptibleMask, Mask and Race in terms of Final IO and runs the entire rest of the stack.

interpretAtomic :: forall a r. Member (Embed IO) r => a -> InterpreterFor (AtomicState a) r Source #

Convenience wrapper around runAtomicStateTVar that creates a new TVar.
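
A short sketch of this wrapper in use, assuming polysemy and polysemy-conc are installed. The names bump and demoAtomic and the initial value 41 are illustrative; atomicGet and atomicModify' are taken from Polysemy.AtomicState.

```haskell
{-# LANGUAGE DataKinds, FlexibleContexts #-}

import Polysemy (Member, Sem)
import Polysemy.AtomicState (AtomicState, atomicGet, atomicModify')
import Polysemy.Conc (interpretAtomic, runConc)

-- Hypothetical program: increment the shared counter and return its value.
bump :: Member (AtomicState Int) r => Sem r Int
bump = do
  atomicModify' (+ (1 :: Int))
  atomicGet

-- The initial value is stored in a fresh TVar created by the interpreter.
demoAtomic :: IO Int
demoAtomic = runConc (interpretAtomic (41 :: Int) bump)
```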

withAsyncBlock :: Members [Resource, Async] r => Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action. Passes the handle into the action to allow it to await its result.

When cancelling, this variant will wait indefinitely for the thread to be gone.

withAsync :: Members [Resource, Race, Async] r => Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action. Passes the handle into the sync action to allow it to await the async action's result.

When cancelling, this variant will wait for 500ms for the thread to be gone.
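
A rough sketch of awaiting the background result, assuming polysemy and polysemy-conc are installed and that await from Polysemy.Async is used on the handle. The name demoAsync is hypothetical.

```haskell
{-# LANGUAGE DataKinds, FlexibleContexts #-}

import Polysemy.Async (await)
import Polysemy.Conc (runConc, withAsync)

-- The first action runs in its own thread; the second one receives the
-- handle and blocks on it until the result is available.
demoAsync :: IO (Maybe Int)
demoAsync = runConc (withAsync (pure (5 :: Int)) await)
```

The handle carries a Maybe because the result of a thread started through the Async effect is wrapped that way.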

withAsync_ :: Members [Resource, Race, Async] r => Sem r b -> Sem r a -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action. Discards the handle, expecting the async action to either terminate or be cancelled.

When cancelling, this variant will wait for 500ms for the thread to be gone.

scheduleAsync :: forall b r a. Members [ScopedSync (), Async, Race] r => Sem r b -> (Async (Maybe b) -> Sem (Sync () : r) () -> Sem (Sync () : r) a) -> Sem r a Source #

Run an action with async, but don't start it right away, so the thread handle can be processed before the action executes.

Takes a callback function that is invoked after spawning the thread. The callback receives the Async handle and a unit action that starts the computation.

This is helpful if the Async handle has to be stored in state and the same state is also written when the action finishes. If the thread were started immediately, a race condition could cause the handle to be written over the finished state.

makeRequest = put Nothing

main = scheduleAsync makeRequest \ handle start -> do
  put (Just handle)
  start -- now makeRequest is executed

scheduleAsyncIO :: forall b r a. Members [Resource, Async, Race, Embed IO] r => Sem r b -> (Async (Maybe b) -> Sem (Sync () : r) () -> Sem (Sync () : r) a) -> Sem r a Source #

Variant of scheduleAsync that directly interprets the MVar used for signalling.

withAsyncGated :: forall b r a. Members [Scoped_ Gate, Resource, Race, Async] r => Sem (Gate : r) b -> (Async (Maybe b) -> Sem r a) -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action.

The second action will only start when the first action calls signal.

Passes the handle into the sync action to allow it to await the async action's result.

This can be used to ensure that the async action has acquired its resources before the main action starts.

withAsyncGated_ :: forall b r a. Members [Scoped_ Gate, Resource, Race, Async] r => Sem (Gate : r) b -> Sem r a -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action.

The second action will only start when the first action calls signal.

This can be used to ensure that the async action has acquired its resources before the main action starts.