{-# LANGUAGE ExistentialQuantification  #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE PatternGuards              #-}
{-# LANGUAGE BangPatterns               #-}
{-# LANGUAGE RecordWildCards            #-}
{-# LANGUAGE TupleSections              #-}
{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses      #-}
{-# LANGUAGE Rank2Types                 #-}

-- | This is the @Process@ implementation of a /managed process/
module Control.Distributed.Process.ManagedProcess.Internal.GenProcess
  ( recvLoop
  , precvLoop
  , currentTimeout
  , systemTimeout
  , drainTimeout
  , processState
  , processDefinition
  , processFilters
  , processUnhandledMsgPolicy
  , processQueue
  , gets
  , getAndModifyState
  , modifyState
  , setUserTimeout
  , setProcessState
  , GenProcess
  , peek
  , push
  , enqueue
  , dequeue
  , addUserTimer
  , removeUserTimer
  , eval
  , act
  , runAfter
  , evalAfter
  ) where

import Control.Applicative (liftA2)
import Control.Distributed.Process
  ( match
  , matchAny
  , matchMessage
  , handleMessage
  , handleMessageIf
  , receiveTimeout
  , receiveWait
  , forward
  , catchesExit
  , catchExit
  , die
  , unsafeWrapMessage
  , Process
  , ProcessId
  , Match
  )
import qualified Control.Distributed.Process as P
  ( liftIO
  )
import Control.Distributed.Process.Internal.Types
  ( Message(..)
  , ProcessExitException(..)
  )
import Control.Distributed.Process.ManagedProcess.Server
  ( handleCast
  , handleExitIf
  , stop
  , continue
  )
import Control.Distributed.Process.ManagedProcess.Timer
  ( Timer(timerDelay)
  , TimerKey
  , TimedOut(..)
  , delayTimer
  , startTimer
  , stopTimer
  , matchTimeout
  , matchKey
  , matchRun
  )
import Control.Distributed.Process.ManagedProcess.Internal.Types hiding (Message)
import qualified Control.Distributed.Process.ManagedProcess.Internal.PriorityQueue as Q
  ( empty
  , dequeue
  , enqueue
  , peek
  , toList
  )
import Control.Distributed.Process.Extras
  ( ExitReason(..)
  , Shutdown(..)
  )
import qualified Control.Distributed.Process.Extras.SystemLog as Log
import Control.Distributed.Process.Extras.Time
import Control.Distributed.Process.Serializable (Serializable)
import Control.Monad (void)
import Control.Monad.Catch
  ( mask_
  , catch
  , throwM
  , mask
  , SomeException
  )
import qualified Control.Monad.State.Strict as ST
  ( get
  )
import Data.IORef (newIORef, atomicModifyIORef')
import Data.Maybe (fromJust)
import qualified Data.Map.Strict as Map
  ( size
  , insert
  , delete
  , lookup
  , empty
  , foldrWithKey
  )

--------------------------------------------------------------------------------
-- Priority Mailbox Handling                                                  --
--------------------------------------------------------------------------------

-- | Alias for 'Bool'.
-- NOTE(review): the intended meaning of the flag is not visible in this part
-- of the file - confirm against its usages before relying on it.
type Safe = Bool

-- | Evaluate the given function over the @ProcessState s@ for the caller, and
-- return the result.
--
-- The state is held in a mutable reference obtained from the underlying state
-- monad. It is read with 'atomicModifyIORef'' and written back unchanged, so
-- the projected value is forced to weak head normal form before returning.
gets :: forall s a . (ProcessState s -> a) -> GenProcess s a
gets f = ST.get >>= \(ref :: State s) ->
  liftIO $ atomicModifyIORef' ref $ \(st :: ProcessState s) -> (st, f st :: a)

-- | Modify our state by applying the given function to the current
-- @ProcessState s@.
--
-- The update is performed with 'atomicModifyIORef'' under 'mask_', so it is
-- not interrupted by asynchronous exceptions part-way through.
modifyState :: (ProcessState s -> ProcessState s) -> GenProcess s ()
modifyState f =
  ST.get >>= \ref ->
    liftIO $ mask_ $ atomicModifyIORef' ref $ \st -> (f st, ())

-- | Modify our state and return a value (potentially from it).
--
-- The supplied function receives the current @ProcessState s@ and yields the
-- replacement state paired with the value to hand back to the caller. The
-- update is a single 'atomicModifyIORef'' executed under 'mask_'.
getAndModifyState :: (ProcessState s -> (ProcessState s, a))
                  -> GenProcess s a
getAndModifyState f =
  ST.get >>= \ref -> liftIO $ mask_ $ atomicModifyIORef' ref f

-- | Set the current process state, i.e. replace the user-defined state stored
-- in the @procState@ field, leaving every other field untouched.
setProcessState :: s -> GenProcess s ()
setProcessState st' = modifyState $ \st -> st { procState = st' }

-- | Set the mailbox draining timer, i.e. replace the @sysTimeout@ field of the
-- process state with the given 'Timer'.
setDrainTimeout :: Timer -> GenProcess s ()
setDrainTimeout t = modifyState $ \st -> st { sysTimeout = t }

-- | Set the user timeout applied whilst a prioritised process loop is in
-- a blocking receive. The value is stored in the @usrTimeout@ field.
setUserTimeout :: Delay -> GenProcess s ()
setUserTimeout d = modifyState $ \st -> st { usrTimeout = d }

-- | Add a /user timer/, bound to the given datum, and return the key under
-- which it was registered.
--
-- The key is one greater than the largest key currently present in the timer
-- map (or @1@ when the map is empty). Deriving the key from the map's /size/
-- would hand out a key that is still in use as soon as an earlier timer has
-- been removed, silently replacing a live timer.
addUserTimer :: Timer -> Message -> GenProcess s TimerKey
addUserTimer t m =
  getAndModifyState $ \st ->
    let timers = usrTimers st
        -- Keys handed out here are always >= 1, so 0 is a safe lower bound.
        maxKey = Map.foldrWithKey (\k _ acc -> max k acc) 0 timers
        tk     = maxKey + 1
    in (st { usrTimers = Map.insert tk (t, m) timers }, tk)

-- | Remove a /user timer/, for the given key. Removing a key that is not
-- present leaves the timer map unchanged.
removeUserTimer :: TimerKey -> GenProcess s ()
removeUserTimer i =
  modifyState $ \st -> st { usrTimers = Map.delete i (usrTimers st) }

-- | Consume the timer with the given @TimerKey@. The timer is removed from the
-- @ProcessState@ and its message is given to the supplied expression, whose
-- evaluation is given back to the caller.
--
-- The lookup and the removal happen in a single state update, so the timer
-- map cannot change between reading the entry and deleting it.
--
-- If no timer is registered under the key, the calling process exits via
-- 'die' with the reason @"GenProcess.consumeTimer - InvalidTimerKey"@.
consumeTimer :: forall s a . TimerKey -> (Message -> GenProcess s a) -> GenProcess s a
consumeTimer k f = do
  found <- getAndModifyState $ \st ->
    let timers = usrTimers st
    in (st { usrTimers = Map.delete k timers }, Map.lookup k timers)
  case found of
    Nothing     -> lift $ die "GenProcess.consumeTimer - InvalidTimerKey"
    Just (_, m) -> f m

-- | The @ProcessDefinition@ for the current loop (the @procDef@ field of the
-- process state).
processDefinition :: GenProcess s (ProcessDefinition s)
processDefinition = gets procDef

-- | The list of prioritisers for the current loop (the @procPrio@ field of the
-- process state).
processPriorities :: GenProcess s [DispatchPriority s]
processPriorities = gets procPrio

-- | The list of filters for the current loop (the @procFilters@ field of the
-- process state).
processFilters :: GenProcess s [DispatchFilter s]
processFilters = gets procFilters

-- | Evaluates to the user defined state for the currently executing server
-- loop (the @procState@ field of the process state).
processState :: GenProcess s s
processState = gets procState

-- | Evaluates to the @UnhandledMessagePolicy@ for the current loop, taken from
-- the loop's @ProcessDefinition@.
processUnhandledMsgPolicy :: GenProcess s UnhandledMessagePolicy
processUnhandledMsgPolicy = gets (unhandledMessagePolicy . procDef)

-- | Returns a /read only view/ on the internal priority queue: the queued
-- messages as a list, produced by @Q.toList@. The queue itself is not
-- modified.
processQueue :: GenProcess s [Message]
processQueue = fmap Q.toList (gets internalQ)

-- | The @Timer@ for the system timeout (the @sysTimeout@ field of the process
-- state). See @drainTimeout@.
systemTimeout :: GenProcess s Timer
systemTimeout = gets sysTimeout

-- | The policy for the system timeout. This is used to determine how the loop
-- should limit the time spent draining the /real/ process mailbox into our
-- internal priority queue. Read from the @timeoutSpec@ field of the process
-- state.
timeoutPolicy :: GenProcess s RecvTimeoutPolicy
timeoutPolicy = gets timeoutSpec

-- | The @Delay@ of the mailbox draining timer, i.e. the @timerDelay@ of the
-- system timeout 'Timer'.
drainTimeout :: GenProcess s Delay
drainTimeout = gets (timerDelay . sysTimeout)

-- | The current (user supplied) timeout (the @usrTimeout@ field of the process
-- state).
currentTimeout :: GenProcess s Delay
currentTimeout = gets usrTimeout

-- | Update and store the internal priority queue: the given function is
-- applied to the current queue and its result replaces it.
updateQueue :: (Queue -> Queue) -> GenProcess s ()
updateQueue f = modifyState $ \st -> st { internalQ = f (internalQ st) }

-- | Evaluate any matching /info handler/ with the supplied datum after waiting
-- for at least @TimeInterval@. The process state (for the resulting @Action s@)
-- is also given and the process loop will go on as per @Server.continue@.
--
-- Informally, evaluating this expression (such that the @Action@ is given as the
-- result of a handler or filter) will ensure that the supplied message (datum)
-- is available for processing no sooner than @TimeInterval@.
--
-- Currently, this expression creates an @Action@ that triggers immediate
-- evaluation in the process loop before continuing with the given state. The
-- process loop stores a /user timeout/ for the given time interval, which is
-- triggered like a wait/drain timeout. This implementation is subject to change.
evalAfter :: forall s m . (Serializable m) => TimeInterval -> m -> s -> Action s
evalAfter d m s = act $ do
  -- Register the timer first, then install the caller's state.
  runAfter d m
  setProcessState s

-- | Produce an @Action s@ that, if it is the result of a handler, will cause the
-- server loop to evaluate the supplied expression. This is given in the @GenProcess@
-- monad, which is intended for internal use only.
--
-- The expression is wrapped in the @ProcessActivity@ constructor.
act :: forall s . GenProcess s () -> Action s
act = return . ProcessActivity
{-# WARNING act "This interface is intended for internal use only" #-}

-- | Evaluate an expression in the 'GenProcess' monad. The expression, which
-- itself yields the next @ProcessAction s@, is wrapped in the
-- @ProcessExpression@ constructor.
eval :: forall s . GenProcess s (ProcessAction s) -> Action s
eval = return . ProcessExpression

-- | Starts a timer for the given interval and adds it as a /user timeout/,
-- bound to the supplied datum (wrapped with 'unsafeWrapMessage'). The key of
-- the newly registered timer is discarded.
runAfter :: forall s m . (Serializable m) => TimeInterval -> m -> GenProcess s ()
runAfter d m = do
  t <- lift $ startTimer (Delay d)
  void $ addUserTimer t (unsafeWrapMessage m)
{-# WARNING runAfter "This interface is intended for internal use only" #-}

--------------------------------------------------------------------------------
-- Internal Priority Queue                                                    --
--------------------------------------------------------------------------------

-- | Dequeue a message from the internal priority queue.
--
-- Yields 'Nothing' (leaving the state untouched) when the queue is empty;
-- otherwise yields the dequeued message and stores the remaining queue.
dequeue :: GenProcess s (Maybe Message)
dequeue = getAndModifyState $ \st ->
  case Q.dequeue (internalQ st) of
    Nothing      -> (st, Nothing)
    Just (m, q') -> (st { internalQ = q' }, Just m)

-- | Peek at the next available message in the internal priority queue, without
-- removing it. Yields whatever @Q.peek@ reports for the current queue; the
-- process state is not changed.
peek :: GenProcess s (Maybe Message)
peek = gets (Q.peek . internalQ)

-- | Push a message to the head of the internal priority queue.
--
-- The message is enqueued via 'enqueueMessage' using a single prioritiser
-- that unconditionally assigns it the priority @101@.
-- NOTE(review): that this places the message ahead of everything else depends
-- on the priorities other prioritisers hand out - confirm.
push :: forall s . Message -> GenProcess s ()
push m = do
  st <- processState
  enqueueMessage st [headPrioritiser] m
  where
    -- Accepts every message, regardless of the process state.
    headPrioritiser :: DispatchPriority s
    headPrioritiser = PrioritiseInfo
      { prioritise = \_ msg -> return $ Just (101 :: Int, msg) }

-- | Enqueue a message to the back of the internal priority queue.
--
-- This calls 'enqueueMessage' with an empty list of prioritisers, so the
-- message receives that function's fallback priority.
enqueue :: forall s . Message -> GenProcess s ()
enqueue m = processState >>= \st -> enqueueMessage st [] m

-- | Enqueue a message in the internal priority queue. The given message will be
-- evaluated by all the supplied prioritisers, and if none match it, then it will
-- be assigned the lowest possible priority (i.e. put at the back of the queue).
--
-- Prioritisers are tried in order. The first one to yield @Just (i, m)@ wins:
-- @m@ (the message as returned by the prioritiser) is stored in the queue
-- under the key @negate i@. When the list is exhausted, a catch-all
-- prioritiser that assigns the priority @-1@ is used instead.
enqueueMessage :: forall s . s
               -> [DispatchPriority s]
               -> Message
               -> GenProcess s ()
enqueueMessage s [] msg =
  enqueueMessage s [fallback] msg
  where
    -- Accepts every message, regardless of the process state.
    fallback :: DispatchPriority s
    fallback = PrioritiseInfo
      { prioritise = \_ m -> return $ Just ((-1) :: Int, m) }
enqueueMessage s (p:ps) msg = do
  verdict <- lift $ prioritise p s msg
  case verdict of
    -- This prioritiser declined the message; try the remaining ones.
    Nothing     -> enqueueMessage s ps msg
    -- The priority is negated before being handed to the queue.
    Just (i, m) -> updateQueue (Q.enqueue (negate i) m)

--------------------------------------------------------------------------------
-- Process Loop Implementations                                               --
--------------------------------------------------------------------------------

-- | Maps handlers to a dynamic action that can take place outside of an
-- expect/receive block. This is used by the prioritised process loop.
class DynMessageHandler d where
  -- | Attempt to apply the handler @d s@ to a raw 'Message', given the
  -- unhandled message policy and the current (user defined) state @s@.
  --
  -- NOTE(review): the 'Maybe' result is presumably 'Nothing' when the handler
  -- does not apply to the message - confirm against the instances.
  dynHandleMessage :: UnhandledMessagePolicy
                   -> s
                   -> d s
                   -> Message
                   -> Process (Maybe (ProcessAction s))

-- | Dynamic dispatch for the regular (mailbox) handlers.
--
-- Both equations ignore the 'UnhandledMessagePolicy' argument and apply the
-- current state @s@ to the wrapped handler before delegating:
--
-- * 'Dispatch' goes through 'handleMessage'.
-- * 'DispatchIf' goes through 'handleMessageIf', supplying the dispatcher's
--   own predicate (also applied to the state) as the condition.
instance DynMessageHandler Dispatcher where
  dynHandleMessage _ s (Dispatch   d)   msg = handleMessage   msg (d s)
  dynHandleMessage _ s (DispatchIf d c) msg = handleMessageIf msg (c s) (d s)

-- | Dynamic dispatch for the external (control channel / STM) handlers.
--
-- Only the handler function of each constructor is used here: the receive
-- port of 'DispatchCC' and the STM action and matchers of 'DispatchSTM' are
-- ignored, as is the 'UnhandledMessagePolicy'. The handler is applied to the
-- current state and run via 'handleMessage'.
instance DynMessageHandler ExternDispatcher where
  dynHandleMessage _ s (DispatchCC  _ d)     msg = handleMessage msg (d s)
  dynHandleMessage _ s (DispatchSTM _ d _ _) msg = handleMessage msg (d s)

-- | Dynamic dispatch for deferred handlers.
--
-- The wrapped function already has the shape
-- @s -> Message -> Process (Maybe (ProcessAction s))@, so it is simply
-- applied to the current state; the message argument is passed through by
-- eta-reduction. The 'UnhandledMessagePolicy' is ignored.
instance DynMessageHandler DeferredDispatcher where
  dynHandleMessage _ s (DeferredDispatcher d) = d s

-- | Maps filters to an action that can take place outside of an
-- expect/receive block.
--
-- The result is wrapped in 'Maybe' so that a filter can decline a message
-- it does not apply to.
class DynFilterHandler d where
  dynHandleFilter :: s -- ^ current process state
                  -> d s -- ^ the filter to apply
                  -> Message -- ^ the raw message being filtered
                  -> Process (Maybe (Filter s))

-- | Dynamic evaluation of the filters of a prioritised process.
--
-- * 'FilterApi' and 'FilterAny' apply the state to the wrapped function and
--   run it via 'handleMessage'.
-- * 'FilterRaw' is given the state and the raw message directly.
-- * 'FilterState' only looks at the state; the message is ignored.
instance DynFilterHandler DispatchFilter where
  dynHandleFilter s (FilterApi d)   msg = handleMessage msg (d s)
  dynHandleFilter s (FilterAny d)   msg = handleMessage msg (d s)
  dynHandleFilter s (FilterRaw d)   msg = d s msg
  dynHandleFilter s (FilterState d) _   = d s

-- | Prioritised process loop.
--
-- Evaluating this function will cause the caller to enter a server loop,
-- constantly reading messages from its mailbox (and/or other supplied control
-- planes) and passing these to handler functions in the supplied process
-- definition. The loop only returns once it is determined that the server
-- process should terminate - either by the handlers deciding to stop the
-- process, or by an unhandled exit signal or other form of failure condition
-- (e.g. synchronous or asynchronous exceptions).
--
-- Before returning (or re-throwing), the definition's shutdown handler is run
-- with the most recent state held in the loop's 'IORef': as @CleanShutdown@
-- when the loop produced an 'ExitReason', or as @LastKnown@ together with
-- @ExitOther (show ex)@ when the loop failed with an exception, which is then
-- re-thrown.
--
-- ensureIOManagerIsRunning before evaluating this loop...
--
precvLoop :: PrioritisedProcessDefinition s
          -> s
          -> Delay
          -> Process ExitReason
precvLoop ppDef pState recvDelay = do
  -- All mutable loop state lives in a single IORef so that it is still
  -- available after the loop has been aborted by an exception.
  st <- P.liftIO $ newIORef $ ProcessState { timeoutSpec = recvTimeout ppDef
                                           , sysTimeout  = delayTimer Infinity
                                           , usrTimeout  = recvDelay
                                           , internalQ   = Q.empty
                                           , procState   = pState
                                           , procDef     = processDef ppDef
                                           , procPrio    = priorities ppDef
                                           , procFilters = filters ppDef
                                           , usrTimers   = Map.empty
                                           }

  -- Only the loop itself and the shutdown handler run with the caller's
  -- masking state restored; the bookkeeping in between runs masked.
  mask $ \restore -> do
    res <- catch (fmap Right $ restore $ loop st)
                 (\(e :: SomeException) -> return $ Left e)

    -- res could be (Left ex), so we restore process state & def from our IORef
    ps <- P.liftIO $ atomicModifyIORef' st $ \s' -> (s', s')
    let st' = procState ps
        pd = procDef ps
        sh = shutdownHandler pd
    case res of
      Right (exitReason, _) -> do
        restore $ sh (CleanShutdown st') exitReason
        return exitReason
      Left ex -> do
        -- we'll attempt to run the shutdown handler with the last state that
        -- was recorded in the IORef, then propagate the original exception
        restore $ sh (LastKnown st') (ExitOther $ show ex)
        throwM ex
  where
    -- Runs the receive loop; an exit signal whose reason is an 'ExitReason'
    -- terminates the loop with that reason instead of propagating.
    loop st' = catchExit (runProcess st' recvQueue)
                         (\_ (r :: ExitReason) -> return (r, st'))

-- | One iteration of the prioritised receive loop.
--
-- Drains the mailbox, processes the next message and then interprets the
-- resulting 'ProcessAction' via @nextAction@ (which re-enters 'recvQueue'
-- unless the action stops the process).
--
-- A 'ProcessExitException' raised while doing so is offered to the exit
-- handlers - 'trapExit' first, followed by the definition's own
-- 'exitHandlers' - and the action they produce is interpreted in the same
-- way. If no handler accepts the signal it is re-thrown by @handleExit@.
recvQueue :: GenProcess s ExitReason
recvQueue = do
  pd <- processDefinition
  let exHandlers = map dispatchExit (trapExit : exitHandlers pd)

  catch (drainMailbox >> processNext >>= nextAction)
        (\(e :: ProcessExitException) ->
            handleExit exHandlers e >>= nextAction)
  where

    -- Offers an exit signal to each handler in turn, passing the current
    -- process state, the sender's pid and the exit payload. The first handler
    -- that returns @Just@ decides the next action; when the list is exhausted
    -- the original exception is re-thrown.
    handleExit :: [(s -> ProcessId -> Message -> Process (Maybe (ProcessAction s)))]
               -> ProcessExitException
               -> GenProcess s (ProcessAction s)
    handleExit []     ex                                 = throwM ex
    handleExit (h:hs) ex@(ProcessExitException pid msg) = do
      -- the state is re-read for every handler
      r <- processState >>= \s -> lift $ h s pid msg
      case r of
        Nothing -> handleExit hs ex
        Just p  -> return p

    -- Interprets the 'ProcessAction' produced by a handler and either
    -- re-enters the receive loop or yields the final 'ExitReason'.
    --
    -- * @ProcessExpression@: evaluate the expression, interpret its result.
    -- * @ProcessActivity@:   run the activity, then loop.
    -- * @ProcessSkip@:       loop with the state untouched.
    -- * @ProcessContinue@:   store the new state, then loop.
    -- * @ProcessTimeout@:    set the user timeout, store the state, loop.
    -- * @ProcessStop@:       finish with the given reason.
    -- * @ProcessStopping@:   store the final state, finish with the reason.
    -- * @ProcessHibernate@:  block for the interval, store the state, loop.
    -- * @ProcessBecome@:     swap in a new definition and state, then loop.
    --
    -- Anything else yields @ExitOther "IllegalState"@.
    nextAction :: ProcessAction s -> GenProcess s ExitReason
    nextAction ac
      | ProcessExpression expr   <- ac = expr >>= nextAction
      | ProcessActivity  act'    <- ac = act' >> recvQueue
      | ProcessSkip              <- ac = recvQueue
      | ProcessContinue  ps'     <- ac = recvQueueAux ps'
      | ProcessTimeout   d   ps' <- ac = setUserTimeout d >> recvQueueAux ps'
      | ProcessStop      xr      <- ac = return xr
      | ProcessStopping  ps' xr  <- ac = setProcessState ps' >> return xr
      | ProcessHibernate d' s'   <- ac = (lift $ block d') >> recvQueueAux s'
      | ProcessBecome    pd' ps' <- ac = do
          modifyState $ \st@ProcessState{..} -> st { procDef = pd', procState = ps' }
          -- liftIO $ putStrLn "modified process def"
          recvQueue
      | otherwise {- compiler foo -}   = return $ ExitOther "IllegalState"

    -- Stores the given user state and re-enters the receive loop.
    recvQueueAux st = setProcessState st >> recvQueue

      -- TODO: at some point we should re-implement our state monad in terms of
      -- mkWeakIORef instead of a full IORef. At that point, we can implement hibernation
      -- in the following terms:
      -- 1. the user defines (at some level, perhaps outside of this API) some
      --    means for writing a process' state to a backing store
      --    NB: this could be /persistent/, or a file, or database, etc...
      -- 2. when we enter hibernation, we do the following:
      --    (a) write the process state to the chosen backing store
      --    (b) evaluate yield (telling the RTS we're willing to give up our time slice)
      --    (c) enter a blocking receiveWait with no state on our stack...
      --        [NB] presumably at this point our state will be eligible for GC
      --    (d) when we finally receive a message, reboot the process thus:
      --        (i)   read our state back from the given backing store
      --        (ii)  call a user defined function to rebuild the state if custom
      --              actions need to be taken (e.g. they might've stored something
      --              like an STM TVar and need to request a new one from some
      --              well known service or registry - alt. they might want to
      --              /replay/ actions to rebuild their state as an FSM might)
      --        (iii) re-enter the recv loop and immediately processNext
      --
      -- This will give roughly the same semantics as erlang's hibernate/3, although
      -- the RTS does GC globally rather than per-thread, but that might change in
      -- some future release (who knows!?).
      --
      -- Also, this gives us the ability to migrate process state across remote
      -- boundaries. Not only can a process be moved in this way, if we generalise
      -- the mechanism to move a serialised closure, we can migrate the whole process
      -- and its state as well. The main difference here (compared with ordinary use
      -- of @Closure@ et al for moving processes around) is that we do not insist
      -- on the process state being serializable, simply that the user provides a
      -- function to read+write the state, and a (state -> state) function to be
      -- called during rehydration if custom actions need to be taken.
      --

    -- Picks the strategy for handling the next queued message: with no
    -- filters installed the message is consumed directly, otherwise it is
    -- first run through the filter chain (starting in the non-safe mode and
    -- with no previous filter result).
    processNext :: GenProcess s (ProcessAction s)
    processNext = do
      -- fetch the unhandled-message policy and the filter list in one read
      (up, pf) <- gets $ liftA2 (,) (unhandledMessagePolicy . procDef) procFilters
      case pf of
        [] -> consumeMessage
        _  -> filterMessage (filterNext False up pf Nothing)

    -- Removes the next message from the internal queue (via 'dequeue') and
    -- hands it to @processApply@ through @applyNext@.
    consumeMessage = applyNext dequeue processApply
    -- Like @consumeMessage@, but obtains the message with 'peek' rather than
    -- 'dequeue' and passes it to the supplied continuation.
    filterMessage = applyNext peek

    filterNext :: Safe
               -> UnhandledMessagePolicy
               -> [DispatchFilter s]
               -> Maybe (Filter s)
               -> Message
               -> GenProcess s (ProcessAction s)
    filterNext :: forall s.
Bool
-> UnhandledMessagePolicy
-> [DispatchFilter s]
-> Maybe (Filter s)
-> Message
-> GenProcess s (ProcessAction s)
filterNext Bool
isSafe UnhandledMessagePolicy
mp' [DispatchFilter s]
fs Maybe (Filter s)
mf Message
msg
      | Just (FilterSafe s
s')   <- Maybe (Filter s)
mf = Bool
-> UnhandledMessagePolicy
-> [DispatchFilter s]
-> Maybe (Filter s)
-> Message
-> GenProcess s (ProcessAction s)
forall s.
Bool
-> UnhandledMessagePolicy
-> [DispatchFilter s]
-> Maybe (Filter s)
-> Message
-> GenProcess s (ProcessAction s)
filterNext Bool
True UnhandledMessagePolicy
mp' [DispatchFilter s]
fs (Filter s -> Maybe (Filter s)
forall a. a -> Maybe a
Just (Filter s -> Maybe (Filter s)) -> Filter s -> Maybe (Filter s)
forall a b. (a -> b) -> a -> b
$ s -> Filter s
forall s. s -> Filter s
FilterOk s
s') Message
msg
      | Just (FilterSkip s
s')   <- Maybe (Filter s)
mf = s -> GenProcess s ()
forall s. s -> GenProcess s ()
setProcessState s
s' GenProcess s ()
-> GenProcess s (Maybe Message) -> GenProcess s (Maybe Message)
forall a b. GenProcess s a -> GenProcess s b -> GenProcess s b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> GenProcess s (Maybe Message)
forall s. GenProcess s (Maybe Message)
dequeue GenProcess s (Maybe Message)
-> GenProcess s (ProcessAction s) -> GenProcess s (ProcessAction s)
forall a b. GenProcess s a -> GenProcess s b -> GenProcess s b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ProcessAction s -> GenProcess s (ProcessAction s)
forall a. a -> GenProcess s a
forall (m :: * -> *) a. Monad m => a -> m a
return ProcessAction s
forall s. ProcessAction s
ProcessSkip
      | Just (FilterStop s
s' ExitReason
r) <- Maybe (Filter s)
mf = ProcessAction s -> GenProcess s (ProcessAction s)
forall a. a -> GenProcess s a
forall (m :: * -> *) a. Monad m => a -> m a
return (ProcessAction s -> GenProcess s (ProcessAction s))
-> ProcessAction s -> GenProcess s (ProcessAction s)
forall a b. (a -> b) -> a -> b
$ s -> ExitReason -> ProcessAction s
forall s. s -> ExitReason -> ProcessAction s
ProcessStopping s
s' ExitReason
r
      | Bool
isSafe
      , Just (FilterOk s
s')     <- Maybe (Filter s)
mf
      , []                     <- [DispatchFilter s]
fs = do s -> GenProcess s ()
forall s. s -> GenProcess s ()
setProcessState s
s'
                                          ProcessAction s
act' <- Message -> GenProcess s (ProcessAction s)
forall {s}. Message -> GenProcess s (ProcessAction s)
processApply Message
msg
                                          GenProcess s (Maybe Message)
forall s. GenProcess s (Maybe Message)
dequeue GenProcess s (Maybe Message)
-> GenProcess s (ProcessAction s) -> GenProcess s (ProcessAction s)
forall a b. GenProcess s a -> GenProcess s b -> GenProcess s b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ProcessAction s -> GenProcess s (ProcessAction s)
forall a. a -> GenProcess s a
forall (m :: * -> *) a. Monad m => a -> m a
return ProcessAction s
act'
      | Just (FilterOk s
s')     <- Maybe (Filter s)
mf
      , []                     <- [DispatchFilter s]
fs = s -> GenProcess s ()
forall s. s -> GenProcess s ()
setProcessState s
s' GenProcess s ()
-> GenProcess s (ProcessAction s) -> GenProcess s (ProcessAction s)
forall a b. GenProcess s a -> GenProcess s b -> GenProcess s b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> GenProcess s (Maybe Message)
-> (Message -> GenProcess s (ProcessAction s))
-> GenProcess s (ProcessAction s)
forall s.
GenProcess s (Maybe Message)
-> (Message -> GenProcess s (ProcessAction s))
-> GenProcess s (ProcessAction s)
applyNext GenProcess s (Maybe Message)
forall s. GenProcess s (Maybe Message)
dequeue Message -> GenProcess s (ProcessAction s)
forall {s}. Message -> GenProcess s (ProcessAction s)
processApply
      | Maybe (Filter s)
Nothing <- Maybe (Filter s)
mf, []      <- [DispatchFilter s]
fs = GenProcess s (Maybe Message)
-> (Message -> GenProcess s (ProcessAction s))
-> GenProcess s (ProcessAction s)
forall s.
GenProcess s (Maybe Message)
-> (Message -> GenProcess s (ProcessAction s))
-> GenProcess s (ProcessAction s)
applyNext GenProcess s (Maybe Message)
forall s. GenProcess s (Maybe Message)
dequeue Message -> GenProcess s (ProcessAction s)
forall {s}. Message -> GenProcess s (ProcessAction s)
processApply
      | Just (FilterOk s
s')     <- Maybe (Filter s)
mf
      , (DispatchFilter s
f:[DispatchFilter s]
fs')                <- [DispatchFilter s]
fs = do
          s -> GenProcess s ()
forall s. s -> GenProcess s ()
setProcessState s
s'
          Maybe (Filter s)
act' <- Process (Maybe (Filter s)) -> GenProcess s (Maybe (Filter s))
forall a s. Process a -> GenProcess s a
lift (Process (Maybe (Filter s)) -> GenProcess s (Maybe (Filter s)))
-> Process (Maybe (Filter s)) -> GenProcess s (Maybe (Filter s))
forall a b. (a -> b) -> a -> b
$ s -> DispatchFilter s -> Message -> Process (Maybe (Filter s))
forall s.
s -> DispatchFilter s -> Message -> Process (Maybe (Filter s))
forall (d :: * -> *) s.
DynFilterHandler d =>
s -> d s -> Message -> Process (Maybe (Filter s))
dynHandleFilter s
s' DispatchFilter s
f Message
msg
          Bool
-> UnhandledMessagePolicy
-> [DispatchFilter s]
-> Maybe (Filter s)
-> Message
-> GenProcess s (ProcessAction s)
forall s.
Bool
-> UnhandledMessagePolicy
-> [DispatchFilter s]
-> Maybe (Filter s)
-> Message
-> GenProcess s (ProcessAction s)
filterNext Bool
isSafe UnhandledMessagePolicy
mp' [DispatchFilter s]
fs' Maybe (Filter s)
act' Message
msg
      | Just (FilterReject m
_ s
s') <- Maybe (Filter s)
mf = do
          s -> GenProcess s ()
forall s. s -> GenProcess s ()
setProcessState s
s' GenProcess s ()
-> GenProcess s (Maybe Message) -> GenProcess s (Maybe Message)
forall a b. GenProcess s a -> GenProcess s b -> GenProcess s b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> GenProcess s (Maybe Message)
forall s. GenProcess s (Maybe Message)
dequeue GenProcess s (Maybe Message)
-> (Maybe Message -> GenProcess s (ProcessAction s))
-> GenProcess s (ProcessAction s)
forall a b.
GenProcess s a -> (a -> GenProcess s b) -> GenProcess s b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Process (ProcessAction s) -> GenProcess s (ProcessAction s)
forall a s. Process a -> GenProcess s a
lift (Process (ProcessAction s) -> GenProcess s (ProcessAction s))
-> (Maybe Message -> Process (ProcessAction s))
-> Maybe Message
-> GenProcess s (ProcessAction s)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. UnhandledMessagePolicy -> s -> Message -> Process (ProcessAction s)
forall s.
UnhandledMessagePolicy -> s -> Message -> Process (ProcessAction s)
applyPolicy UnhandledMessagePolicy
mp' s
s' (Message -> Process (ProcessAction s))
-> (Maybe Message -> Message)
-> Maybe Message
-> Process (ProcessAction s)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe Message -> Message
forall a. HasCallStack => Maybe a -> a
fromJust
      | Maybe (Filter s)
Nothing <- Maybe (Filter s)
mf {- filter didn't apply to the input type -}
      , (DispatchFilter s
f:[DispatchFilter s]
fs') <- [DispatchFilter s]
fs = GenProcess s s
forall s. GenProcess s s
processState GenProcess s s
-> (s -> GenProcess s (ProcessAction s))
-> GenProcess s (ProcessAction s)
forall a b.
GenProcess s a -> (a -> GenProcess s b) -> GenProcess s b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \s
s' -> do
          Process (Maybe (Filter s)) -> GenProcess s (Maybe (Filter s))
forall a s. Process a -> GenProcess s a
lift (s -> DispatchFilter s -> Message -> Process (Maybe (Filter s))
forall s.
s -> DispatchFilter s -> Message -> Process (Maybe (Filter s))
forall (d :: * -> *) s.
DynFilterHandler d =>
s -> d s -> Message -> Process (Maybe (Filter s))
dynHandleFilter s
s' DispatchFilter s
f Message
msg) GenProcess s (Maybe (Filter s))
-> (Maybe (Filter s) -> GenProcess s (ProcessAction s))
-> GenProcess s (ProcessAction s)
forall a b.
GenProcess s a -> (a -> GenProcess s b) -> GenProcess s b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Maybe (Filter s)
a -> Bool
-> UnhandledMessagePolicy
-> [DispatchFilter s]
-> Maybe (Filter s)
-> Message
-> GenProcess s (ProcessAction s)
forall s.
Bool
-> UnhandledMessagePolicy
-> [DispatchFilter s]
-> Maybe (Filter s)
-> Message
-> GenProcess s (ProcessAction s)
filterNext Bool
isSafe UnhandledMessagePolicy
mp' [DispatchFilter s]
fs' Maybe (Filter s)
a Message
msg

    -- Run @queueOp@ to fetch the next message, if there is one. A message is
    -- passed to @handler@; when @queueOp@ yields 'Nothing' we fall through to
    -- 'drainOrTimeout' instead.
    applyNext :: GenProcess s (Maybe Message)
              -> (Message -> GenProcess s (ProcessAction s))
              -> GenProcess s (ProcessAction s)
    applyNext queueOp handler = queueOp >>= maybe drainOrTimeout handler

    -- Offer @msg@ to the handlers of the current process definition, using the
    -- current process state and the definition's unhandled message policy.
    --
    -- Handlers are consulted in this order: the shutdown handler first, then
    -- the extern handlers, the api handlers and finally the info handlers.
    -- The actual first-match search is performed by 'processApplyAux'.
    processApply :: Message -> GenProcess s (ProcessAction s)
    processApply msg = do
      (def, pState) <- gets (liftA2 (,) procDef procState)
      let pol      = unhandledMessagePolicy def
          handlers = concat
            [ [dynHandleMessage pol pState shutdownHandler']
            , map (dynHandleMessage pol pState) (externHandlers def)
            , map (dynHandleMessage pol pState) (apiHandlers def)
            , map (dynHandleMessage pol pState) (infoHandlers def)
            ]
      processApplyAux handlers pol pState msg

    -- Try each handler against the message, in list order, and return the
    -- action produced by the first handler that yields a result. When the
    -- list is exhausted (or empty to begin with) the decision is delegated to
    -- 'applyPolicy' with the supplied policy and state.
    processApplyAux :: [Message -> Process (Maybe (ProcessAction t))]
                    -> UnhandledMessagePolicy
                    -> t
                    -> Message
                    -> GenProcess s (ProcessAction t)
    processApplyAux handlers p' s' m' =
      case handlers of
        []       -> lift (applyPolicy p' s' m')
        (h : hs) -> lift (h m') >>= maybe (processApplyAux hs p' s' m') return

    -- Drain the process mailbox via 'drainAux'.
    -- See note [timer handling whilst draining the process' mailbox].
    --
    -- The match list is assembled as: key matches for every user timer, then
    -- a catch-all 'matchAny' (wrapped in 'Right'), then whatever 'mkMatchers'
    -- produces, and finally the matches for the drain timer itself.
    drainMailbox :: GenProcess s ()
    drainMailbox = do
      ps <- processState
      pd <- processDefinition
      pp <- processPriorities
      ut <- gets usrTimers
      let timerMatches = Map.foldrWithKey (\k (t, _) acc -> acc ++ matchKey k t) [] ut
          matches      = timerMatches ++ (matchAny (return . Right) : mkMatchers ps pd)
          -- only a max-backlog policy bounds the number of reads
          backlogLimit (RecvTimer      _)   = Nothing
          backlogLimit (RecvMaxBacklog cnt) = Just cnt
      limit <- fmap backlogLimit timeoutPolicy
      -- See note [handling async exceptions during non-blocking reads].
      -- Only the system timeout is used here; the user timeout comes into
      -- play only if we end up in a blocking read on the mailbox.
      mask_ $ do
        tt <- maybeStartTimer
        drainAux ps pp limit (matches ++ matchTimeout tt)
        lift (stopTimer tt) >>= setDrainTimeout

    -- Repeatedly scan the mailbox with 'scanMailbox', threading the remaining
    -- limit through each iteration, until the scan produces nothing or
    -- reports 'TimedOut'.
    drainAux :: s
             -> [DispatchPriority s]
             -> Limit
             -> [Match (Either TimedOut Message)]
             -> GenProcess s ()
    drainAux ps' pp' maxbq ms = scanMailbox maxbq ms >>= step
      where
        -- nothing was read, or the scan timed out: we are done
        step (_,   Nothing)               = return ()
        step (_,   Just (Left TimedOut))  = return ()
        -- an ordinary message: enqueue it and keep draining
        step (cnt, Just (Right m'))       = enqueueMessage ps' pp' m' >> again cnt
        -- A user defined timer fired and has an associated message. The
        -- message is pushed onto the queue; removing the timer is handled
        -- for us by consumeTimer.
        step (cnt, Just (Left (Yield i))) = consumeTimer i push >> again cnt

        again cnt = drainAux ps' pp' cnt ms

    -- Build the timer used while draining, record it with 'setDrainTimeout'
    -- and return it. Under a 'RecvTimer' policy a timer is started for the
    -- given interval; under any other policy an 'Infinity' delay timer is
    -- created via 'delayTimer'.
    maybeStartTimer :: GenProcess s Timer
    maybeStartTimer = do
      t <- timeoutPolicy >>= mkTimer
      setDrainTimeout t
      return t
      where
        mkTimer (RecvTimer d) = lift (startTimer (Delay d))
        mkTimer _             = return (delayTimer Infinity)

    -- Perform at most one non-blocking read using the given matches, and
    -- return the updated limit alongside whatever was read.
    --
    -- * limit of @Just 0@: no read takes place, 'TimedOut' is reported
    -- * limit of @Just c@: read once, the returned limit is @c - 1@
    -- * no limit:          read once, the limit is returned unchanged
    scanMailbox :: Limit
                -> [Match (Either TimedOut Message)]
                -> GenProcess s (Limit, Maybe (Either TimedOut Message))
    scanMailbox lim ms =
      case lim of
        Just 0  -> return (lim, Just (Left TimedOut))
        Just c  -> poll (Just (c - 1))
        Nothing -> poll lim
      where
        -- non-blocking read on our mailbox, any external inputs, plus
        -- whatever match specs the TimeoutManager gives
        poll lim' = lift (fmap ((,) lim') (receiveTimeout 0 ms))

    -- Read from the mailbox, honouring the current (user) timeout.
    -- See note [timer handling whilst draining the process' mailbox].
    --
    -- If the read yields nothing, the definition's timeout handler is run
    -- with the process state and the delay. Otherwise the message is
    -- enqueued and 'ProcessSkip' is returned.
    drainOrTimeout :: GenProcess s (ProcessAction s)
    drainOrTimeout = do
      pd <- processDefinition
      ps <- processState
      ud <- currentTimeout
      mr <- mkMatchRunners
      let ump     = unhandledMessagePolicy pd
          externs = map (matchExtern ump ps) (externHandlers pd)
          matches = mr ++ (matchMessage return : externs)
          recv    = lift $ case ud of
                      Infinity -> fmap Just (receiveWait matches)
                      NoDelay  -> receiveTimeout 0 matches
                      Delay i  -> receiveTimeout (asTimeout i) matches

      -- see note [masking async exceptions during recv]
      mask $ \restore -> do
        received <- recv
        case received of
          Nothing -> restore (lift (timeoutHandler pd ps ud))
          Just m  -> do
            -- the enqueue is performed before the masking state is restored
            pp <- processPriorities
            enqueueMessage ps pp m
            -- Returning @ProcessSkip@ simply causes us to go back into
            -- listening mode until we hit RecvTimeoutPolicy
            restore (return ProcessSkip)

    -- Produce the 'matchRun' matches for every entry in the user timer map,
    -- each wired to the runner obtained from 'mkRunner'.
    mkMatchRunners :: GenProcess s [Match Message]
    mkMatchRunners = do
      ut <- gets usrTimers
      fn <- mkRunner
      return (Map.foldrWithKey (\k (t, _) acc -> acc ++ matchRun fn k t) [] ut)

    -- Capture the current process state and return a function that, given a
    -- timer key, runs 'consumeTimer' for that key against the captured state
    -- in the 'Process' monad. Only the resulting message is returned; the
    -- state produced by that run is discarded.
    mkRunner :: GenProcess s (TimerKey -> Process Message)
    mkRunner = do
      snapshot <- ST.get
      return $ \key -> fmap fst (runProcess snapshot (consumeTimer key return))

    -- Turn each of the definition's extern handlers into a 'Match' whose
    -- result is tagged via 'toRight', using the definition's unhandled
    -- message policy and the supplied state.
    mkMatchers :: s
                -> ProcessDefinition s
                -> [Match (Either TimedOut Message)]
    mkMatchers st df =
      [ matchMapExtern policy st toRight handler
      | handler <- externHandlers df
      ]
      where
        policy = unhandledMessagePolicy df

    -- Tag a received message as the 'Right' (non-timeout) alternative.
    toRight :: Message -> Either TimedOut Message
    toRight msg = Right msg

-- note [handling async exceptions during non-blocking reads]
-- Our golden rule is that if we've dequeued any kind of Message at all
-- from the process mailbox (or input channels), we must not /lose/ it
-- if an asynchronous exception arrives. We therefore mask when we perform a
-- non-blocking scan on the mailbox, and whilst we enqueue messages.
--
-- If an initial scan of the mailbox yields no data, we fall back to making
-- a blocking read; See note [masking async exceptions during recv].
--
-- Once messages have been safely moved from the mailbox to our priority queue,
-- we restore the masking state whilst running handlers.
--

-- note [timer handling whilst draining the process' mailbox]
-- To prevent a DoS vector - and quite a likely accidental one at that - we do
-- not sit draining the mailbox indefinitely: continuous reading would leave us
-- unable to process any inputs, and we'd eventually run out of memory.
-- Instead, the PrioritisedProcessDefinition holds a RecvTimeoutPolicy which can
-- hold either a max-messages-processed limit or a timeout value. Using whichever
-- policy is provided, drainMessageQueue will stop attempting to receive new mail
-- either once the message count limit is exceeded or the timer expires, at which
-- point we go back to processNext.

-- note [masking async exceptions during recv]
-- Reading the process' mailbox is mask'ed anyway, however this only
-- covers dequeue on the underlying CQueue, such that either before
-- the dequeue takes place, or after (during evaluation of the result,
-- or execution of the discovered @Match@ for the message), we can still
-- be terminated by an asynchronous exception. This is wrong, from the
-- perspective of a managed process, since in the case of an exit signal
-- we might handle the exception, at which point we've dequeued and
-- subsequently lost a message.
--
-- Masking recv then, prevents this from happening, and is relatively
-- safe, because we know the following (having written all the handlers
-- explicitly ourselves):
--
-- 1. each handler does nothing more than return the underlying message
-- 2. in the most complex case, we have @Left . unsafeWrapMessage@ or
--    @fmap Right readSTM thing@ inside of @matchSTM@
-- 3. We should not, therefore, introduce any uninterruptible behaviour
-- 4. We cannot, however, be certain that this holds true for decoding
--    (and subsequent calls into Binary and/or Bytestrings), so at best
--    we can mask, but not uninterruptibleMask
--
-- NB: According to /qnikst/, atomicModifyIORef' does not require us to
-- use uninterruptibleMask anyway, so this is fine...
--

--------------------------------------------------------------------------------
-- Ordinary/Blocking Mailbox Handling                                         --
--------------------------------------------------------------------------------

-- TODO: wrap recvLoop in the same exception handling as precvLoop
--       notably, we need to ensure the shutdownHandler runs even in the face
--       of exceptions, and it would be useful/good IMO to pass an IORef for
--       the state, so we can have a decent LastKnown value for it

-- | Managed process loop.
--
-- Evaluating this function will cause the caller to enter a server loop,
-- constantly reading messages from its mailbox (and/or other supplied control
-- planes) and passing these to handler functions in the supplied process
-- definition. The loop ends only when it is determined that the server process
-- should terminate: either a handler yields a stop action (in which case the
-- definition's shutdown handler is run and the 'ExitReason' is returned), or
-- an exit signal that no exit handler deals with, or some other failure
-- condition (e.g. a synchronous or asynchronous exception), escapes the loop.
--
-- The arguments are the process definition, the current server state and the
-- receive timeout to use for the next read. 'ProcessActivity' and
-- 'ProcessExpression' actions are not supported by this loop and cause the
-- process to 'die'.
--
recvLoop :: ProcessDefinition s -> s -> Delay -> Process ExitReason
recvLoop pDef pState recvDelay =
  let p             = unhandledMessagePolicy pDef
      handleTimeout = timeoutHandler pDef
      handleStop    = shutdownHandler pDef
      -- the built-in Shutdown cast handler sits at the head of the match list
      shutdown'     = matchDispatch p pState shutdownHandler'
      extMatchers   = map (matchDispatch p pState) (externHandlers pDef)
      matchers      = extMatchers ++ map (matchDispatch p pState) (apiHandlers pDef)
      -- the built-in exit trap is consulted before user supplied exit handlers
      ex'           = trapExit : exitHandlers pDef
      -- info handlers come last, behind a single catch-all match
      ms'           = (shutdown' : matchers) ++ matchAux p pState (infoHandlers pDef)
  in do
    ac <- catchesExit (processReceive ms' handleTimeout pState recvDelay)
                      (map (\d' -> dispatchExit d' pState) ex')
    case ac of
      ProcessSkip              -> recvLoop pDef pState recvDelay -- TODO: handle differently...
      (ProcessContinue s')     -> recvLoop pDef s' recvDelay
      (ProcessTimeout t' s')   -> recvLoop pDef s' t'
      (ProcessHibernate d' s') -> block d' >> recvLoop pDef s' recvDelay
      (ProcessStop r)          -> handleStop (LastKnown pState) r >> return r
      (ProcessStopping s' r)   -> handleStop (LastKnown s') r >> return r
      (ProcessBecome d' s')    -> recvLoop d' s' recvDelay
      (ProcessActivity _)      -> die "recvLoop.InvalidState - ProcessActivityNotSupported"
      (ProcessExpression _)    -> die "recvLoop.InvalidState - ProcessExpressionNotSupported"
  where
    -- A single catch-all match that offers the message to each info handler
    -- in turn, falling back to the unhandled message policy.
    matchAux :: UnhandledMessagePolicy
             -> s
             -> [DeferredDispatcher s]
             -> [Match (ProcessAction s)]
    matchAux p ps ds = [matchAny (auxHandler (applyPolicy p ps) ps ds)]

    -- Try each deferred dispatcher in order; the first one to yield an action
    -- wins.
    --
    -- NB: we *do not* want to terminate/dead-letter messages until we've
    -- exhausted all the possible info handlers, so the policy only kicks in
    -- once the list of dispatchers is empty.
    auxHandler :: (Message -> Process (ProcessAction s))
               -> s
               -> [DeferredDispatcher s]
               -> Message
               -> Process (ProcessAction s)
    auxHandler policy _  []     msg = policy msg
    auxHandler policy st (d:ds) msg =
      dispatchInfo d st msg >>= maybe (auxHandler policy st ds msg) return

    -- Perform one read; if nothing arrived within the delay, defer to the
    -- timeout handler.
    processReceive :: [Match (ProcessAction s)]
                   -> TimeoutHandler s
                   -> s
                   -> Delay
                   -> Process (ProcessAction s)
    processReceive ms handleTimeout st d =
      recv ms d >>= maybe (handleTimeout st d) return

    -- Read according to the delay: block indefinitely for 'Infinity', poll
    -- with a zero timeout for 'NoDelay', otherwise wait for the given
    -- interval. 'Nothing' means the read timed out.
    recv :: [Match (ProcessAction s)]
         -> Delay
         -> Process (Maybe (ProcessAction s))
    recv matches d' =
      case d' of
        Infinity -> fmap Just (receiveWait matches)
        NoDelay  -> receiveTimeout 0 matches
        Delay t' -> receiveTimeout (asTimeout t') matches

--------------------------------------------------------------------------------
-- Utilities                                                                  --
--------------------------------------------------------------------------------

-- A cast handler for the 'Shutdown' message: receiving it stops the server
-- with 'ExitNormal', whatever the current state is.
shutdownHandler' :: Dispatcher s
shutdownHandler' = handleCast onShutdown
  where
    onShutdown _ Shutdown = stop ExitNormal

-- Exit signal handler for exit reasons of type 'ExitReason': when the reason
-- is 'ExitShutdown', the server is stopped with that same reason. The sender
-- and the current state are ignored.
trapExit :: ExitSignalDispatcher s
trapExit = handleExitIf isShutdown stopWith
  where
    isShutdown :: st -> ExitReason -> Bool
    isShutdown _ reason = reason == ExitShutdown

    stopWith _ _ reason = stop (reason :: ExitReason)

-- Wait for up to the given interval, matching only messages of type
-- 'TimedOut'. Whether such a message arrived or the wait elapsed, the result
-- is discarded.
block :: TimeInterval -> Process ()
block interval = void (receiveTimeout (asTimeout interval) [match ignoreTimedOut])
  where
    ignoreTimedOut :: TimedOut -> Process ()
    ignoreTimedOut _ = return ()

-- Decide what to do with a message that no handler accepted, according to the
-- given policy:
--
-- * 'Terminate' stops the server with @ExitOther "UnhandledInput"@
-- * 'DeadLetter' forwards the message to the given process, then continues
-- * 'Drop' continues, ignoring the message
-- * 'Log' reports the message on the log channel, then continues
--
-- In every continuing case the supplied state is carried forward unchanged.
applyPolicy :: UnhandledMessagePolicy
            -> s
            -> Message
            -> Process (ProcessAction s)
applyPolicy Terminate        _  _   = stop (ExitOther "UnhandledInput")
applyPolicy (DeadLetter pid) st msg = forward msg pid >> continue st
applyPolicy Drop             st _   = continue st
applyPolicy Log              st msg = do
  Log.report Log.info Log.logChannel ("Unhandled Gen Input Message: " ++ show msg)
  continue st