| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
UnliftIO.MessageBox
Description
Fast and robust message queues for concurrent processes.
Processes of an application can exchange message using Message Boxes.
This library is meant to be a wrapper around a
well tested and benchmarked subset of unagi-chan
for applications using unliftio.
In addition to the basic functionality, i.e. _Message Boxes_, there is a very little bit of type level magic dust in UnliftIO.MessageBox.Command that helps to write code that sends a message and expects the receiving process to send a reply.
This module re-exports most of the library.
Synopsis
- class IsInput input where
- deliver :: MonadUnliftIO m => input a -> a -> m Bool
- deliver_ :: MonadUnliftIO m => input a -> a -> m ()
- class IsInput (Input box) => IsMessageBox box where
- type Input box :: Type -> Type
- receive :: MonadUnliftIO m => box a -> m (Maybe a)
- tryReceive :: MonadUnliftIO m => box a -> m (Future a)
- receiveAfter :: MonadUnliftIO m => box a -> Int -> m (Maybe a)
- newInput :: MonadUnliftIO m => box a -> m (Input box a)
- class (IsMessageBox (MessageBox argument), IsInput (Input (MessageBox argument))) => IsMessageBoxArg argument where
- type MessageBox argument :: Type -> Type
- getConfiguredMessageLimit :: argument -> Maybe Int
- newMessageBox :: MonadUnliftIO m => argument -> m (MessageBox argument a)
- handleMessage :: (MonadUnliftIO m, IsMessageBox box) => box message -> (message -> m b) -> m (Maybe b)
- data WaitingInput a = WaitingInput !Int !(BlockingInput a)
- data WaitingBox a = WaitingBox WaitingBoxLimit (BlockingBox a)
- data WaitingBoxLimit = WaitingBoxLimit !(Maybe Int) !Int !MessageLimit
- newtype NonBlockingInput a = NonBlockingInput (BlockingInput a)
- data NonBlockingBox a
- newtype NonBlockingBoxLimit = NonBlockingBoxLimit MessageLimit
- data BlockingInput a
- data BlockingBox a
- newtype BlockingBoxLimit = BlockingBoxLimit MessageLimit
- data MessageLimit
- messageLimitToInt :: MessageLimit -> Int
- data BlockingUnlimited = BlockingUnlimited
- data UnlimitedBoxInput a
- data UnlimitedBox a
- newtype CatchAllInput i a = CatchAllInput (i a)
- newtype CatchAllBox box a = CatchAllBox (box a)
- newtype CatchAllArg cfg = CatchAllArg cfg
- data AsyncReply r
- newtype DuplicateReply = DuplicateReply CallId
- data CommandError where
- data ReplyBox a
- data Message apiTag where
- Blocking :: Show (Command apiTag ('Return result)) => Command apiTag ('Return result) -> ReplyBox result -> Message apiTag
- NonBlocking :: Show (Command apiTag 'FireAndForget) => Command apiTag 'FireAndForget -> Message apiTag
- data ReturnType where
- FireAndForget :: ReturnType
- Return :: Type -> ReturnType
- data family Command apiTag :: ReturnType -> Type
- cast :: (MonadUnliftIO m, IsInput o, Show (Command apiTag 'FireAndForget)) => o (Message apiTag) -> Command apiTag 'FireAndForget -> m Bool
- call :: (HasCallIdCounter env, MonadReader env m, MonadUnliftIO m, IsInput input, Show (Command apiTag ('Return result))) => input (Message apiTag) -> Command apiTag ('Return result) -> Int -> m (Either CommandError result)
- replyTo :: MonadUnliftIO m => ReplyBox a -> a -> m ()
- delegateCall :: (MonadUnliftIO m, IsInput o, Show (Command apiTag ('Return r))) => o (Message apiTag) -> Command apiTag ('Return r) -> ReplyBox r -> m Bool
- callAsync :: (HasCallIdCounter env, MonadReader env m, MonadUnliftIO m, IsInput o, Show (Command apiTag ('Return result))) => o (Message apiTag) -> Command apiTag ('Return result) -> m (Maybe (AsyncReply result))
- waitForReply :: MonadUnliftIO m => Int -> AsyncReply result -> m (Either CommandError result)
- tryTakeReply :: MonadUnliftIO m => AsyncReply result -> m (Maybe (Either CommandError result))
- class HasCallIdCounter env where
- getCallIdCounter :: env -> CounterVar CallId
- newtype CallId = MkCallId Int
- newCallIdCounter :: MonadIO m => m (CounterVar CallId)
- takeNext :: (MonadReader env m, HasCallIdCounter env, MonadUnliftIO m) => m CallId
- class HasCounterVar a env | env -> a where
- getCounterVar :: env -> CounterVar a
- data CounterVar a
- fresh :: forall a env m. (MonadReader env m, MonadIO m, HasCounterVar a env, Coercible a Int) => m a
- incrementAndGet :: forall a m. (MonadIO m, Coercible a Int) => CounterVar a -> m a
- newCounterVar :: forall a m. MonadIO m => m (CounterVar a)
- newtype Future a = Future (IO (Maybe a))
- tryNow :: MonadUnliftIO m => Future a -> m (Maybe a)
- awaitFuture :: MonadUnliftIO m => Future b -> m b
Documentation
class IsInput input where Source #
A type class for input types. A common interface for delivering messages.
Minimal complete definition
Methods
deliver :: MonadUnliftIO m => input a -> a -> m Bool Source #
Send a message. Take whatever time it takes. Depending on the implementation, this might be a non-blocking operation. Return if the operation was successful.
NOTE: False may sporadically be returned, especially
when there is a lot of load, so please make sure to
build your application in such a way, that it
anticipates failure.
deliver_ :: MonadUnliftIO m => input a -> a -> m () Source #
Instances
class IsInput (Input box) => IsMessageBox box where Source #
A type class for msgBox types. A common interface for receiving messages.
Minimal complete definition
Methods
receive :: MonadUnliftIO m => box a -> m (Maybe a) Source #
Receive a message. Take whatever time it takes.
Return Just the value or Nothing when an error
occurred.
NOTE: Nothing may sporadically be returned, especially when there is a lot of load, so please make sure to build your application in such a way, that it anticipates failure.
tryReceive :: MonadUnliftIO m => box a -> m (Future a) Source #
Return a Future that can be used to wait for the
arrival of the next message.
NOTE: Each future value represents the next slot in the queue
so one future corresponds to exactly that message (should it arrive)
and if that future value is dropped, that message will be lost!
Arguments
| :: MonadUnliftIO m | |
| => box a | Message box |
| -> Int | Time in micro seconds to wait until the action is invoked. |
| -> m (Maybe a) |
Wait for an incoming message or return Nothing.
The default implementation uses tryReceive to get a
Future on which awaitFuture inside a timeout is called.
Instances might override this with more performant implementations especially non-blocking Unagi channel based implementation.
NOTE: Nothing may sporadically be returned, especially when there is a lot of load, so please make sure to build your application in such a way, that it anticipates failure.
newInput :: MonadUnliftIO m => box a -> m (Input box a) Source #
Create a new input that enqueus messages,
which are received by the box
Instances
class (IsMessageBox (MessageBox argument), IsInput (Input (MessageBox argument))) => IsMessageBoxArg argument where Source #
Types that configure and allow the creation of a MessageBox.
Create IsMessageBox instances from a parameter.
Types that determine MessageBox values.
For a limited message box this might be the limit of the message queue.
Associated Types
type MessageBox argument :: Type -> Type Source #
The message box that can be created from the message box argument
Methods
getConfiguredMessageLimit :: argument -> Maybe Int Source #
Return a message limit.
NOTE: This method was added for unit tests. Although the method is totally valid, it might not be super useful in production code. Also note that the naming follows the rule: Reserve short names for entities that are used often.
newMessageBox :: MonadUnliftIO m => argument -> m (MessageBox argument a) Source #
Create a new msgBox according to the argument.
This is required to receive a message.
NOTE: Only one process may receive on an msgBox.
Instances
handleMessage :: (MonadUnliftIO m, IsMessageBox box) => box message -> (message -> m b) -> m (Maybe b) Source #
Receive a message and apply a function to it.
data WaitingInput a Source #
An input for a BlockingBox that will block
for not much more than the given timeout when
the message box is full.
Constructors
| WaitingInput !Int !(BlockingInput a) |
Instances
| IsInput WaitingInput Source # | |
Defined in UnliftIO.MessageBox.Limited Methods deliver :: MonadUnliftIO m => WaitingInput a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => WaitingInput a -> a -> m () Source # | |
data WaitingBox a Source #
A BlockingBox an a WaitingBoxLimit for
the IsMessageBox instance.
Constructors
| WaitingBox WaitingBoxLimit (BlockingBox a) |
Instances
| IsMessageBox WaitingBox Source # | |
Defined in UnliftIO.MessageBox.Limited Methods receive :: MonadUnliftIO m => WaitingBox a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => WaitingBox a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => WaitingBox a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => WaitingBox a -> m (Input WaitingBox a) Source # | |
| type Input WaitingBox Source # | |
Defined in UnliftIO.MessageBox.Limited | |
data WaitingBoxLimit Source #
A IsMessageBoxArg instance wrapping the BlockingBox
with independently configurable timeouts for receive and deliver.
Constructors
| WaitingBoxLimit !(Maybe Int) !Int !MessageLimit |
Instances
newtype NonBlockingInput a Source #
A wrapper around BlockingInput with a non-blocking IsInput instance.
deliver will enqueue the message or return False immediately,
if the message box already contains more messages than
it's limit allows.
Constructors
| NonBlockingInput (BlockingInput a) |
Instances
| IsInput NonBlockingInput Source # | |
Defined in UnliftIO.MessageBox.Limited Methods deliver :: MonadUnliftIO m => NonBlockingInput a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => NonBlockingInput a -> a -> m () Source # | |
data NonBlockingBox a Source #
A BlockingBox wrapper for non-blocking IsMessageBox instances.
The difference to the BlockingBox instance is that deliver
immediately returns if the message box limit is surpassed.
Instances
| IsMessageBox NonBlockingBox Source # | |
Defined in UnliftIO.MessageBox.Limited Methods receive :: MonadUnliftIO m => NonBlockingBox a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => NonBlockingBox a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => NonBlockingBox a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => NonBlockingBox a -> m (Input NonBlockingBox a) Source # | |
| type Input NonBlockingBox Source # | |
Defined in UnliftIO.MessageBox.Limited | |
newtype NonBlockingBoxLimit Source #
A BlockingBoxLimit wrapper for non-blocking IsMessageBoxArg instances.
Constructors
| NonBlockingBoxLimit MessageLimit |
Instances
data BlockingInput a Source #
A message queue into which messages can be enqued by,
e.g. tryToDeliver.
Messages can be received from an BlockingBox.
The Input is the counter part of a BlockingBox.
Instances
| IsInput BlockingInput Source # | A blocking instance that invokes |
Defined in UnliftIO.MessageBox.Limited Methods deliver :: MonadUnliftIO m => BlockingInput a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => BlockingInput a -> a -> m () Source # | |
data BlockingBox a Source #
A message queue out of which messages can by received.
This is the counter part of Input. Can be used for reading
messages.
Messages can be received by receive or tryReceive.
Instances
| IsMessageBox BlockingBox Source # | A blocking instance that invokes |
Defined in UnliftIO.MessageBox.Limited Methods receive :: MonadUnliftIO m => BlockingBox a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => BlockingBox a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => BlockingBox a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => BlockingBox a -> m (Input BlockingBox a) Source # | |
| type Input BlockingBox Source # | |
Defined in UnliftIO.MessageBox.Limited | |
newtype BlockingBoxLimit Source #
Contains the (vague) limit of messages that a BlockingBox
can buffer, i.e. that deliver can put into a BlockingInput
of a BlockingBox.
Constructors
| BlockingBoxLimit MessageLimit |
Instances
data MessageLimit Source #
Message Limit
The message limit must be a reasonable small positive integer that is also a power of two. This stems from the fact that Unagi is used under the hood.
The limit is a lower bound.
Constructors
Instances
messageLimitToInt :: MessageLimit -> Int Source #
Convert a MessageLimit to the
Int representation.
data BlockingUnlimited Source #
The (empty) configuration for creating
UnlimitedBoxes using the IsMessageBoxArg methods.
Constructors
| BlockingUnlimited |
Instances
| Show BlockingUnlimited Source # | |
Defined in UnliftIO.MessageBox.Unlimited Methods showsPrec :: Int -> BlockingUnlimited -> ShowS # show :: BlockingUnlimited -> String # showList :: [BlockingUnlimited] -> ShowS # | |
| IsMessageBoxArg BlockingUnlimited Source # | |
Defined in UnliftIO.MessageBox.Unlimited Associated Types type MessageBox BlockingUnlimited :: Type -> Type Source # Methods getConfiguredMessageLimit :: BlockingUnlimited -> Maybe Int Source # newMessageBox :: MonadUnliftIO m => BlockingUnlimited -> m (MessageBox BlockingUnlimited a) Source # | |
| type MessageBox BlockingUnlimited Source # | |
Defined in UnliftIO.MessageBox.Unlimited | |
data UnlimitedBoxInput a Source #
A message queue into which messages can be enqued by,
e.g. deliver.
Messages can be received from an UnlimitedBox.
The UnlimitedBoxInput is the counter part of a UnlimitedBox.
Instances
| IsInput UnlimitedBoxInput Source # | A blocking instance that invokes |
Defined in UnliftIO.MessageBox.Unlimited Methods deliver :: MonadUnliftIO m => UnlimitedBoxInput a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => UnlimitedBoxInput a -> a -> m () Source # | |
data UnlimitedBox a Source #
A message queue out of which messages can
by received.
This is the counter part of Input. Can be
used for reading messages.
Messages can be received by receive or tryReceive.
Instances
| IsMessageBox UnlimitedBox Source # | A blocking instance that invokes |
Defined in UnliftIO.MessageBox.Unlimited Methods receive :: MonadUnliftIO m => UnlimitedBox a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => UnlimitedBox a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => UnlimitedBox a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => UnlimitedBox a -> m (Input UnlimitedBox a) Source # | |
| type Input UnlimitedBox Source # | |
Defined in UnliftIO.MessageBox.Unlimited | |
newtype CatchAllInput i a Source #
A wrapper around values that are instances
of IsInput.
Constructors
| CatchAllInput (i a) |
Instances
| IsInput i => IsInput (CatchAllInput i) Source # | |
Defined in UnliftIO.MessageBox.CatchAll Methods deliver :: MonadUnliftIO m => CatchAllInput i a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => CatchAllInput i a -> a -> m () Source # | |
newtype CatchAllBox box a Source #
A wrapper around values that are instances
of IsMessageBox.
The Input type will be wrapped using
CatchAllInput.
Constructors
| CatchAllBox (box a) |
Instances
| IsMessageBox box => IsMessageBox (CatchAllBox box) Source # | |
Defined in UnliftIO.MessageBox.CatchAll Methods receive :: MonadUnliftIO m => CatchAllBox box a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => CatchAllBox box a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => CatchAllBox box a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => CatchAllBox box a -> m (Input (CatchAllBox box) a) Source # | |
| type Input (CatchAllBox box) Source # | |
Defined in UnliftIO.MessageBox.CatchAll | |
newtype CatchAllArg cfg Source #
A wrapper around values that are instances
of IsMessageBoxArg. The factory wraps
the result of the delegated newMessageBox
invocation into a CatchAllBox.
Constructors
| CatchAllArg cfg |
Instances
data AsyncReply r Source #
The result of callAsync.
Use waitForReply or tryTakeReply.
Instances
| Typeable r => Show (AsyncReply r) Source # | |
Defined in UnliftIO.MessageBox.Command Methods showsPrec :: Int -> AsyncReply r -> ShowS # show :: AsyncReply r -> String # showList :: [AsyncReply r] -> ShowS # | |
newtype DuplicateReply Source #
Constructors
| DuplicateReply CallId |
Instances
| Eq DuplicateReply Source # | |
Defined in UnliftIO.MessageBox.Command Methods (==) :: DuplicateReply -> DuplicateReply -> Bool # (/=) :: DuplicateReply -> DuplicateReply -> Bool # | |
| Show DuplicateReply Source # | |
Defined in UnliftIO.MessageBox.Command Methods showsPrec :: Int -> DuplicateReply -> ShowS # show :: DuplicateReply -> String # showList :: [DuplicateReply] -> ShowS # | |
| Exception DuplicateReply Source # | |
Defined in UnliftIO.MessageBox.Command Methods toException :: DuplicateReply -> SomeException # | |
data CommandError where Source #
The failures that the receiver of a Return Command, i.e. a Blocking,
can communicate to the caller, in order to indicate that
processing a request did not or will not lead to the result the
caller is blocked waiting for.
Constructors
| CouldNotEnqueueCommand :: !CallId -> CommandError | Failed to enqueue a |
| BlockingCommandFailure :: !CallId -> CommandError | The request has failed for reasons. |
| BlockingCommandTimedOut :: !CallId -> CommandError | Timeout waiting for the result. |
Instances
| Eq CommandError Source # | |
Defined in UnliftIO.MessageBox.Command | |
| Show CommandError Source # | |
Defined in UnliftIO.MessageBox.Command Methods showsPrec :: Int -> CommandError -> ShowS # show :: CommandError -> String # showList :: [CommandError] -> ShowS # | |
This is like Input, it can be used
by the receiver of a Blocking
to either send a reply using reply
or to fail/abort the request using sendRequestError
data Message apiTag where Source #
A message valid for some user defined apiTag.
The apiTag tag (phantom-) type defines the
messages allowed here, declared by the instance of
Command for apiTag.
Constructors
| Blocking :: Show (Command apiTag ('Return result)) => Command apiTag ('Return result) -> ReplyBox result -> Message apiTag | Wraps a Such a message can formed by using A |
| NonBlocking :: Show (Command apiTag 'FireAndForget) => Command apiTag 'FireAndForget -> Message apiTag | If the The smart constructor |
data ReturnType where Source #
Indicates if a Command requires the
receiver to send a reply or not.
Constructors
| FireAndForget :: ReturnType | Indicates that a Values of a |
| Return :: Type -> ReturnType | Indicates that a Values of a |
data family Command apiTag :: ReturnType -> Type Source #
This family allows to encode imperative commands.
The clauses of a Command define the commands that
a process should execute.
Every clause may specify an individual ReturnType that
declares if and what response is valid for a message.
For example:
type LampId = Int data instance Command LightControl r where GetLamps :: Command LigthControl (Return [LampId]) SwitchOn :: LampId -> Command LigthControl FireAndForget data LightControl -- the phantom type
The type index of the Command family is the uninhabited
LightControl type.
.
The second type parameter indicates if a message requires the receiver to send a reply back to the blocked and waiting sender, or if no reply is necessary.
cast :: (MonadUnliftIO m, IsInput o, Show (Command apiTag 'FireAndForget)) => o (Message apiTag) -> Command apiTag 'FireAndForget -> m Bool Source #
Enqueue a NonBlocking Message into an Input.
This is just for symetry to call, this is
equivalent to: input -> MessageBox.tryToDeliver input . NonBlocking
The
call :: (HasCallIdCounter env, MonadReader env m, MonadUnliftIO m, IsInput input, Show (Command apiTag ('Return result))) => input (Message apiTag) -> Command apiTag ('Return result) -> Int -> m (Either CommandError result) Source #
Enqueue a Blocking Message into an IsInput and wait for the
response.
If message delivery failed, return Left .CouldNotEnqueueCommand
If no reply was given by the receiving process (using replyTo) within
a given duration, return Left .BlockingCommandTimedOut
Important: The given timeout starts after deliver has returned,
if deliver blocks and delays, call might take longer than the
specified timeout.
The receiving process can either delegate the call using
delegateCall or reply to the call by using: replyTo.
replyTo :: MonadUnliftIO m => ReplyBox a -> a -> m () Source #
delegateCall :: (MonadUnliftIO m, IsInput o, Show (Command apiTag ('Return r))) => o (Message apiTag) -> Command apiTag ('Return r) -> ReplyBox r -> m Bool Source #
callAsync :: (HasCallIdCounter env, MonadReader env m, MonadUnliftIO m, IsInput o, Show (Command apiTag ('Return result))) => o (Message apiTag) -> Command apiTag ('Return result) -> m (Maybe (AsyncReply result)) Source #
Arguments
| :: MonadUnliftIO m | |
| => Int | The time in micro seconds to wait
before returning |
| -> AsyncReply result | |
| -> m (Either CommandError result) |
tryTakeReply :: MonadUnliftIO m => AsyncReply result -> m (Maybe (Either CommandError result)) Source #
class HasCallIdCounter env where Source #
Class of environment records containing a CounterVar for CallIds.
Methods
getCallIdCounter :: env -> CounterVar CallId Source #
Instances
| HasCallIdCounter (CounterVar CallId) Source # | |
Defined in UnliftIO.MessageBox.Util.CallId Methods getCallIdCounter :: CounterVar CallId -> CounterVar CallId Source # | |
An identifier value every command send by calls.
Instances
| Eq CallId Source # | |
| Ord CallId Source # | |
| Show CallId Source # | |
| HasCallIdCounter (CounterVar CallId) Source # | |
Defined in UnliftIO.MessageBox.Util.CallId Methods getCallIdCounter :: CounterVar CallId -> CounterVar CallId Source # | |
newCallIdCounter :: MonadIO m => m (CounterVar CallId) Source #
Create a new CallId CounterVar.
takeNext :: (MonadReader env m, HasCallIdCounter env, MonadUnliftIO m) => m CallId Source #
Increment and get a new CallId.
class HasCounterVar a env | env -> a where Source #
A type class for MonadReader based
applications.
Methods
getCounterVar :: env -> CounterVar a Source #
Instances
| HasCounterVar (t :: k) (CounterVar t) Source # | |
Defined in UnliftIO.MessageBox.Util.Fresh Methods getCounterVar :: CounterVar t -> CounterVar t Source # | |
data CounterVar a Source #
An AtomicCounter.
Instances
| HasCounterVar (t :: k) (CounterVar t) Source # | |
Defined in UnliftIO.MessageBox.Util.Fresh Methods getCounterVar :: CounterVar t -> CounterVar t Source # | |
| HasCallIdCounter (CounterVar CallId) Source # | |
Defined in UnliftIO.MessageBox.Util.CallId Methods getCallIdCounter :: CounterVar CallId -> CounterVar CallId Source # | |
fresh :: forall a env m. (MonadReader env m, MonadIO m, HasCounterVar a env, Coercible a Int) => m a Source #
A threadsafe atomic a
Atomically increment and get the value of the Counter
for type a that must be present in the env.
incrementAndGet :: forall a m. (MonadIO m, Coercible a Int) => CounterVar a -> m a Source #
Atomically increment and get the value of the Counter
for type a that must be present in the env.
newCounterVar :: forall a m. MonadIO m => m (CounterVar a) Source #
Create a new CounterVar starting at 0.
A wrapper around an IO action that returns value in the future.
awaitFuture :: MonadUnliftIO m => Future b -> m b Source #
Poll a Future until the value is present.