{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE Rank2Types #-}
module Control.Distributed.Process.ManagedProcess.Internal.GenProcess
( recvLoop
, precvLoop
, currentTimeout
, systemTimeout
, drainTimeout
, processState
, processDefinition
, processFilters
, processUnhandledMsgPolicy
, processQueue
, gets
, getAndModifyState
, modifyState
, setUserTimeout
, setProcessState
, GenProcess
, peek
, push
, enqueue
, dequeue
, addUserTimer
, removeUserTimer
, eval
, act
, runAfter
, evalAfter
) where
import Control.Applicative (liftA2)
import Control.Distributed.Process
( match
, matchAny
, matchMessage
, handleMessage
, handleMessageIf
, receiveTimeout
, receiveWait
, forward
, catchesExit
, catchExit
, die
, unsafeWrapMessage
, Process
, ProcessId
, Match
)
import qualified Control.Distributed.Process as P
( liftIO
)
import Control.Distributed.Process.Internal.Types
( Message(..)
, ProcessExitException(..)
)
import Control.Distributed.Process.ManagedProcess.Server
( handleCast
, handleExitIf
, stop
, continue
)
import Control.Distributed.Process.ManagedProcess.Timer
( Timer(timerDelay)
, TimerKey
, TimedOut(..)
, delayTimer
, startTimer
, stopTimer
, matchTimeout
, matchKey
, matchRun
)
import Control.Distributed.Process.ManagedProcess.Internal.Types hiding (Message)
import qualified Control.Distributed.Process.ManagedProcess.Internal.PriorityQueue as Q
( empty
, dequeue
, enqueue
, peek
, toList
)
import Control.Distributed.Process.Extras
( ExitReason(..)
, Shutdown(..)
)
import qualified Control.Distributed.Process.Extras.SystemLog as Log
import Control.Distributed.Process.Extras.Time
import Control.Distributed.Process.Serializable (Serializable)
import Control.Monad (void)
import Control.Monad.Catch
( mask_
, catch
, throwM
, mask
, SomeException
)
import qualified Control.Monad.State.Strict as ST
( get
)
import Data.IORef (newIORef, atomicModifyIORef')
import Data.Maybe (fromJust)
import qualified Data.Map.Strict as Map
( size
, insert
, delete
, lookup
, empty
, foldrWithKey
)
-- | Plain alias for 'Bool'. No use of it is visible in this part of the
-- module. NOTE(review): presumably a "safe to proceed" style flag used by
-- the receive loop further down - confirm against its call sites.
type Safe = Bool
-- | Apply a projection to the current 'ProcessState' and return the result.
--
-- The state lives in an 'Data.IORef.IORef' obtained through 'ST.get'. It is
-- read with 'atomicModifyIORef'' (writing the same value back), so the
-- projected result is forced to weak head normal form before it is returned.
gets :: forall s a . (ProcessState s -> a) -> GenProcess s a
gets f = do
  ref <- ST.get
  liftIO $ atomicModifyIORef' ref $ \st -> (st, f st)
-- | Replace the current 'ProcessState' with the result of applying @f@ to it.
--
-- The update runs under 'mask_' and uses 'atomicModifyIORef'', so the new
-- state is forced to weak head normal form as part of the write.
modifyState :: (ProcessState s -> ProcessState s) -> GenProcess s ()
modifyState f = do
  ref <- ST.get
  liftIO $ mask_ $ atomicModifyIORef' ref $ \st -> (f st, ())
-- | Update the 'ProcessState' and return a derived value in a single step.
--
-- @f@ receives the current state and yields the replacement state paired
-- with the value to hand back. The update runs under 'mask_' via
-- 'atomicModifyIORef''.
getAndModifyState :: (ProcessState s -> (ProcessState s, a))
                  -> GenProcess s a
getAndModifyState f = do
  ref <- ST.get
  liftIO $ mask_ $ atomicModifyIORef' ref f
-- | Overwrite the user-level state (the 'procState' field), leaving every
-- other field of the 'ProcessState' untouched.
setProcessState :: s -> GenProcess s ()
setProcessState st' = modifyState $ \st -> st { procState = st' }
-- | Store @t@ as the 'sysTimeout' timer of the 'ProcessState'.
setDrainTimeout :: Timer -> GenProcess s ()
setDrainTimeout t = modifyState $ \st -> st { sysTimeout = t }
-- | Store @d@ as the 'usrTimeout' delay of the 'ProcessState'.
setUserTimeout :: Delay -> GenProcess s ()
setUserTimeout d = modifyState $ \st -> st { usrTimeout = d }
-- | Register a timer together with the message associated with it, and
-- return the key under which the pair was stored in 'usrTimers'.
--
-- The key is one more than the highest key currently in the map (so the
-- first key handed out is @1@). Deriving the key from the map's /size/ is
-- not safe: once any timer has been removed or consumed, @size + 1@ can
-- equal a key that is still live, and the insert would silently replace
-- that timer. For a map that has never had an entry removed both schemes
-- produce the same keys.
addUserTimer :: Timer -> Message -> GenProcess s TimerKey
addUserTimer t m =
  getAndModifyState $ \st ->
    let timers  = usrTimers st
        -- highest key in use, or 0 when there are no timers
        highest = Map.foldrWithKey (\k _ acc -> max k acc) 0 timers
        tk      = highest + 1
    in (st { usrTimers = Map.insert tk (t, m) timers }, tk)
-- | Drop the entry stored under key @i@ from 'usrTimers'. Removing a key
-- that is not present leaves the map unchanged.
removeUserTimer :: TimerKey -> GenProcess s ()
removeUserTimer i =
  modifyState $ \st -> st { usrTimers = Map.delete i (usrTimers st) }
-- | Remove the timer stored under key @k@ and run @f@ on the message that
-- was registered with it.
--
-- The timer map is written back (with @k@ deleted) before the lookup result
-- is inspected. If @k@ was not present, the process exits via 'die' with the
-- reason @"GenProcess.consumeTimer - InvalidTimerKey"@.
consumeTimer :: forall s a . TimerKey -> (Message -> GenProcess s a) -> GenProcess s a
consumeTimer k f = do
  timers <- gets usrTimers
  let found     = Map.lookup k timers
      remaining = Map.delete k timers
  modifyState $ \st -> st { usrTimers = remaining }
  case found of
    Nothing     -> lift $ die $ "GenProcess.consumeTimer - InvalidTimerKey"
    Just (_, m) -> f m
-- | The 'ProcessDefinition' held in the current state ('procDef').
processDefinition :: GenProcess s (ProcessDefinition s)
processDefinition = gets procDef
-- | The list of 'DispatchPriority' entries held in the current state
-- ('procPrio').
processPriorities :: GenProcess s ([DispatchPriority s])
processPriorities = gets procPrio
-- | The list of 'DispatchFilter' entries held in the current state
-- ('procFilters').
processFilters :: GenProcess s ([DispatchFilter s])
processFilters = gets procFilters
-- | The user-level state held in the current 'ProcessState' ('procState').
processState :: GenProcess s s
processState = gets procState
-- | The 'UnhandledMessagePolicy' of the current process definition, i.e.
-- the 'unhandledMessagePolicy' field of 'procDef'.
processUnhandledMsgPolicy :: GenProcess s UnhandledMessagePolicy
processUnhandledMsgPolicy = gets (unhandledMessagePolicy . procDef)
-- | The messages currently held in the internal queue ('internalQ'), as
-- produced by the priority queue's @toList@.
processQueue :: GenProcess s [Message]
processQueue = fmap Q.toList (gets internalQ)
-- | The 'Timer' stored in the 'sysTimeout' field of the current state.
systemTimeout :: GenProcess s Timer
systemTimeout = gets sysTimeout
-- | The 'RecvTimeoutPolicy' stored in the 'timeoutSpec' field of the
-- current state.
timeoutPolicy :: GenProcess s RecvTimeoutPolicy
timeoutPolicy = gets timeoutSpec
-- | The 'Delay' of the 'sysTimeout' timer, i.e. @timerDelay@ applied to the
-- value that 'systemTimeout' would return.
drainTimeout :: GenProcess s Delay
drainTimeout = gets (timerDelay . sysTimeout)
-- | The 'Delay' stored in the 'usrTimeout' field of the current state.
currentTimeout :: GenProcess s Delay
currentTimeout = gets usrTimeout
-- | Replace the internal queue ('internalQ') with the result of applying
-- @f@ to it.
updateQueue :: (Queue -> Queue) -> GenProcess s ()
updateQueue f =
  modifyState $ \st -> st { internalQ = f (internalQ st) }
-- | Build an 'Action' that first schedules @m@ for delivery after the
-- interval @d@ (see 'runAfter') and then installs @s@ as the process state
-- (see 'setProcessState').
evalAfter :: forall s m . (Serializable m) => TimeInterval -> m -> s -> Action s
evalAfter d m s = act $ runAfter d m >> setProcessState s
-- | Wrap a 'GenProcess' computation in the 'ProcessActivity' constructor and
-- return it as an 'Action'. The computation itself is not run here.
act :: forall s . GenProcess s () -> Action s
act = return . ProcessActivity
{-# WARNING act "This interface is intended for internal use only" #-}
-- | Wrap a 'GenProcess' computation that yields a 'ProcessAction' in the
-- 'ProcessExpression' constructor and return it as an 'Action'. The
-- computation itself is not run here.
eval :: forall s . GenProcess s (ProcessAction s) -> Action s
eval = return . ProcessExpression
-- | Start a timer for the interval @d@ and register it, together with @m@
-- wrapped by 'unsafeWrapMessage', as a user timer. The 'TimerKey' produced
-- by 'addUserTimer' is discarded.
runAfter :: forall s m . (Serializable m) => TimeInterval -> m -> GenProcess s ()
runAfter d m = do
  t <- lift $ startTimer (Delay d)
  void $ addUserTimer t (unsafeWrapMessage m)
{-# WARNING runAfter "This interface is intended for internal use only" #-}
-- | Take the next message off the internal queue.
--
-- Returns 'Nothing' and leaves the state untouched when the queue is empty.
-- Otherwise the queue is replaced by its remainder and the removed message
-- is returned.
dequeue :: GenProcess s (Maybe Message)
dequeue = getAndModifyState $ \st ->
  case Q.dequeue (internalQ st) of
    Nothing      -> (st, Nothing)
    Just (m, q') -> (st { internalQ = q' }, Just m)
-- | Look at the next message on the internal queue without removing it.
-- The state is written back unchanged.
peek :: GenProcess s (Maybe Message)
peek = getAndModifyState $ \st -> (st, Q.peek (internalQ st))
-- | Enqueue a message with a fixed priority of @101@.
--
-- This goes through 'enqueueMessage' with a single 'PrioritiseInfo' entry
-- whose @prioritise@ function ignores the state and always answers
-- @Just (101, msg)@. For comparison, 'enqueueMessage' assigns @-1@ when no
-- priority entry matches.
-- NOTE(review): presumably this places the message ahead of
-- default-priority traffic - confirm against the PriorityQueue ordering.
push :: forall s . Message -> GenProcess s ()
push m = do
  st <- processState
  enqueueMessage st [ PrioritiseInfo {
      prioritise = (\_ m' ->
        return $ Just ((101 :: Int), m')) :: s -> Message -> Process (Maybe (Int, Message)) } ] m
-- | Enqueue a message with no priority entries supplied, so that
-- 'enqueueMessage' falls back to its default priority.
enqueue :: forall s . Message -> GenProcess s ()
enqueue m = do
  st <- processState
  enqueueMessage st [] m
-- | Work out a priority for a message and place it on the internal queue.
--
-- The priority entries are tried in order. The first one whose @prioritise@
-- function answers @Just (prio, msg)@ decides the outcome: @msg@ is enqueued
-- under the key @prio * (-1)@. When an entry answers 'Nothing' the next one
-- is tried. When the list is exhausted a catch-all entry with priority @-1@
-- is used, so every message ends up enqueued.
enqueueMessage :: forall s . s
               -> [DispatchPriority s]
               -> Message
               -> GenProcess s ()
enqueueMessage s [] m' =
  -- no entry matched: retry with a catch-all entry of priority -1
  enqueueMessage s [ PrioritiseInfo {
      prioritise = (\_ m ->
        return $ Just ((-1 :: Int), m)) :: s -> Message -> Process (Maybe (Int, Message)) } ] m'
enqueueMessage s (p:ps) m' =
  let checkPrio = prioritise p s
  in lift (checkPrio m') >>= doEnqueue s ps m'
  where
    doEnqueue :: s
              -> [DispatchPriority s]
              -> Message
              -> Maybe (Int, Message)
              -> GenProcess s ()
    -- this entry declined the message: move on to the remaining entries
    doEnqueue s' ps' msg Nothing       = enqueueMessage s' ps' msg
    -- the priority is negated before being used as the queue key
    doEnqueue _  _   _   (Just (i, m)) =
      updateQueue (Q.enqueue (i * (-1 :: Int)) m)
-- | Dispatcher types that can attempt to handle a raw 'Message'.
--
-- 'dynHandleMessage' is given the unhandled-message policy, the current
-- user state, the dispatcher, and the message. The 'Maybe' result leaves
-- room for a dispatcher to decline the message.
class DynMessageHandler d where
  dynHandleMessage :: UnhandledMessagePolicy
                   -> s
                   -> d s
                   -> Message
                   -> Process (Maybe (ProcessAction s))
-- | 'Dispatch' hands the message to 'handleMessage' with the handler applied
-- to the state. 'DispatchIf' does the same through 'handleMessageIf', using
-- the dispatcher's condition (also applied to the state) as the guard. The
-- policy argument is ignored.
instance DynMessageHandler Dispatcher where
  dynHandleMessage _ s (Dispatch   d)   msg = handleMessage   msg (d s)
  dynHandleMessage _ s (DispatchIf d c) msg = handleMessageIf msg (c s) (d s)
-- | Both constructors hand the message to 'handleMessage' with the
-- dispatcher's handler applied to the state. The remaining constructor
-- fields and the policy argument are not used here.
instance DynMessageHandler ExternDispatcher where
  dynHandleMessage _ s (DispatchCC  _ d)     msg = handleMessage msg (d s)
  dynHandleMessage _ s (DispatchSTM _ d _ _) msg = handleMessage msg (d s)
-- | A 'DeferredDispatcher' already wraps a function from state and message
-- to the result type, so it is simply applied to the state. The policy
-- argument is ignored.
instance DynMessageHandler DeferredDispatcher where
  dynHandleMessage _ s (DeferredDispatcher d) = d s
-- | Dynamic dispatch over filter containers that are indexed by the process
-- state type.
class DynFilterHandler d where
  -- | Run the filter against the given state and raw message. The result is
  -- an optional 'Filter' verdict produced in the 'Process' monad.
  dynHandleFilter :: s -> d s -> Message -> Process (Maybe (Filter s))
-- | How each kind of 'DispatchFilter' is evaluated.
--
-- * 'FilterApi' and 'FilterAny' apply their check to the state and pass it to
--   'handleMessage' along with the raw message.
-- * 'FilterRaw' receives both the state and the raw message directly.
-- * 'FilterState' looks at the state only; the message is not inspected.
instance DynFilterHandler DispatchFilter where
  dynHandleFilter st flt raw =
    case flt of
      FilterApi check   -> handleMessage raw (check st)
      FilterAny check   -> handleMessage raw (check st)
      FilterRaw check   -> check st raw
      FilterState check -> check st
-- | Prioritised process loop.
--
-- Builds the initial 'ProcessState' from the prioritised definition, the
-- caller's initial state and the initial receive delay, stores it in an
-- 'IORef', and then runs the receive loop against that reference.
--
-- The whole run is wrapped in 'mask'; the loop itself and the shutdown handler
-- are executed with asynchronous exceptions restored. When the loop finishes
-- normally the shutdown handler is called with 'CleanShutdown' and the exit
-- reason is returned. When the loop dies with an exception the shutdown
-- handler is called with 'LastKnown' and an 'ExitOther' carrying @show@ of the
-- exception, after which the exception is re-thrown.
precvLoop :: PrioritisedProcessDefinition s
          -> s
          -> Delay
          -> Process ExitReason
precvLoop ppDef pState recvDelay = do
  stateRef <- P.liftIO (newIORef initialState)

  mask $ \unmask -> do
    outcome <- catch (fmap Right (unmask (loop stateRef)))
                     (\(err :: SomeException) -> return (Left err))

    -- The loop may have ended with an exception, so the state and definition
    -- are taken from the IORef rather than from the loop's result.
    latest <- P.liftIO $ atomicModifyIORef' stateRef (\cur -> (cur, cur))
    let lastState  = procState latest
        onShutdown = shutdownHandler (procDef latest)

    case outcome of
      Left err -> do
        -- Run the shutdown handler with the last state we know about, then
        -- propagate the original failure.
        unmask $ onShutdown (LastKnown lastState) (ExitOther (show err))
        throwM err
      Right (reason, _) -> do
        unmask $ onShutdown (CleanShutdown lastState) reason
        return reason
  where
    initialState =
      ProcessState { timeoutSpec = recvTimeout ppDef
                   , sysTimeout  = delayTimer Infinity
                   , usrTimeout  = recvDelay
                   , internalQ   = Q.empty
                   , procState   = pState
                   , procDef     = processDef ppDef
                   , procPrio    = priorities ppDef
                   , procFilters = filters ppDef
                   , usrTimers   = Map.empty
                   }

    -- An exit signal carrying an 'ExitReason' ends the loop with that reason.
    loop ref =
      catchExit (runProcess ref recvQueue)
                (\_ (reason :: ExitReason) -> return (reason, ref))
-- | One turn of the receive loop.
--
-- Drains the mailbox, picks the next action via 'processNext' and interprets
-- it with 'nextAction'. If a 'ProcessExitException' is raised while doing so,
-- the exit dispatchers are tried in order ('trapExit' first, then the
-- definition's own exit handlers) and the resulting action is interpreted
-- instead.
recvQueue :: GenProcess s ExitReason
recvQueue = do
  def <- processDefinition
  let exitDispatchers = map dispatchExit (trapExit : exitHandlers def)
      onExit (sig :: ProcessExitException) =
        handleExit exitDispatchers sig >>= nextAction
  catch (drainMailbox >> processNext >>= nextAction) onExit
  where
-- | Offer an exit signal to each handler in turn.
--
-- Every handler is given the current process state, the sender's pid and the
-- payload of the exit signal. The first handler that yields an action wins.
-- If the list is exhausted the original exception is re-thrown.
handleExit :: [(s -> ProcessId -> Message -> Process (Maybe (ProcessAction s)))]
           -> ProcessExitException
           -> GenProcess s (ProcessAction s)
handleExit []                  sig = throwM sig
handleExit (handler:remaining) sig@(ProcessExitException sender payload) = do
  current <- processState
  outcome <- lift (handler current sender payload)
  maybe (handleExit remaining sig) return outcome
-- | Interpret a 'ProcessAction' and decide how the loop proceeds.
--
-- * expressions are evaluated and their result interpreted in turn;
-- * activities are run, then the loop resumes;
-- * skip resumes the loop without touching the state;
-- * continue / timeout / hibernate store the new state and resume (timeout
--   first sets the user timeout, hibernate first blocks for the interval);
-- * stop returns the exit reason; stopping stores the state first;
-- * become swaps in a new process definition and state, then resumes.
--
-- Anything else ends the loop with @ExitOther "IllegalState"@.
nextAction :: ProcessAction s -> GenProcess s ExitReason
nextAction action
  | ProcessExpression compute      <- action = compute >>= nextAction
  | ProcessActivity effect         <- action = do
      effect
      recvQueue
  | ProcessSkip                    <- action = recvQueue
  | ProcessContinue newState       <- action = recvQueueAux newState
  | ProcessTimeout delay newState  <- action = do
      setUserTimeout delay
      recvQueueAux newState
  | ProcessStop reason             <- action = return reason
  | ProcessStopping newState reason <- action = do
      setProcessState newState
      return reason
  | ProcessHibernate pause newState <- action = do
      lift (block pause)
      recvQueueAux newState
  | ProcessBecome newDef newState  <- action = do
      modifyState $ \cur -> cur { procDef = newDef, procState = newState }
      recvQueue
  | otherwise                                = return (ExitOther "IllegalState")
-- | Store the given process state, then resume the receive loop.
recvQueueAux :: s -> GenProcess s ExitReason
recvQueueAux newState = do
  setProcessState newState
  recvQueue
-- | Choose how the next queued message is handled.
--
-- With no filters installed the next message is dequeued and processed
-- directly. Otherwise the head of the queue is peeked and run through the
-- filter chain, starting in the non-safe mode with no verdict yet.
processNext :: GenProcess s (ProcessAction s)
processNext = do
  (policy, active) <- gets $ \ps ->
    (unhandledMessagePolicy (procDef ps), procFilters ps)
  if null active
    then consumeMessage
    else filterMessage (filterNext False policy active Nothing)
-- | Remove the next message from the internal queue and process it; see
-- 'applyNext' for what happens when the queue is empty.
consumeMessage :: GenProcess s (ProcessAction s)
consumeMessage = applyNext dequeue processApply
-- | Pass the head of the internal queue to the given handler without removing
-- it; see 'applyNext' for what happens when the queue is empty.
filterMessage :: (Message -> GenProcess s (ProcessAction s))
              -> GenProcess s (ProcessAction s)
filterMessage handler = applyNext peek handler
-- | Run a peeked message through the remaining filters.
--
-- Arguments, in order: whether an earlier filter asked for safe handling, the
-- unhandled message policy, the filters still to run, the verdict of the
-- previous filter (if any), and the message under consideration. The message
-- is the one obtained by peeking at the queue, so it is still enqueued while
-- this function runs.
--
-- Verdicts are handled as follows:
--
-- * 'FilterSafe' is treated as 'FilterOk', but switches to safe mode.
-- * 'FilterSkip' stores the state, drops the message and yields 'ProcessSkip'.
-- * 'FilterStop' yields 'ProcessStopping' with the given state and reason.
-- * 'FilterOk' stores the state and either runs the next filter or, once the
--   filters are exhausted, processes the message. In safe mode the message is
--   processed first and dequeued afterwards; otherwise it is dequeued first.
-- * 'FilterReject' stores the state, drops the message and applies the
--   unhandled message policy to it.
-- * No verdict (@Nothing@) runs the next filter against the current state, or
--   processes the message when no filters remain.
filterNext :: Safe
           -> UnhandledMessagePolicy
           -> [DispatchFilter s]
           -> Maybe (Filter s)
           -> Message
           -> GenProcess s (ProcessAction s)
filterNext isSafe mp' fs mf msg
  | Just (FilterSafe s')     <- mf = filterNext True mp' fs (Just $ FilterOk s') msg
  | Just (FilterSkip s')     <- mf = setProcessState s' >> dequeue >> return ProcessSkip
  | Just (FilterStop s' r)   <- mf = return $ ProcessStopping s' r
  | isSafe
  , Just (FilterOk s')       <- mf
  , []                       <- fs = do
      setProcessState s'
      -- Safe mode: only remove the message once it has been processed.
      act' <- processApply msg
      dequeue >> return act'
  | Just (FilterOk s')       <- mf
  , []                       <- fs = setProcessState s' >> applyNext dequeue processApply
  | Nothing <- mf, []        <- fs = applyNext dequeue processApply
  | Just (FilterOk s')       <- mf
  , (f:fs')                  <- fs = do
      setProcessState s'
      act' <- lift $ dynHandleFilter s' f msg
      filterNext isSafe mp' fs' act' msg
  | Just (FilterReject _ s') <- mf =
      -- The rejected message is the peeked head of the queue, i.e. @msg@. Drop
      -- it from the queue and apply the policy to @msg@ directly rather than
      -- unwrapping the dequeue result with a partial function.
      setProcessState s' >> dequeue >> lift (applyPolicy mp' s' msg)
  | Nothing                  <- mf
  , (f:fs')                  <- fs = do
      s' <- processState
      a  <- lift (dynHandleFilter s' f msg)
      filterNext isSafe mp' fs' a msg
-- | Run a queue operation and hand its message to the handler. When the
-- operation yields no message, fall back to 'drainOrTimeout'.
applyNext :: (GenProcess s (Maybe Message))
          -> (Message -> GenProcess s (ProcessAction s))
          -> GenProcess s (ProcessAction s)
applyNext queueOp handler = queueOp >>= maybe drainOrTimeout handler
-- | Offer a message to the handlers of the current process definition.
--
-- The candidate handlers are tried in this order: the built-in shutdown
-- handler, the extern handlers, the API handlers and finally the info
-- handlers. The actual walk over the candidates (and the fallback to the
-- unhandled message policy) is performed by 'processApplyAux'.
processApply :: Message -> GenProcess s (ProcessAction s)
processApply msg = do
  (def, pState) <- gets (\st -> (procDef st, procState st))
  let pol      = unhandledMessagePolicy def
      onStop   = dynHandleMessage pol pState shutdownHandler'
      external = map (dynHandleMessage pol pState) (externHandlers def)
      api      = map (dynHandleMessage pol pState) (apiHandlers def)
      info     = map (dynHandleMessage pol pState) (infoHandlers def)
      -- order matters: the first handler to return a result wins
      handlers = concat [[onStop], external, api, info]
  processApplyAux handlers pol pState msg
-- | Try each candidate handler in turn against the message.
--
-- The first handler that produces @Just@ an action decides the result and
-- later handlers are never run. If every handler declines (or the list is
-- empty), the message is passed to 'applyPolicy' together with the
-- unhandled message policy and the supplied state.
processApplyAux :: [Message -> Process (Maybe (ProcessAction t))]
                -> UnhandledMessagePolicy
                -> t
                -> Message
                -> GenProcess s (ProcessAction t)
processApplyAux handlers p' s' m' = foldr attempt fallback handlers
  where
    -- reached only when no handler accepted the message
    fallback = lift (applyPolicy p' s' m')

    -- run one handler; continue with the rest only if it declined
    attempt h rest = lift (h m') >>= maybe rest return
-- | Move pending mailbox traffic into the process' internal queue.
--
-- The match list is built from, in order: one group of matches per user
-- timer ('matchKey'), a catch-all 'matchAny' wrapping the raw message in
-- 'Right', the extern matchers from 'mkMatchers', and finally the matches
-- for the drain timer obtained from 'maybeStartTimer'.
--
-- The receive timeout policy determines the limit handed to 'drainAux':
-- 'RecvMaxBacklog' supplies a count, 'RecvTimer' supplies no count.
--
-- Starting the drain timer, draining, and stopping the timer all run inside
-- 'mask_'. The timer returned by 'stopTimer' is stored with
-- 'setDrainTimeout'.
drainMailbox :: GenProcess s ()
drainMailbox = do
  ps <- processState
  pd <- processDefinition
  pp <- processPriorities
  ut <- gets usrTimers
  let collect k (t, _) acc = acc ++ matchKey k t
      timerMatches         = Map.foldrWithKey collect [] ut
      matches              =
        timerMatches ++ (matchAny (return . Right) : mkMatchers ps pd)
  backlog <- do
    spec <- timeoutPolicy
    case spec of
      RecvTimer _        -> return Nothing
      RecvMaxBacklog cnt -> return (Just cnt)
  mask_ $ do
    tt <- maybeStartTimer
    drainAux ps pp backlog (matches ++ matchTimeout tt)
    stopped <- lift (stopTimer tt)
    setDrainTimeout stopped
-- | Repeatedly scan the mailbox, enqueueing what is found.
--
-- Each round calls 'scanMailbox' with the current limit and reacts to the
-- outcome:
--
-- * nothing found, or a 'TimedOut' marker: stop;
-- * a message: enqueue it via 'enqueueMessage' and scan again;
-- * a 'Yield' for a timer key: run 'consumeTimer' for that key with 'push'
--   as the continuation, then scan again.
--
-- Every further round uses the limit returned by the previous scan.
drainAux :: s
         -> [DispatchPriority s]
         -> Limit
         -> [Match (Either TimedOut Message)]
         -> GenProcess s ()
drainAux ps' pp' maxbq ms = go maxbq
  where
    go lim = do
      (remaining, found) <- scanMailbox lim ms
      let again = go remaining
      case found of
        Nothing               -> return ()
        Just (Left TimedOut)  -> return ()
        Just (Left (Yield i)) -> consumeTimer i push >> again
        Just (Right m')       -> enqueueMessage ps' pp' m' >> again
-- | Produce the timer used to bound a drain, and record it.
--
-- Under a 'RecvTimer' policy a timer is started for the configured
-- interval; under any other policy a 'delayTimer' with an 'Infinity' delay
-- is used instead. Either way the timer is stored with 'setDrainTimeout'
-- before being returned.
maybeStartTimer :: GenProcess s Timer
maybeStartTimer = do
  timer <- timeoutPolicy >>= timerFor
  setDrainTimeout timer
  return timer
  where
    timerFor (RecvTimer d) = lift (startTimer (Delay d))
    timerFor _             = return (delayTimer Infinity)
-- | Perform at most one non-blocking read against the given matches.
--
-- * A limit of @Just 0@ performs no read: the limit is returned unchanged
--   together with a 'TimedOut' marker.
-- * A limit of @Just c@ performs one @receiveTimeout 0@ and returns
--   @Just (c - 1)@ as the new limit, whether or not anything matched.
-- * No limit performs one @receiveTimeout 0@ and returns the limit as is.
scanMailbox :: Limit
            -> [Match (Either TimedOut Message)]
            -> GenProcess s (Limit, Maybe (Either TimedOut Message))
scanMailbox lim ms =
  case lim of
    Just 0  -> return (lim, Just (Left TimedOut))
    Just c  -> poll (Just (c - 1))
    Nothing -> poll lim
  where
    -- single zero-timeout read, paired with the limit to report back
    poll lim' = lift (fmap ((,) lim') (receiveTimeout 0 ms))
-- | Wait for the next message according to the current timeout.
--
-- The match list consists of the user timer runners ('mkMatchRunners'),
-- then a 'matchMessage' that returns the raw message, then the extern
-- matchers of the process definition.
--
-- How the receive is performed depends on 'currentTimeout':
--
-- * 'Infinity': 'receiveWait';
-- * 'NoDelay':  @receiveTimeout 0@;
-- * @Delay i@:  'receiveTimeout' with @asTimeout i@.
--
-- The receive itself runs under 'mask'. If nothing arrives the timeout
-- handler of the definition is invoked (with masking restored). If a
-- message arrives it is enqueued via 'enqueueMessage' and 'ProcessSkip' is
-- returned.
drainOrTimeout :: GenProcess s (ProcessAction s)
drainOrTimeout = do
  pd <- processDefinition
  ps <- processState
  ud <- currentTimeout
  mr <- mkMatchRunners
  let ump     = unhandledMessagePolicy pd
      externs = map (matchExtern ump ps) (externHandlers pd)
      matches = mr ++ (matchMessage return : externs)
      recv    = lift $ case ud of
        Infinity -> Just <$> receiveWait matches
        NoDelay  -> receiveTimeout 0 matches
        Delay i  -> receiveTimeout (asTimeout i) matches
  mask $ \restore -> do
    received <- recv
    case received of
      Nothing -> restore (lift (timeoutHandler pd ps ud))
      Just m  -> do
        pp <- processPriorities
        enqueueMessage ps pp m
        restore (return ProcessSkip)
-- | Build the matches for every registered user timer.
--
-- Each entry of the user timer map contributes the result of 'matchRun'
-- applied to the runner from 'mkRunner', the timer's key and the timer
-- itself. The message stored alongside the timer is not inspected here.
mkMatchRunners :: GenProcess s [Match Message]
mkMatchRunners = do
  ut <- gets usrTimers
  fn <- mkRunner
  let collect k (t, _) acc = acc ++ matchRun fn k t
  return (Map.foldrWithKey collect [] ut)
-- | Capture the current state and return a runner usable from 'Process'.
--
-- The returned function runs 'consumeTimer' for the given key (with
-- 'return' as the continuation) via 'runProcess' against the captured
-- state, and yields only the resulting message; the state component of the
-- result is dropped.
mkRunner :: GenProcess s (TimerKey -> Process Message)
mkRunner = do
  st <- ST.get
  return (\k -> fmap fst (runProcess st (consumeTimer k return)))
-- | Turn the extern handlers of a definition into mailbox matches.
--
-- Each extern handler is adapted with 'matchMapExtern', using the
-- definition's unhandled message policy and the supplied state, and its
-- result is wrapped with 'toRight'.
mkMatchers :: s
           -> ProcessDefinition s
           -> [Match (Either TimedOut Message)]
mkMatchers st df =
  [ matchMapExtern policy st toRight h | h <- externHandlers df ]
  where
    policy = unhandledMessagePolicy df
-- | Wrap a message in the 'Right' side of the timed-out/message sum.
toRight :: Message -> Either TimedOut Message
toRight msg = Right msg
-- | Receive loop for a managed process.
--
-- On every iteration the loop builds its matchers from the process
-- definition, in this order: the built-in shutdown handler, the extern
-- handlers, the api handlers, and finally a single catch-all ('matchAny')
-- that tries the info handlers and falls back to the unhandled message
-- policy. The receive runs under 'catchesExit', with the built-in 'trapExit'
-- dispatcher consulted ahead of the definition's own exit handlers.
--
-- The resulting 'ProcessAction' decides what happens next:
--
-- * 'ProcessSkip' loops again with the state and delay unchanged
--
-- * 'ProcessContinue' loops with the new state
--
-- * 'ProcessTimeout' loops with the new state and the new delay
--
-- * 'ProcessHibernate' calls 'block' for the given interval, then loops
--
-- * 'ProcessStop' / 'ProcessStopping' run the shutdown handler (with the
--   current or the supplied state respectively) and return the exit reason
--
-- * 'ProcessBecome' loops with the replacement definition and state
--
-- * 'ProcessActivity' / 'ProcessExpression' are not supported by this loop
--   and terminate the process via 'die'
--
recvLoop :: ProcessDefinition s -> s -> Delay -> Process ExitReason
recvLoop pDef pState recvDelay =
  let p             = unhandledMessagePolicy pDef
      handleTimeout = timeoutHandler pDef
      handleStop    = shutdownHandler pDef
      shutdown'     = matchDispatch p pState shutdownHandler'
      extMatchers   = map (matchDispatch p pState) (externHandlers pDef)
      matchers      = extMatchers ++ map (matchDispatch p pState) (apiHandlers pDef)
      ex'           = trapExit : exitHandlers pDef
      ms'           = (shutdown' : matchers) ++ matchAux p pState (infoHandlers pDef)
  in do
    ac <- catchesExit (processReceive ms' handleTimeout pState recvDelay)
                      (map (\d' -> dispatchExit d' pState) ex')
    case ac of
      ProcessSkip            -> recvLoop pDef pState recvDelay
      ProcessContinue s'     -> recvLoop pDef s' recvDelay
      ProcessTimeout t' s'   -> recvLoop pDef s' t'
      ProcessHibernate d' s' -> block d' >> recvLoop pDef s' recvDelay
      ProcessStop r          -> handleStop (LastKnown pState) r >> return (r :: ExitReason)
      ProcessStopping s' r   -> handleStop (LastKnown s') r >> return (r :: ExitReason)
      ProcessBecome d' s'    -> recvLoop d' s' recvDelay
      ProcessActivity _      -> die "recvLoop.InvalidState - ProcessActivityNotSupported"
      ProcessExpression _    -> die "recvLoop.InvalidState - ProcessExpressionNotSupported"
  where
    -- A single catch-all matcher: every message that reaches it is offered to
    -- the info handlers, and the unhandled message policy is the fallback.
    matchAux :: UnhandledMessagePolicy
             -> s
             -> [DeferredDispatcher s]
             -> [Match (ProcessAction s)]
    matchAux p ps ds = [matchAny (auxHandler (applyPolicy p ps) ps ds)]

    -- Offer the message to each info handler in turn and return the first
    -- action produced. The policy is applied only once every handler has
    -- declined (or when there are no handlers at all).
    auxHandler :: (Message -> Process (ProcessAction s))
               -> s
               -> [DeferredDispatcher s]
               -> Message
               -> Process (ProcessAction s)
    auxHandler policy _  []     msg = policy msg
    auxHandler policy st (d:ds) msg =
      dispatchInfo d st msg >>= maybe (auxHandler policy st ds msg) return

    -- Perform one receive; if it yields nothing, defer to the timeout handler.
    processReceive :: [Match (ProcessAction s)]
                   -> TimeoutHandler s
                   -> s
                   -> Delay
                   -> Process (ProcessAction s)
    processReceive ms handleTimeout st d =
      recv ms d >>= maybe (handleTimeout st d) return

    -- Map the 'Delay' onto the matching receive primitive: 'Infinity' waits
    -- with 'receiveWait', 'NoDelay' uses a zero timeout, and @Delay t@ uses
    -- the interval converted by 'asTimeout'.
    recv :: [Match (ProcessAction s)]
         -> Delay
         -> Process (Maybe (ProcessAction s))
    recv matches d' =
      case d' of
        Infinity -> fmap Just (receiveWait matches)
        NoDelay  -> receiveTimeout 0 matches
        Delay t' -> receiveTimeout (asTimeout t') matches
-- | Built-in cast handler: a 'Shutdown' message stops the server with
-- 'ExitNormal', regardless of the current state.
shutdownHandler' :: Dispatcher s
shutdownHandler' = handleCast (\_ Shutdown -> stop ExitNormal)
-- | Built-in exit signal handler: an exit signal whose reason equals
-- 'ExitShutdown' stops the server with that reason. The sender and the
-- current state are ignored.
trapExit :: ExitSignalDispatcher s
trapExit = handleExitIf isShutdown stopWith
  where
    isShutdown _ e = e == ExitShutdown
    stopWith _ _ (r :: ExitReason) = stop r
-- | Wait for up to the given interval using 'receiveTimeout' with a single
-- matcher for 'TimedOut' messages. A matched 'TimedOut' is discarded, and the
-- result of the receive (matched or timed out) is ignored.
block :: TimeInterval -> Process ()
block i = void (receiveTimeout (asTimeout i) [match discard])
  where
    discard :: TimedOut -> Process ()
    discard _ = return ()
-- | Apply the unhandled message policy to a message that no handler accepted.
--
-- * 'Terminate' stops the process with @ExitOther "UnhandledInput"@
--
-- * 'DeadLetter' forwards the message to the given process, then continues
--
-- * 'Drop' continues with the state unchanged
--
-- * 'Log' reports the message on the log channel at info level, then continues
--
applyPolicy :: UnhandledMessagePolicy
            -> s
            -> Message
            -> Process (ProcessAction s)
applyPolicy p s m =
  case p of
    Terminate      -> stop (ExitOther "UnhandledInput")
    DeadLetter pid -> forward m pid >> continue s
    Drop           -> continue s
    Log            -> logIt >> continue s
  where
    logIt :: Process ()
    logIt =
      Log.report Log.info Log.logChannel
                 ("Unhandled Gen Input Message: " ++ show m)