{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE BangPatterns #-}
module Control.Distributed.Process.Internal.Primitives
(
send
, usend
, expect
, newChan
, sendChan
, receiveChan
, mergePortsBiased
, mergePortsRR
, unsafeSend
, unsafeUSend
, unsafeSendChan
, unsafeNSend
, unsafeNSendRemote
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, matchAny
, matchAnyIf
, matchChan
, matchSTM
, matchMessage
, matchMessageIf
, isEncoded
, wrapMessage
, unsafeWrapMessage
, unwrapMessage
, handleMessage
, handleMessageIf
, handleMessage_
, handleMessageIf_
, forward
, uforward
, delegate
, relay
, proxy
, terminate
, ProcessTerminationException(..)
, die
, kill
, exit
, catchExit
, catchesExit
, ProcessExitException()
, getSelfPid
, getSelfNode
, ProcessInfo(..)
, getProcessInfo
, NodeStats(..)
, getNodeStats
, getLocalNodeStats
, link
, unlink
, monitor
, unmonitor
, unmonitorAsync
, withMonitor
, withMonitor_
, SayMessage(..)
, say
, register
, reregister
, unregister
, whereis
, nsend
, registerRemoteAsync
, reregisterRemoteAsync
, unregisterRemoteAsync
, whereisRemoteAsync
, nsendRemote
, unClosure
, unStatic
, catch
, Handler(..)
, catches
, try
, mask
, mask_
, onException
, bracket
, bracket_
, finally
, expectTimeout
, receiveChanTimeout
, spawnAsync
, linkNode
, linkPort
, unlinkNode
, unlinkPort
, monitorNode
, monitorPort
, reconnect
, reconnectPort
, sendCtrlMsg
) where
import Data.Binary (Binary(..), Put, Get, decode)
import Data.Time.Clock (getCurrentTime, UTCTime(..))
import Data.Time.Calendar (Day(..))
import Data.Time.Format (formatTime)
import Data.Time.Format (defaultTimeLocale)
import System.Timeout (timeout)
import Control.Monad (when, void)
import Control.Monad.Reader (ask)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Catch
( Exception
, SomeException
, throwM
, fromException
)
import qualified Control.Monad.Catch as Catch
import Control.Applicative
import Control.Distributed.Process.Internal.StrictMVar
( StrictMVar
, modifyMVar
, modifyMVar_
)
import Control.Concurrent.STM
( STM
, TVar
, atomically
, orElse
, newTVar
, readTVar
, writeTVar
)
import Control.Distributed.Process.Internal.CQueue
( dequeue
, BlockSpec(..)
, MatchOn(..)
)
import Control.Distributed.Process.Serializable (Serializable, fingerprint)
import Data.Accessor ((^.), (^:), (^=))
import Control.Distributed.Static
( Static
, Closure
)
import Data.Rank1Typeable (Typeable)
import qualified Control.Distributed.Static as Static (unstatic, unclosure)
import qualified Control.Distributed.Process.UnsafePrimitives as Unsafe
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, ProcessId(..)
, LocalNode(..)
, LocalProcess(..)
, Process(..)
, Message(..)
, MonitorRef(..)
, SpawnRef(..)
, ProcessSignal(..)
, NodeMonitorNotification(..)
, ProcessMonitorNotification(..)
, monitorCounter
, spawnCounter
, SendPort(..)
, ReceivePort(..)
, channelCounter
, typedChannelWithId
, TypedChannel(..)
, SendPortId(..)
, Identifier(..)
, ProcessExitException(..)
, DiedReason(..)
, DidUnmonitor(..)
, DidUnlinkProcess(..)
, DidUnlinkNode(..)
, DidUnlinkPort(..)
, WhereIsReply(..)
, RegisterReply(..)
, ProcessRegistrationException(..)
, ProcessInfo(..)
, ProcessInfoNone(..)
, NodeStats(..)
, isEncoded
, createMessage
, createUnencodedMessage
, ImplicitReconnect( NoImplicitReconnect)
, LocalProcessState
, LocalSendPortId
, messageToPayload
)
import Control.Distributed.Process.Internal.Messaging
( sendMessage
, sendBinary
, sendPayload
, disconnect
, sendCtrlMsg
)
import Control.Distributed.Process.Management.Internal.Types
( MxEvent(..)
)
import Control.Distributed.Process.Management.Internal.Trace.Types
( traceEvent
)
import Control.Distributed.Process.Internal.WeakTQueue
( newTQueueIO
, readTQueue
, mkWeakTQueue
)
import Prelude
import Unsafe.Coerce
send :: Serializable a => ProcessId -> a -> Process ()
send :: forall a. Serializable a => ProcessId -> a -> Process ()
send ProcessId
them a
msg = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let us :: ProcessId
us = LocalProcess -> ProcessId
processId LocalProcess
proc
node :: LocalNode
node = LocalProcess -> LocalNode
processNode LocalProcess
proc
nodeId :: NodeId
nodeId = LocalNode -> NodeId
localNodeId LocalNode
node
destNode :: NodeId
destNode = (ProcessId -> NodeId
processNodeId ProcessId
them)
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node)
(ProcessId -> ProcessId -> Message -> MxEvent
MxSent ProcessId
them ProcessId
us (a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage a
msg))
if NodeId
destNode NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nodeId
then ProcessId -> a -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
sendLocal ProcessId
them a
msg
else IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> a -> IO ()
forall a.
Serializable a =>
LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> a -> IO ()
sendMessage (LocalProcess -> LocalNode
processNode LocalProcess
proc)
(ProcessId -> Identifier
ProcessIdentifier (LocalProcess -> ProcessId
processId LocalProcess
proc))
(ProcessId -> Identifier
ProcessIdentifier ProcessId
them)
ImplicitReconnect
NoImplicitReconnect
a
msg
unsafeSend :: Serializable a => ProcessId -> a -> Process ()
unsafeSend :: forall a. Serializable a => ProcessId -> a -> Process ()
unsafeSend = ProcessId -> a -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
Unsafe.send
usend :: Serializable a => ProcessId -> a -> Process ()
usend :: forall a. Serializable a => ProcessId -> a -> Process ()
usend ProcessId
them a
msg = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let there :: NodeId
there = ProcessId -> NodeId
processNodeId ProcessId
them
let (ProcessId
us, LocalNode
node) = (LocalProcess -> ProcessId
processId LocalProcess
proc, LocalProcess -> LocalNode
processNode LocalProcess
proc)
let msg' :: Message
msg' = a -> Message
forall a. Serializable a => a -> Message
wrapMessage a
msg
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node) (ProcessId -> ProcessId -> Message -> MxEvent
MxSent ProcessId
them ProcessId
us Message
msg')
if LocalNode -> NodeId
localNodeId (LocalProcess -> LocalNode
processNode LocalProcess
proc) NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
there
then ProcessId -> a -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
sendLocal ProcessId
them a
msg
else Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
there) (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalProcessId -> Message -> ProcessSignal
UnreliableSend (ProcessId -> LocalProcessId
processLocalId ProcessId
them)
(a -> Message
forall a. Serializable a => a -> Message
createMessage a
msg)
unsafeUSend :: Serializable a => ProcessId -> a -> Process ()
unsafeUSend :: forall a. Serializable a => ProcessId -> a -> Process ()
unsafeUSend = ProcessId -> a -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
Unsafe.usend
expect :: forall a. Serializable a => Process a
expect :: forall a. Serializable a => Process a
expect = [Match a] -> Process a
forall b. [Match b] -> Process b
receiveWait [(a -> Process a) -> Match a
forall a b. Serializable a => (a -> Process b) -> Match b
match a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return]
newChan :: Serializable a => Process (SendPort a, ReceivePort a)
newChan :: forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
IO (SendPort a, ReceivePort a)
-> Process (SendPort a, ReceivePort a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (SendPort a, ReceivePort a)
-> Process (SendPort a, ReceivePort a))
-> ((LocalProcessState
-> IO (LocalProcessState, (SendPort a, ReceivePort a)))
-> IO (SendPort a, ReceivePort a))
-> (LocalProcessState
-> IO (LocalProcessState, (SendPort a, ReceivePort a)))
-> Process (SendPort a, ReceivePort a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictMVar LocalProcessState
-> (LocalProcessState
-> IO (LocalProcessState, (SendPort a, ReceivePort a)))
-> IO (SendPort a, ReceivePort a)
forall a b. StrictMVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) ((LocalProcessState
-> IO (LocalProcessState, (SendPort a, ReceivePort a)))
-> Process (SendPort a, ReceivePort a))
-> (LocalProcessState
-> IO (LocalProcessState, (SendPort a, ReceivePort a)))
-> Process (SendPort a, ReceivePort a)
forall a b. (a -> b) -> a -> b
$ \LocalProcessState
st -> do
let lcid :: Int32
lcid = LocalProcessState
st LocalProcessState -> T LocalProcessState Int32 -> Int32
forall r a. r -> T r a -> a
^. T LocalProcessState Int32
channelCounter
let cid :: SendPortId
cid = SendPortId { sendPortProcessId :: ProcessId
sendPortProcessId = LocalProcess -> ProcessId
processId LocalProcess
proc
, sendPortLocalId :: Int32
sendPortLocalId = Int32
lcid
}
let sport :: SendPort a
sport = SendPortId -> SendPort a
forall a. SendPortId -> SendPort a
SendPort SendPortId
cid
TQueue a
chan <- IO (TQueue a) -> IO (TQueue a)
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TQueue a)
forall a. IO (TQueue a)
newTQueueIO
Weak (TQueue a)
chan' <- TQueue a -> IO () -> IO (Weak (TQueue a))
forall a. TQueue a -> IO () -> IO (Weak (TQueue a))
mkWeakTQueue TQueue a
chan (IO () -> IO (Weak (TQueue a))) -> IO () -> IO (Weak (TQueue a))
forall a b. (a -> b) -> a -> b
$ StrictMVar LocalProcessState -> Int32 -> IO ()
finalizer (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) Int32
lcid
let rport :: ReceivePort a
rport = STM a -> ReceivePort a
forall a. STM a -> ReceivePort a
ReceivePort (STM a -> ReceivePort a) -> STM a -> ReceivePort a
forall a b. (a -> b) -> a -> b
$ TQueue a -> STM a
forall a. TQueue a -> STM a
readTQueue TQueue a
chan
let tch :: TypedChannel
tch = Weak (TQueue a) -> TypedChannel
forall a. Serializable a => Weak (TQueue a) -> TypedChannel
TypedChannel Weak (TQueue a)
chan'
(LocalProcessState, (SendPort a, ReceivePort a))
-> IO (LocalProcessState, (SendPort a, ReceivePort a))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( (T LocalProcessState Int32
channelCounter T LocalProcessState Int32
-> (Int32 -> Int32) -> LocalProcessState -> LocalProcessState
forall r a. T r a -> (a -> a) -> r -> r
^: (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
1))
(LocalProcessState -> LocalProcessState)
-> (LocalProcessState -> LocalProcessState)
-> LocalProcessState
-> LocalProcessState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int32 -> Accessor LocalProcessState (Maybe TypedChannel)
typedChannelWithId Int32
lcid Accessor LocalProcessState (Maybe TypedChannel)
-> Maybe TypedChannel -> LocalProcessState -> LocalProcessState
forall r a. T r a -> a -> r -> r
^= TypedChannel -> Maybe TypedChannel
forall a. a -> Maybe a
Just TypedChannel
tch)
(LocalProcessState -> LocalProcessState)
-> LocalProcessState -> LocalProcessState
forall a b. (a -> b) -> a -> b
$ LocalProcessState
st
, (SendPort a
forall {a}. SendPort a
sport, ReceivePort a
rport)
)
where
finalizer :: StrictMVar LocalProcessState -> LocalSendPortId -> IO ()
finalizer :: StrictMVar LocalProcessState -> Int32 -> IO ()
finalizer StrictMVar LocalProcessState
st Int32
lcid = StrictMVar LocalProcessState
-> (LocalProcessState -> IO LocalProcessState) -> IO ()
forall a. StrictMVar a -> (a -> IO a) -> IO ()
modifyMVar_ StrictMVar LocalProcessState
st ((LocalProcessState -> IO LocalProcessState) -> IO ())
-> (LocalProcessState -> IO LocalProcessState) -> IO ()
forall a b. (a -> b) -> a -> b
$
LocalProcessState -> IO LocalProcessState
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (LocalProcessState -> IO LocalProcessState)
-> (LocalProcessState -> LocalProcessState)
-> LocalProcessState
-> IO LocalProcessState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int32 -> Accessor LocalProcessState (Maybe TypedChannel)
typedChannelWithId Int32
lcid Accessor LocalProcessState (Maybe TypedChannel)
-> Maybe TypedChannel -> LocalProcessState -> LocalProcessState
forall r a. T r a -> a -> r -> r
^= Maybe TypedChannel
forall a. Maybe a
Nothing)
sendChan :: Serializable a => SendPort a -> a -> Process ()
sendChan :: forall a. Serializable a => SendPort a -> a -> Process ()
sendChan (SendPort SendPortId
cid) a
msg = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let node :: LocalNode
node = LocalProcess -> LocalNode
processNode LocalProcess
proc
pid :: ProcessId
pid = LocalProcess -> ProcessId
processId LocalProcess
proc
us :: NodeId
us = LocalNode -> NodeId
localNodeId LocalNode
node
them :: NodeId
them = ProcessId -> NodeId
processNodeId (SendPortId -> ProcessId
sendPortProcessId SendPortId
cid)
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node) (ProcessId -> SendPortId -> Message -> MxEvent
MxSentToPort ProcessId
pid SendPortId
cid (Message -> MxEvent) -> Message -> MxEvent
forall a b. (a -> b) -> a -> b
$ a -> Message
forall a. Serializable a => a -> Message
wrapMessage a
msg)
case NodeId
them NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
us of
Bool
True -> SendPortId -> a -> Process ()
forall a. Serializable a => SendPortId -> a -> Process ()
sendChanLocal SendPortId
cid a
msg
Bool
False -> do
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> a -> IO ()
forall a.
Binary a =>
LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> a -> IO ()
sendBinary LocalNode
node
(ProcessId -> Identifier
ProcessIdentifier ProcessId
pid)
(SendPortId -> Identifier
SendPortIdentifier SendPortId
cid)
ImplicitReconnect
NoImplicitReconnect
a
msg
unsafeSendChan :: Serializable a => SendPort a -> a -> Process ()
unsafeSendChan :: forall a. Serializable a => SendPort a -> a -> Process ()
unsafeSendChan = SendPort a -> a -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
Unsafe.sendChan
receiveChan :: Serializable a => ReceivePort a -> Process a
receiveChan :: forall a. Serializable a => ReceivePort a -> Process a
receiveChan = IO a -> Process a
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO a -> Process a)
-> (ReceivePort a -> IO a) -> ReceivePort a -> Process a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM a -> IO a
forall a. STM a -> IO a
atomically (STM a -> IO a)
-> (ReceivePort a -> STM a) -> ReceivePort a -> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM
receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout :: forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
0 ReceivePort a
ch = IO (Maybe a) -> Process (Maybe a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe a) -> Process (Maybe a))
-> (STM (Maybe a) -> IO (Maybe a))
-> STM (Maybe a)
-> Process (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (Maybe a) -> IO (Maybe a)
forall a. STM a -> IO a
atomically (STM (Maybe a) -> Process (Maybe a))
-> STM (Maybe a) -> Process (Maybe a)
forall a b. (a -> b) -> a -> b
$
(a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> STM a -> STM (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM ReceivePort a
ch) STM (Maybe a) -> STM (Maybe a) -> STM (Maybe a)
forall a. STM a -> STM a -> STM a
`orElse` Maybe a -> STM (Maybe a)
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
receiveChanTimeout Int
n ReceivePort a
ch = IO (Maybe a) -> Process (Maybe a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe a) -> Process (Maybe a))
-> (STM a -> IO (Maybe a)) -> STM a -> Process (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO a -> IO (Maybe a)
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
n (IO a -> IO (Maybe a)) -> (STM a -> IO a) -> STM a -> IO (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM a -> IO a
forall a. STM a -> IO a
atomically (STM a -> Process (Maybe a)) -> STM a -> Process (Maybe a)
forall a b. (a -> b) -> a -> b
$
ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM ReceivePort a
ch
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsBiased :: forall a.
Serializable a =>
[ReceivePort a] -> Process (ReceivePort a)
mergePortsBiased = ReceivePort a -> Process (ReceivePort a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ReceivePort a -> Process (ReceivePort a))
-> ([ReceivePort a] -> ReceivePort a)
-> [ReceivePort a]
-> Process (ReceivePort a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM a -> ReceivePort a
forall a. STM a -> ReceivePort a
ReceivePort(STM a -> ReceivePort a)
-> ([ReceivePort a] -> STM a) -> [ReceivePort a] -> ReceivePort a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (STM a -> STM a -> STM a) -> [STM a] -> STM a
forall a. (a -> a -> a) -> [a] -> a
forall (t :: * -> *) a. Foldable t => (a -> a -> a) -> t a -> a
foldr1 STM a -> STM a -> STM a
forall a. STM a -> STM a -> STM a
orElse ([STM a] -> STM a)
-> ([ReceivePort a] -> [STM a]) -> [ReceivePort a] -> STM a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ReceivePort a -> STM a) -> [ReceivePort a] -> [STM a]
forall a b. (a -> b) -> [a] -> [b]
map ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR :: forall a.
Serializable a =>
[ReceivePort a] -> Process (ReceivePort a)
mergePortsRR = \[ReceivePort a]
ps -> do
TVar [STM a]
psVar <- IO (TVar [STM a]) -> Process (TVar [STM a])
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TVar [STM a]) -> Process (TVar [STM a]))
-> (STM (TVar [STM a]) -> IO (TVar [STM a]))
-> STM (TVar [STM a])
-> Process (TVar [STM a])
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (TVar [STM a]) -> IO (TVar [STM a])
forall a. STM a -> IO a
atomically (STM (TVar [STM a]) -> Process (TVar [STM a]))
-> STM (TVar [STM a]) -> Process (TVar [STM a])
forall a b. (a -> b) -> a -> b
$ [STM a] -> STM (TVar [STM a])
forall a. a -> STM (TVar a)
newTVar ((ReceivePort a -> STM a) -> [ReceivePort a] -> [STM a]
forall a b. (a -> b) -> [a] -> [b]
map ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM [ReceivePort a]
ps)
ReceivePort a -> Process (ReceivePort a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ReceivePort a -> Process (ReceivePort a))
-> ReceivePort a -> Process (ReceivePort a)
forall a b. (a -> b) -> a -> b
$ STM a -> ReceivePort a
forall a. STM a -> ReceivePort a
ReceivePort (TVar [STM a] -> STM a
forall a. TVar [STM a] -> STM a
rr TVar [STM a]
psVar)
where
rotate :: [a] -> [a]
rotate :: forall a. [a] -> [a]
rotate [] = []
rotate (a
x:[a]
xs) = [a]
xs [a] -> [a] -> [a]
forall a. [a] -> [a] -> [a]
++ [a
x]
rr :: TVar [STM a] -> STM a
rr :: forall a. TVar [STM a] -> STM a
rr TVar [STM a]
psVar = do
[STM a]
ps <- TVar [STM a] -> STM [STM a]
forall a. TVar a -> STM a
readTVar TVar [STM a]
psVar
a
a <- (STM a -> STM a -> STM a) -> [STM a] -> STM a
forall a. (a -> a -> a) -> [a] -> a
forall (t :: * -> *) a. Foldable t => (a -> a -> a) -> t a -> a
foldr1 STM a -> STM a -> STM a
forall a. STM a -> STM a -> STM a
orElse [STM a]
ps
TVar [STM a] -> [STM a] -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar [STM a]
psVar ([STM a] -> [STM a]
forall a. [a] -> [a]
rotate [STM a]
ps)
a -> STM a
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
newtype Match b = Match { forall b. Match b -> MatchOn Message (Process b)
unMatch :: MatchOn Message (Process b) }
deriving ((forall a b. (a -> b) -> Match a -> Match b)
-> (forall a b. a -> Match b -> Match a) -> Functor Match
forall a b. a -> Match b -> Match a
forall a b. (a -> b) -> Match a -> Match b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall a b. (a -> b) -> Match a -> Match b
fmap :: forall a b. (a -> b) -> Match a -> Match b
$c<$ :: forall a b. a -> Match b -> Match a
<$ :: forall a b. a -> Match b -> Match a
Functor)
receiveWait :: [Match b] -> Process b
receiveWait :: forall b. [Match b] -> Process b
receiveWait [Match b]
ms = do
CQueue Message
queue <- LocalProcess -> CQueue Message
processQueue (LocalProcess -> CQueue Message)
-> Process LocalProcess -> Process (CQueue Message)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
Maybe (Process b)
mProc <- IO (Maybe (Process b)) -> Process (Maybe (Process b))
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Process b)) -> Process (Maybe (Process b)))
-> IO (Maybe (Process b)) -> Process (Maybe (Process b))
forall a b. (a -> b) -> a -> b
$ CQueue Message
-> BlockSpec
-> [MatchOn Message (Process b)]
-> IO (Maybe (Process b))
forall m a. CQueue m -> BlockSpec -> [MatchOn m a] -> IO (Maybe a)
dequeue CQueue Message
queue BlockSpec
Blocking ((Match b -> MatchOn Message (Process b))
-> [Match b] -> [MatchOn Message (Process b)]
forall a b. (a -> b) -> [a] -> [b]
map Match b -> MatchOn Message (Process b)
forall b. Match b -> MatchOn Message (Process b)
unMatch [Match b]
ms)
case Maybe (Process b)
mProc of
Just Process b
proc' -> Process b
proc'
Maybe (Process b)
Nothing -> String -> Process b
forall a b. Serializable a => a -> Process b
die (String -> Process b) -> String -> Process b
forall a b. (a -> b) -> a -> b
$ String
"System Invariant Violation: CQueue.hs returned `Nothing` "
String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"in the absence of a timeout value."
receiveTimeout :: Int
-> [Match b]
-> Process (Maybe b)
receiveTimeout :: forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout Int
t [Match b]
ms = do
CQueue Message
queue <- LocalProcess -> CQueue Message
processQueue (LocalProcess -> CQueue Message)
-> Process LocalProcess -> Process (CQueue Message)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let blockSpec :: BlockSpec
blockSpec = if Int
t Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 then BlockSpec
NonBlocking else Int -> BlockSpec
Timeout Int
t
Maybe (Process b)
mProc <- IO (Maybe (Process b)) -> Process (Maybe (Process b))
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Process b)) -> Process (Maybe (Process b)))
-> IO (Maybe (Process b)) -> Process (Maybe (Process b))
forall a b. (a -> b) -> a -> b
$ CQueue Message
-> BlockSpec
-> [MatchOn Message (Process b)]
-> IO (Maybe (Process b))
forall m a. CQueue m -> BlockSpec -> [MatchOn m a] -> IO (Maybe a)
dequeue CQueue Message
queue BlockSpec
blockSpec ((Match b -> MatchOn Message (Process b))
-> [Match b] -> [MatchOn Message (Process b)]
forall a b. (a -> b) -> [a] -> [b]
map Match b -> MatchOn Message (Process b)
forall b. Match b -> MatchOn Message (Process b)
unMatch [Match b]
ms)
case Maybe (Process b)
mProc of
Maybe (Process b)
Nothing -> Maybe b -> Process (Maybe b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
Just Process b
proc -> b -> Maybe b
forall a. a -> Maybe a
Just (b -> Maybe b) -> Process b -> Process (Maybe b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process b
proc
matchChan :: ReceivePort a -> (a -> Process b) -> Match b
matchChan :: forall a b. ReceivePort a -> (a -> Process b) -> Match b
matchChan ReceivePort a
p a -> Process b
fn = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ STM (Process b) -> MatchOn Message (Process b)
forall m a. STM a -> MatchOn m a
MatchChan ((a -> Process b) -> STM a -> STM (Process b)
forall a b. (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Process b
fn (ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM ReceivePort a
p))
matchSTM :: STM a -> (a -> Process b) -> Match b
matchSTM :: forall a b. STM a -> (a -> Process b) -> Match b
matchSTM STM a
stm a -> Process b
fn = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ STM (Process b) -> MatchOn Message (Process b)
forall m a. STM a -> MatchOn m a
MatchChan ((a -> Process b) -> STM a -> STM (Process b)
forall a b. (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Process b
fn STM a
stm)
match :: forall a b. Serializable a => (a -> Process b) -> Match b
match :: forall a b. Serializable a => (a -> Process b) -> Match b
match = (a -> Bool) -> (a -> Process b) -> Match b
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (Bool -> a -> Bool
forall a b. a -> b -> a
const Bool
True)
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
matchIf :: forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf a -> Bool
c a -> Process b
p = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process b)) -> MatchOn Message (Process b))
-> (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall a b. (a -> b) -> a -> b
$ \Message
msg ->
case Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a) of
Bool
False -> Maybe (Process b)
forall a. Maybe a
Nothing
Bool
True -> case Message
msg of
(UnencodedMessage Fingerprint
_ a
m) ->
let m' :: a
m' = a -> a
forall a b. a -> b
unsafeCoerce a
m :: a in
case (a -> Bool
c a
m') of
Bool
True -> Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just (a -> Process b
p a
m')
Bool
False -> Maybe (Process b)
forall a. Maybe a
Nothing
(EncodedMessage Fingerprint
_ ByteString
_) ->
if (a -> Bool
c a
decoded) then Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just (a -> Process b
p a
decoded) else Maybe (Process b)
forall a. Maybe a
Nothing
where
decoded :: a
!decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)
matchMessage :: (Message -> Process Message) -> Match Message
matchMessage :: (Message -> Process Message) -> Match Message
matchMessage Message -> Process Message
p = MatchOn Message (Process Message) -> Match Message
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process Message) -> Match Message)
-> MatchOn Message (Process Message) -> Match Message
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process Message))
-> MatchOn Message (Process Message)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process Message))
-> MatchOn Message (Process Message))
-> (Message -> Maybe (Process Message))
-> MatchOn Message (Process Message)
forall a b. (a -> b) -> a -> b
$ \Message
msg -> Process Message -> Maybe (Process Message)
forall a. a -> Maybe a
Just (Message -> Process Message
p Message
msg)
matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message
matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message
matchMessageIf Message -> Bool
c Message -> Process Message
p = MatchOn Message (Process Message) -> Match Message
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process Message) -> Match Message)
-> MatchOn Message (Process Message) -> Match Message
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process Message))
-> MatchOn Message (Process Message)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process Message))
-> MatchOn Message (Process Message))
-> (Message -> Maybe (Process Message))
-> MatchOn Message (Process Message)
forall a b. (a -> b) -> a -> b
$ \Message
msg ->
case (Message -> Bool
c Message
msg) of
Bool
True -> Process Message -> Maybe (Process Message)
forall a. a -> Maybe a
Just (Message -> Process Message
p Message
msg)
Bool
False -> Maybe (Process Message)
forall a. Maybe a
Nothing
forward :: Message -> ProcessId -> Process ()
forward :: Message -> ProcessId -> Process ()
forward Message
msg ProcessId
them = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let node :: LocalNode
node = LocalProcess -> LocalNode
processNode LocalProcess
proc
us :: ProcessId
us = LocalProcess -> ProcessId
processId LocalProcess
proc
nid :: NodeId
nid = LocalNode -> NodeId
localNodeId LocalNode
node
destNode :: NodeId
destNode = (ProcessId -> NodeId
processNodeId ProcessId
them)
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node) (ProcessId -> ProcessId -> Message -> MxEvent
MxSent ProcessId
them ProcessId
us Message
msg)
if NodeId
destNode NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nid
then Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessId -> Message -> ProcessSignal
LocalSend ProcessId
them Message
msg)
else IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode
-> Identifier
-> Identifier
-> ImplicitReconnect
-> [ByteString]
-> IO ()
sendPayload (LocalProcess -> LocalNode
processNode LocalProcess
proc)
(ProcessId -> Identifier
ProcessIdentifier (LocalProcess -> ProcessId
processId LocalProcess
proc))
(ProcessId -> Identifier
ProcessIdentifier ProcessId
them)
ImplicitReconnect
NoImplicitReconnect
(Message -> [ByteString]
messageToPayload Message
msg)
uforward :: Message -> ProcessId -> Process ()
uforward :: Message -> ProcessId -> Process ()
uforward Message
msg ProcessId
them = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let node :: LocalNode
node = LocalProcess -> LocalNode
processNode LocalProcess
proc
us :: ProcessId
us = LocalProcess -> ProcessId
processId LocalProcess
proc
nid :: NodeId
nid = LocalNode -> NodeId
localNodeId LocalNode
node
destNode :: NodeId
destNode = (ProcessId -> NodeId
processNodeId ProcessId
them)
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node) (ProcessId -> ProcessId -> Message -> MxEvent
MxSent ProcessId
them ProcessId
us Message
msg)
if NodeId
destNode NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nid
then Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessId -> Message -> ProcessSignal
LocalSend ProcessId
them Message
msg)
else Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
destNode) (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalProcessId -> Message -> ProcessSignal
UnreliableSend (ProcessId -> LocalProcessId
processLocalId ProcessId
them) Message
msg
wrapMessage :: Serializable a => a -> Message
wrapMessage :: forall a. Serializable a => a -> Message
wrapMessage = a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage
unsafeWrapMessage :: Serializable a => a -> Message
unsafeWrapMessage :: forall a. Serializable a => a -> Message
unsafeWrapMessage = a -> Message
forall a. Serializable a => a -> Message
Unsafe.wrapMessage
unwrapMessage :: forall m a. (Monad m, Serializable a) => Message -> m (Maybe a)
unwrapMessage :: forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> m (Maybe a)
unwrapMessage Message
msg =
case Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a) of
Bool
False -> Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing :: m (Maybe a)
Bool
True -> case Message
msg of
(UnencodedMessage Fingerprint
_ a
ms) ->
let ms' :: a
ms' = a -> a
forall a b. a -> b
unsafeCoerce a
ms :: a
in Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
ms')
(EncodedMessage Fingerprint
_ ByteString
_) ->
Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just (a
decoded))
where
decoded :: a
!decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)
handleMessage :: forall m a b. (Monad m, Serializable a)
=> Message -> (a -> m b) -> m (Maybe b)
handleMessage :: forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> m b) -> m (Maybe b)
handleMessage Message
msg a -> m b
proc = Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b)
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b)
handleMessageIf Message
msg (Bool -> a -> Bool
forall a b. a -> b -> a
const Bool
True) a -> m b
proc
handleMessageIf :: forall m a b . (Monad m, Serializable a)
=> Message
-> (a -> Bool)
-> (a -> m b)
-> m (Maybe b)
handleMessageIf :: forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b)
handleMessageIf Message
msg a -> Bool
c a -> m b
proc = do
case Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a) of
Bool
False -> Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing :: m (Maybe b)
Bool
True -> case Message
msg of
(UnencodedMessage Fingerprint
_ a
ms) ->
let ms' :: a
ms' = a -> a
forall a b. a -> b
unsafeCoerce a
ms :: a in
case (a -> Bool
c a
ms') of
Bool
True -> do { b
r <- a -> m b
proc a
ms'; Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Maybe b
forall a. a -> Maybe a
Just b
r) }
Bool
False -> Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing :: m (Maybe b)
(EncodedMessage Fingerprint
_ ByteString
_) ->
case (a -> Bool
c a
decoded) of
Bool
True -> do { b
r <- a -> m b
proc (a
decoded :: a); Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Maybe b
forall a. a -> Maybe a
Just b
r) }
Bool
False -> Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing :: m (Maybe b)
where
decoded :: a
!decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)
handleMessage_ :: forall m a . (Monad m, Serializable a)
=> Message -> (a -> m ()) -> m ()
handleMessage_ :: forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> (a -> m ()) -> m ()
handleMessage_ Message
msg a -> m ()
proc = Message -> (a -> Bool) -> (a -> m ()) -> m ()
forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m ()) -> m ()
handleMessageIf_ Message
msg (Bool -> a -> Bool
forall a b. a -> b -> a
const Bool
True) a -> m ()
proc
handleMessageIf_ :: forall m a . (Monad m, Serializable a)
=> Message
-> (a -> Bool)
-> (a -> m ())
-> m ()
handleMessageIf_ :: forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m ()) -> m ()
handleMessageIf_ Message
msg a -> Bool
c a -> m ()
proc = Message -> (a -> Bool) -> (a -> m ()) -> m (Maybe ())
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b)
handleMessageIf Message
msg a -> Bool
c a -> m ()
proc m (Maybe ()) -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
matchAny :: forall b. (Message -> Process b) -> Match b
matchAny :: forall b. (Message -> Process b) -> Match b
matchAny Message -> Process b
p = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process b)) -> MatchOn Message (Process b))
-> (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall a b. (a -> b) -> a -> b
$ \Message
msg -> Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just (Message -> Process b
p Message
msg)
matchAnyIf :: forall a b. (Serializable a)
=> (a -> Bool)
-> (Message -> Process b)
-> Match b
matchAnyIf :: forall a b.
Serializable a =>
(a -> Bool) -> (Message -> Process b) -> Match b
matchAnyIf a -> Bool
c Message -> Process b
p = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process b)) -> MatchOn Message (Process b))
-> (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall a b. (a -> b) -> a -> b
$ \Message
msg ->
case Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a) of
Bool
True | Bool
check -> Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just (Message -> Process b
p Message
msg)
where
check :: Bool
!check :: Bool
check =
case Message
msg of
(EncodedMessage Fingerprint
_ ByteString
_) -> a -> Bool
c a
decoded
(UnencodedMessage Fingerprint
_ a
m') -> a -> Bool
c (a -> a
forall a b. a -> b
unsafeCoerce a
m')
decoded :: a
!decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)
Bool
_ -> Maybe (Process b)
forall a. Maybe a
Nothing
matchUnknown :: Process b -> Match b
matchUnknown :: forall b. Process b -> Match b
matchUnknown Process b
p = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg (Maybe (Process b) -> Message -> Maybe (Process b)
forall a b. a -> b -> a
const (Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just Process b
p))
delegate :: ProcessId -> (Message -> Bool) -> Process ()
delegate :: ProcessId -> (Message -> Bool) -> Process ()
delegate ProcessId
pid Message -> Bool
p = do
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [
(Message -> Process ()) -> Match ()
forall b. (Message -> Process b) -> Match b
matchAny (\Message
m -> case (Message -> Bool
p Message
m) of
Bool
True -> Message -> ProcessId -> Process ()
forward Message
m ProcessId
pid
Bool
False -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
ProcessId -> (Message -> Bool) -> Process ()
delegate ProcessId
pid Message -> Bool
p
relay :: ProcessId -> Process ()
relay :: ProcessId -> Process ()
relay !ProcessId
pid = [Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (Message -> Process ()) -> Match ()
forall b. (Message -> Process b) -> Match b
matchAny (\Message
m -> Message -> ProcessId -> Process ()
forward Message
m ProcessId
pid) ] Process () -> Process () -> Process ()
forall a b. Process a -> Process b -> Process b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ProcessId -> Process ()
relay ProcessId
pid
proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process ()
proxy :: forall a.
Serializable a =>
ProcessId -> (a -> Process Bool) -> Process ()
proxy ProcessId
pid a -> Process Bool
proc = do
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [
(Message -> Process ()) -> Match ()
forall b. (Message -> Process b) -> Match b
matchAny (\Message
m -> do
Maybe Bool
next <- Message -> (a -> Process Bool) -> Process (Maybe Bool)
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> m b) -> m (Maybe b)
handleMessage Message
m a -> Process Bool
proc
case Maybe Bool
next of
Just Bool
True -> Message -> ProcessId -> Process ()
forward Message
m ProcessId
pid
Just Bool
False -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Maybe Bool
Nothing -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
ProcessId -> (a -> Process Bool) -> Process ()
forall a.
Serializable a =>
ProcessId -> (a -> Process Bool) -> Process ()
proxy ProcessId
pid a -> Process Bool
proc
data ProcessTerminationException = ProcessTerminationException
deriving (Int -> ProcessTerminationException -> String -> String
[ProcessTerminationException] -> String -> String
ProcessTerminationException -> String
(Int -> ProcessTerminationException -> String -> String)
-> (ProcessTerminationException -> String)
-> ([ProcessTerminationException] -> String -> String)
-> Show ProcessTerminationException
forall a.
(Int -> a -> String -> String)
-> (a -> String) -> ([a] -> String -> String) -> Show a
$cshowsPrec :: Int -> ProcessTerminationException -> String -> String
showsPrec :: Int -> ProcessTerminationException -> String -> String
$cshow :: ProcessTerminationException -> String
show :: ProcessTerminationException -> String
$cshowList :: [ProcessTerminationException] -> String -> String
showList :: [ProcessTerminationException] -> String -> String
Show, Typeable)
instance Exception ProcessTerminationException
terminate :: Process a
terminate :: forall a. Process a
terminate = ProcessTerminationException -> Process a
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM ProcessTerminationException
ProcessTerminationException
die :: Serializable a => a -> Process b
die :: forall a b. Serializable a => a -> Process b
die a
reason = do
ProcessId
pid <- Process ProcessId
getSelfPid
ProcessExitException -> Process b
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (ProcessId -> Message -> ProcessExitException
ProcessExitException ProcessId
pid (a -> Message
forall a. Serializable a => a -> Message
createMessage a
reason))
kill :: ProcessId -> String -> Process ()
kill :: ProcessId -> String -> Process ()
kill ProcessId
them String
reason = Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessId -> String -> ProcessSignal
Kill ProcessId
them String
reason)
exit :: Serializable a => ProcessId -> a -> Process ()
exit :: forall a. Serializable a => ProcessId -> a -> Process ()
exit ProcessId
them a
reason = Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessId -> Message -> ProcessSignal
Exit ProcessId
them (a -> Message
forall a. Serializable a => a -> Message
createMessage a
reason))
catchExit :: forall a b . (Show a, Serializable a)
=> Process b
-> (ProcessId -> a -> Process b)
-> Process b
catchExit :: forall a b.
(Show a, Serializable a) =>
Process b -> (ProcessId -> a -> Process b) -> Process b
catchExit Process b
act ProcessId -> a -> Process b
exitHandler = Process b -> (ProcessExitException -> Process b) -> Process b
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
Catch.catch Process b
act ProcessExitException -> Process b
handleExit
where
handleExit :: ProcessExitException -> Process b
handleExit ex :: ProcessExitException
ex@(ProcessExitException ProcessId
from Message
msg) =
if Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a)
then ProcessId -> a -> Process b
exitHandler ProcessId
from a
decoded
else ProcessExitException -> Process b
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM ProcessExitException
ex
where
decoded :: a
!decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)
catchesExit :: Process b
-> [(ProcessId -> Message -> (Process (Maybe b)))]
-> Process b
catchesExit :: forall b.
Process b
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
catchesExit Process b
act [ProcessId -> Message -> Process (Maybe b)]
handlers = Process b -> (ProcessExitException -> Process b) -> Process b
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
Catch.catch Process b
act (((ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b)
-> [ProcessId -> Message -> Process (Maybe b)]
-> ProcessExitException
-> Process b
forall a b c. (a -> b -> c) -> b -> a -> c
flip ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
forall b.
ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
handleExit) [ProcessId -> Message -> Process (Maybe b)]
handlers)
where
handleExit :: ProcessExitException
-> [(ProcessId -> Message -> Process (Maybe b))]
-> Process b
handleExit :: forall b.
ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
handleExit ProcessExitException
ex [] = ProcessExitException -> Process b
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM ProcessExitException
ex
handleExit ex :: ProcessExitException
ex@(ProcessExitException ProcessId
from Message
msg) (ProcessId -> Message -> Process (Maybe b)
h:[ProcessId -> Message -> Process (Maybe b)]
hs) = do
Maybe b
r <- ProcessId -> Message -> Process (Maybe b)
h ProcessId
from Message
msg
case Maybe b
r of
Maybe b
Nothing -> ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
forall b.
ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
handleExit ProcessExitException
ex [ProcessId -> Message -> Process (Maybe b)]
hs
Just b
p -> b -> Process b
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return b
p
getSelfPid :: Process ProcessId
getSelfPid :: Process ProcessId
getSelfPid = LocalProcess -> ProcessId
processId (LocalProcess -> ProcessId)
-> Process LocalProcess -> Process ProcessId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
getSelfNode :: Process NodeId
getSelfNode :: Process NodeId
getSelfNode = LocalNode -> NodeId
localNodeId (LocalNode -> NodeId)
-> (LocalProcess -> LocalNode) -> LocalProcess -> NodeId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalProcess -> LocalNode
processNode (LocalProcess -> NodeId) -> Process LocalProcess -> Process NodeId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
getNodeStats :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStats :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStats NodeId
nid = do
NodeId
selfNode <- Process NodeId
getSelfNode
if NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
selfNode
then NodeStats -> Either DiedReason NodeStats
forall a b. b -> Either a b
Right (NodeStats -> Either DiedReason NodeStats)
-> Process NodeStats -> Process (Either DiedReason NodeStats)
forall a b. (a -> b) -> Process a -> Process b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` Process NodeStats
getLocalNodeStats
else NodeId -> Process (Either DiedReason NodeStats)
getNodeStatsRemote NodeId
selfNode
where
getNodeStatsRemote :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStatsRemote :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStatsRemote NodeId
selfNode = do
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ NodeId -> ProcessSignal
GetNodeStats NodeId
selfNode
Process MonitorRef
-> (MonitorRef -> Process ())
-> (MonitorRef -> Process (Either DiedReason NodeStats))
-> Process (Either DiedReason NodeStats)
forall a b c.
Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket (NodeId -> Process MonitorRef
monitorNode NodeId
nid) MonitorRef -> Process ()
unmonitor ((MonitorRef -> Process (Either DiedReason NodeStats))
-> Process (Either DiedReason NodeStats))
-> (MonitorRef -> Process (Either DiedReason NodeStats))
-> Process (Either DiedReason NodeStats)
forall a b. (a -> b) -> a -> b
$ \MonitorRef
mRef ->
[Match (Either DiedReason NodeStats)]
-> Process (Either DiedReason NodeStats)
forall b. [Match b] -> Process b
receiveWait [ (NodeStats -> Process (Either DiedReason NodeStats))
-> Match (Either DiedReason NodeStats)
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(NodeStats
stats :: NodeStats) -> Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats))
-> Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats)
forall a b. (a -> b) -> a -> b
$ NodeStats -> Either DiedReason NodeStats
forall a b. b -> Either a b
Right NodeStats
stats)
, (NodeMonitorNotification -> Bool)
-> (NodeMonitorNotification
-> Process (Either DiedReason NodeStats))
-> Match (Either DiedReason NodeStats)
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(NodeMonitorNotification MonitorRef
ref NodeId
_ DiedReason
_) -> MonitorRef
ref MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
mRef)
(\(NodeMonitorNotification MonitorRef
_ NodeId
_ DiedReason
dr) -> Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats))
-> Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats)
forall a b. (a -> b) -> a -> b
$ DiedReason -> Either DiedReason NodeStats
forall a b. a -> Either a b
Left DiedReason
dr)
]
getLocalNodeStats :: Process NodeStats
getLocalNodeStats :: Process NodeStats
getLocalNodeStats = do
NodeId
self <- Process NodeId
getSelfNode
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ NodeId -> ProcessSignal
GetNodeStats NodeId
self
[Match NodeStats] -> Process NodeStats
forall b. [Match b] -> Process b
receiveWait [ (NodeStats -> Process NodeStats) -> Match NodeStats
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(NodeStats
stats :: NodeStats) -> NodeStats -> Process NodeStats
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return NodeStats
stats) ]
getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)
getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)
getProcessInfo ProcessId
pid =
let them :: NodeId
them = ProcessId -> NodeId
processNodeId ProcessId
pid in do
NodeId
us <- Process NodeId
getSelfNode
Maybe NodeId
dest <- NodeId -> NodeId -> Process (Maybe NodeId)
mkNode NodeId
them NodeId
us
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
dest (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> ProcessSignal
GetInfo ProcessId
pid
[Match (Maybe ProcessInfo)] -> Process (Maybe ProcessInfo)
forall b. [Match b] -> Process b
receiveWait [
(ProcessInfo -> Process (Maybe ProcessInfo))
-> Match (Maybe ProcessInfo)
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(ProcessInfo
p :: ProcessInfo) -> Maybe ProcessInfo -> Process (Maybe ProcessInfo)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ProcessInfo -> Process (Maybe ProcessInfo))
-> Maybe ProcessInfo -> Process (Maybe ProcessInfo)
forall a b. (a -> b) -> a -> b
$ ProcessInfo -> Maybe ProcessInfo
forall a. a -> Maybe a
Just ProcessInfo
p)
, (ProcessInfoNone -> Process (Maybe ProcessInfo))
-> Match (Maybe ProcessInfo)
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(ProcessInfoNone
_ :: ProcessInfoNone) -> Maybe ProcessInfo -> Process (Maybe ProcessInfo)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ProcessInfo
forall a. Maybe a
Nothing)
]
where mkNode :: NodeId -> NodeId -> Process (Maybe NodeId)
mkNode :: NodeId -> NodeId -> Process (Maybe NodeId)
mkNode NodeId
them NodeId
us = case NodeId
them NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
us of
Bool
True -> Maybe NodeId -> Process (Maybe NodeId)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe NodeId
forall a. Maybe a
Nothing
Bool
_ -> Maybe NodeId -> Process (Maybe NodeId)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe NodeId -> Process (Maybe NodeId))
-> Maybe NodeId -> Process (Maybe NodeId)
forall a b. (a -> b) -> a -> b
$ NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
them
link :: ProcessId -> Process ()
link :: ProcessId -> Process ()
link = Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (ProcessId -> ProcessSignal) -> ProcessId -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Link (Identifier -> ProcessSignal)
-> (ProcessId -> Identifier) -> ProcessId -> ProcessSignal
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId -> Identifier
ProcessIdentifier
monitor :: ProcessId -> Process MonitorRef
monitor :: ProcessId -> Process MonitorRef
monitor = Identifier -> Process MonitorRef
monitor' (Identifier -> Process MonitorRef)
-> (ProcessId -> Identifier) -> ProcessId -> Process MonitorRef
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId -> Identifier
ProcessIdentifier
withMonitor :: ProcessId -> (MonitorRef -> Process a) -> Process a
withMonitor :: forall a. ProcessId -> (MonitorRef -> Process a) -> Process a
withMonitor ProcessId
pid = Process MonitorRef
-> (MonitorRef -> Process ())
-> (MonitorRef -> Process a)
-> Process a
forall a b c.
Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket (ProcessId -> Process MonitorRef
monitor ProcessId
pid) MonitorRef -> Process ()
unmonitor
withMonitor_ :: ProcessId -> Process a -> Process a
withMonitor_ :: forall a. ProcessId -> Process a -> Process a
withMonitor_ ProcessId
p = ProcessId -> (MonitorRef -> Process a) -> Process a
forall a. ProcessId -> (MonitorRef -> Process a) -> Process a
withMonitor ProcessId
p ((MonitorRef -> Process a) -> Process a)
-> (Process a -> MonitorRef -> Process a) -> Process a -> Process a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Process a -> MonitorRef -> Process a
forall a b. a -> b -> a
const
unlink :: ProcessId -> Process ()
unlink :: ProcessId -> Process ()
unlink ProcessId
pid = do
ProcessId -> Process ()
unlinkAsync ProcessId
pid
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (DidUnlinkProcess -> Bool)
-> (DidUnlinkProcess -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(DidUnlinkProcess ProcessId
pid') -> ProcessId
pid' ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
pid)
(\DidUnlinkProcess
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
unlinkNode :: NodeId -> Process ()
unlinkNode :: NodeId -> Process ()
unlinkNode NodeId
nid = do
NodeId -> Process ()
unlinkNodeAsync NodeId
nid
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (DidUnlinkNode -> Bool)
-> (DidUnlinkNode -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(DidUnlinkNode NodeId
nid') -> NodeId
nid' NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nid)
(\DidUnlinkNode
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
unlinkPort :: SendPort a -> Process ()
unlinkPort :: forall a. SendPort a -> Process ()
unlinkPort SendPort a
sport = do
SendPort a -> Process ()
forall a. SendPort a -> Process ()
unlinkPortAsync SendPort a
sport
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (DidUnlinkPort -> Bool)
-> (DidUnlinkPort -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(DidUnlinkPort SendPortId
cid) -> SendPortId
cid SendPortId -> SendPortId -> Bool
forall a. Eq a => a -> a -> Bool
== SendPort a -> SendPortId
forall a. SendPort a -> SendPortId
sendPortId SendPort a
sport)
(\DidUnlinkPort
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
unmonitor :: MonitorRef -> Process ()
unmonitor :: MonitorRef -> Process ()
unmonitor MonitorRef
ref = do
MonitorRef -> Process ()
unmonitorAsync MonitorRef
ref
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (DidUnmonitor -> Bool) -> (DidUnmonitor -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(DidUnmonitor MonitorRef
ref') -> MonitorRef
ref' MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
ref)
(\DidUnmonitor
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
Process (Maybe ()) -> Process ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Process (Maybe ()) -> Process ())
-> Process (Maybe ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ Int -> [Match ()] -> Process (Maybe ())
forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout Int
0
[ (ProcessMonitorNotification -> Bool)
-> (ProcessMonitorNotification -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(ProcessMonitorNotification MonitorRef
ref' ProcessId
_ DiedReason
_) -> MonitorRef
ref' MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
ref)
(Process () -> ProcessMonitorNotification -> Process ()
forall a b. a -> b -> a
const (Process () -> ProcessMonitorNotification -> Process ())
-> Process () -> ProcessMonitorNotification -> Process ()
forall a b. (a -> b) -> a -> b
$ () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
catch :: Exception e => Process a -> (e -> Process a) -> Process a
catch :: forall e a.
Exception e =>
Process a -> (e -> Process a) -> Process a
catch = Process a -> (e -> Process a) -> Process a
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
Catch.catch
{-# DEPRECATED catch "Use Control.Monad.Catch.catch instead" #-}
try :: Exception e => Process a -> Process (Either e a)
try :: forall e a. Exception e => Process a -> Process (Either e a)
try = Process a -> Process (Either e a)
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
Catch.try
{-# DEPRECATED try "Use Control.Monad.Catch.try instead" #-}
mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b
mask :: forall b.
((forall a. Process a -> Process a) -> Process b) -> Process b
mask = ((forall a. Process a -> Process a) -> Process b) -> Process b
forall b.
HasCallStack =>
((forall a. Process a -> Process a) -> Process b) -> Process b
forall (m :: * -> *) b.
(MonadMask m, HasCallStack) =>
((forall a. m a -> m a) -> m b) -> m b
Catch.mask
{-# DEPRECATED mask "Use Control.Monad.Catch.mask_ instead" #-}
mask_ :: Process a -> Process a
mask_ :: forall a. Process a -> Process a
mask_ = Process a -> Process a
forall (m :: * -> *) a. (HasCallStack, MonadMask m) => m a -> m a
Catch.mask_
{-# DEPRECATED mask_ "Use Control.Monad.Catch.mask_ instead" #-}
onException :: Process a -> Process b -> Process a
onException :: forall a b. Process a -> Process b -> Process a
onException = Process a -> Process b -> Process a
forall (m :: * -> *) a b.
(HasCallStack, MonadCatch m) =>
m a -> m b -> m a
Catch.onException
{-# DEPRECATED onException "Use Control.Monad.Catch.onException instead" #-}
bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket :: forall a b c.
Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket = Process a -> (a -> Process b) -> (a -> Process c) -> Process c
forall (m :: * -> *) a c b.
(HasCallStack, MonadMask m) =>
m a -> (a -> m c) -> (a -> m b) -> m b
Catch.bracket
{-# DEPRECATED bracket "Use Control.Monad.Catch.bracket instead" #-}
bracket_ :: Process a -> Process b -> Process c -> Process c
bracket_ :: forall a b c. Process a -> Process b -> Process c -> Process c
bracket_ = Process a -> Process b -> Process c -> Process c
forall (m :: * -> *) a c b.
(HasCallStack, MonadMask m) =>
m a -> m c -> m b -> m b
Catch.bracket_
{-# DEPRECATED bracket_ "Use Control.Monad.Catch.bracket_ instead" #-}
finally :: Process a -> Process b -> Process a
finally :: forall a b. Process a -> Process b -> Process a
finally = Process a -> Process b -> Process a
forall (m :: * -> *) a b.
(HasCallStack, MonadMask m) =>
m a -> m b -> m a
Catch.finally
{-# DEPRECATED finally "Use Control.Monad.Catch.finally instead" #-}
data Handler a = forall e . Exception e => Handler (e -> Process a)
instance Functor Handler where
fmap :: forall a b. (a -> b) -> Handler a -> Handler b
fmap a -> b
f (Handler e -> Process a
h) = (e -> Process b) -> Handler b
forall a e. Exception e => (e -> Process a) -> Handler a
Handler ((a -> b) -> Process a -> Process b
forall a b. (a -> b) -> Process a -> Process b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f (Process a -> Process b) -> (e -> Process a) -> e -> Process b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> Process a
h)
catches :: Process a -> [Handler a] -> Process a
catches :: forall a. Process a -> [Handler a] -> Process a
catches Process a
proc [Handler a]
handlers = Process a
proc Process a -> (SomeException -> Process a) -> Process a
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
`Catch.catch` [Handler a] -> SomeException -> Process a
forall a. [Handler a] -> SomeException -> Process a
catchesHandler [Handler a]
handlers
catchesHandler :: [Handler a] -> SomeException -> Process a
catchesHandler :: forall a. [Handler a] -> SomeException -> Process a
catchesHandler [Handler a]
handlers SomeException
e = (Handler a -> Process a -> Process a)
-> Process a -> [Handler a] -> Process a
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Handler a -> Process a -> Process a
forall {a}. Handler a -> Process a -> Process a
tryHandler (SomeException -> Process a
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM SomeException
e) [Handler a]
handlers
where tryHandler :: Handler a -> Process a -> Process a
tryHandler (Handler e -> Process a
handler) Process a
res
= case SomeException -> Maybe e
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e of
Just e
e' -> e -> Process a
handler e
e'
Maybe e
Nothing -> Process a
res
expectTimeout :: forall a. Serializable a
=> Int
-> Process (Maybe a)
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
expectTimeout Int
n = Int -> [Match a] -> Process (Maybe a)
forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout Int
n [(a -> Process a) -> Match a
forall a b. Serializable a => (a -> Process b) -> Match b
match a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return]
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
spawnAsync NodeId
nid Closure (Process ())
proc = do
SpawnRef
spawnRef <- Process SpawnRef
getSpawnRef
NodeId
node <- Process NodeId
getSelfNode
if NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
node
then Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ Closure (Process ()) -> SpawnRef -> ProcessSignal
Spawn Closure (Process ())
proc SpawnRef
spawnRef
else Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ Closure (Process ()) -> SpawnRef -> ProcessSignal
Spawn Closure (Process ())
proc SpawnRef
spawnRef
SpawnRef -> Process SpawnRef
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return SpawnRef
spawnRef
monitorNode :: NodeId -> Process MonitorRef
monitorNode :: NodeId -> Process MonitorRef
monitorNode =
Identifier -> Process MonitorRef
monitor' (Identifier -> Process MonitorRef)
-> (NodeId -> Identifier) -> NodeId -> Process MonitorRef
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NodeId -> Identifier
NodeIdentifier
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
monitorPort (SendPort SendPortId
cid) =
Identifier -> Process MonitorRef
monitor' (SendPortId -> Identifier
SendPortIdentifier SendPortId
cid)
unmonitorAsync :: MonitorRef -> Process ()
unmonitorAsync :: MonitorRef -> Process ()
unmonitorAsync =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (MonitorRef -> ProcessSignal) -> MonitorRef -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MonitorRef -> ProcessSignal
Unmonitor
linkNode :: NodeId -> Process ()
linkNode :: NodeId -> Process ()
linkNode = Identifier -> Process ()
link' (Identifier -> Process ())
-> (NodeId -> Identifier) -> NodeId -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NodeId -> Identifier
NodeIdentifier
linkPort :: SendPort a -> Process ()
linkPort :: forall a. SendPort a -> Process ()
linkPort (SendPort SendPortId
cid) =
Identifier -> Process ()
link' (SendPortId -> Identifier
SendPortIdentifier SendPortId
cid)
unlinkAsync :: ProcessId -> Process ()
unlinkAsync :: ProcessId -> Process ()
unlinkAsync =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (ProcessId -> ProcessSignal) -> ProcessId -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Unlink (Identifier -> ProcessSignal)
-> (ProcessId -> Identifier) -> ProcessId -> ProcessSignal
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId -> Identifier
ProcessIdentifier
unlinkNodeAsync :: NodeId -> Process ()
unlinkNodeAsync :: NodeId -> Process ()
unlinkNodeAsync =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (NodeId -> ProcessSignal) -> NodeId -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Unlink (Identifier -> ProcessSignal)
-> (NodeId -> Identifier) -> NodeId -> ProcessSignal
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NodeId -> Identifier
NodeIdentifier
unlinkPortAsync :: SendPort a -> Process ()
unlinkPortAsync :: forall a. SendPort a -> Process ()
unlinkPortAsync (SendPort SendPortId
cid) =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (Identifier -> ProcessSignal) -> Identifier -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Unlink (Identifier -> Process ()) -> Identifier -> Process ()
forall a b. (a -> b) -> a -> b
$ SendPortId -> Identifier
SendPortIdentifier SendPortId
cid
data SayMessage = SayMessage { SayMessage -> UTCTime
sayTime :: UTCTime
, SayMessage -> ProcessId
sayProcess :: ProcessId
, SayMessage -> String
sayMessage :: String }
deriving (Typeable)
instance Show SayMessage where
showsPrec :: Int -> SayMessage -> String -> String
showsPrec Int
p SayMessage
msg =
Bool -> (String -> String) -> String -> String
showParen (Int
p Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
11)
((String -> String) -> String -> String)
-> (String -> String) -> String -> String
forall a b. (a -> b) -> a -> b
$ String -> String -> String
showString String
"SayMessage "
(String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> String -> String
showString (TimeLocale -> String -> UTCTime -> String
forall t. FormatTime t => TimeLocale -> String -> t -> String
formatTime TimeLocale
defaultTimeLocale String
"%c" (SayMessage -> UTCTime
sayTime SayMessage
msg))
(String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Char -> String -> String
showChar Char
' '
(String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> ProcessId -> String -> String
forall a. Show a => Int -> a -> String -> String
showsPrec Int
11 (SayMessage -> ProcessId
sayProcess SayMessage
msg) (String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Char -> String -> String
showChar Char
' '
(String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> String -> String -> String
forall a. Show a => Int -> a -> String -> String
showsPrec Int
11 (SayMessage -> String
sayMessage SayMessage
msg) (String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Char -> String -> String
showChar Char
' '
instance Binary SayMessage where
put :: SayMessage -> Put
put SayMessage
s = do
UTCTime -> Put
putUTCTime (SayMessage -> UTCTime
sayTime SayMessage
s)
ProcessId -> Put
forall t. Binary t => t -> Put
put (SayMessage -> ProcessId
sayProcess SayMessage
s)
String -> Put
forall t. Binary t => t -> Put
put (SayMessage -> String
sayMessage SayMessage
s)
get :: Get SayMessage
get = UTCTime -> ProcessId -> String -> SayMessage
SayMessage (UTCTime -> ProcessId -> String -> SayMessage)
-> Get UTCTime -> Get (ProcessId -> String -> SayMessage)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get UTCTime
getUTCTime Get (ProcessId -> String -> SayMessage)
-> Get ProcessId -> Get (String -> SayMessage)
forall a b. Get (a -> b) -> Get a -> Get b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Get ProcessId
forall t. Binary t => Get t
get Get (String -> SayMessage) -> Get String -> Get SayMessage
forall a b. Get (a -> b) -> Get a -> Get b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Get String
forall t. Binary t => Get t
get
putUTCTime :: UTCTime -> Put
putUTCTime :: UTCTime -> Put
putUTCTime (UTCTime (ModifiedJulianDay Integer
day) DiffTime
tod) = do
Integer -> Put
forall t. Binary t => t -> Put
put Integer
day
Rational -> Put
forall t. Binary t => t -> Put
put (DiffTime -> Rational
forall a. Real a => a -> Rational
toRational DiffTime
tod)
getUTCTime :: Get UTCTime
getUTCTime :: Get UTCTime
getUTCTime = do
Integer
day <- Get Integer
forall t. Binary t => Get t
get
Rational
tod <- Get Rational
forall t. Binary t => Get t
get
UTCTime -> Get UTCTime
forall a. a -> Get a
forall (m :: * -> *) a. Monad m => a -> m a
return (UTCTime -> Get UTCTime) -> UTCTime -> Get UTCTime
forall a b. (a -> b) -> a -> b
$! Day -> DiffTime -> UTCTime
UTCTime (Integer -> Day
ModifiedJulianDay Integer
day)
(Rational -> DiffTime
forall a. Fractional a => Rational -> a
fromRational Rational
tod)
say :: String -> Process ()
say :: String -> Process ()
say String
string = do
UTCTime
now <- IO UTCTime -> Process UTCTime
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
ProcessId
us <- Process ProcessId
getSelfPid
String -> SayMessage -> Process ()
forall a. Serializable a => String -> a -> Process ()
nsend String
"logger" (UTCTime -> ProcessId -> String -> SayMessage
SayMessage UTCTime
now ProcessId
us String
string)
register :: String -> ProcessId -> Process ()
register :: String -> ProcessId -> Process ()
register = Bool -> String -> ProcessId -> Process ()
registerImpl Bool
False
reregister :: String -> ProcessId -> Process ()
reregister :: String -> ProcessId -> Process ()
reregister = Bool -> String -> ProcessId -> Process ()
registerImpl Bool
True
registerImpl :: Bool -> String -> ProcessId -> Process ()
registerImpl :: Bool -> String -> ProcessId -> Process ()
registerImpl Bool
force String
label ProcessId
pid = do
NodeId
mynid <- Process NodeId
getSelfNode
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
mynid (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
pid) Bool
force)
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait
[ (RegisterReply -> Bool)
-> (RegisterReply -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(RegisterReply String
label' Bool
_ Maybe ProcessId
_) -> String
label String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
label')
(\(RegisterReply String
_ Bool
ok Maybe ProcessId
owner) -> String -> Bool -> Maybe ProcessId -> Process ()
handleRegistrationReply String
label Bool
ok Maybe ProcessId
owner)
]
registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
registerRemoteAsync NodeId
nid String
label ProcessId
pid = do
NodeId
here <- Process NodeId
getSelfNode
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (if NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
here then Maybe NodeId
forall a. Maybe a
Nothing else NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid)
(String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
nid (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
pid) Bool
False)
reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
reregisterRemoteAsync NodeId
nid String
label ProcessId
pid =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
nid (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
pid) Bool
True)
unregister :: String -> Process ()
unregister :: String -> Process ()
unregister String
label = do
NodeId
mynid <- Process NodeId
getSelfNode
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
mynid Maybe ProcessId
forall a. Maybe a
Nothing Bool
False)
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait
[ (RegisterReply -> Bool)
-> (RegisterReply -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(RegisterReply String
label' Bool
_ Maybe ProcessId
_) -> String
label String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
label')
(\(RegisterReply String
_ Bool
ok Maybe ProcessId
owner) -> String -> Bool -> Maybe ProcessId -> Process ()
handleRegistrationReply String
label Bool
ok Maybe ProcessId
owner)
]
handleRegistrationReply :: String -> Bool -> Maybe ProcessId -> Process ()
handleRegistrationReply :: String -> Bool -> Maybe ProcessId -> Process ()
handleRegistrationReply String
label Bool
ok Maybe ProcessId
owner =
Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
ok) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
ProcessRegistrationException -> Process ()
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (ProcessRegistrationException -> Process ())
-> ProcessRegistrationException -> Process ()
forall a b. (a -> b) -> a -> b
$ String -> Maybe ProcessId -> ProcessRegistrationException
ProcessRegistrationException String
label Maybe ProcessId
owner
unregisterRemoteAsync :: NodeId -> String -> Process ()
unregisterRemoteAsync :: NodeId -> String -> Process ()
unregisterRemoteAsync NodeId
nid String
label =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
nid Maybe ProcessId
forall a. Maybe a
Nothing Bool
False)
whereis :: String -> Process (Maybe ProcessId)
whereis :: String -> Process (Maybe ProcessId)
whereis String
label = do
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (String -> ProcessSignal
WhereIs String
label)
[Match (Maybe ProcessId)] -> Process (Maybe ProcessId)
forall b. [Match b] -> Process b
receiveWait [ (WhereIsReply -> Bool)
-> (WhereIsReply -> Process (Maybe ProcessId))
-> Match (Maybe ProcessId)
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(WhereIsReply String
label' Maybe ProcessId
_) -> String
label String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
label')
(\(WhereIsReply String
_ Maybe ProcessId
mPid) -> Maybe ProcessId -> Process (Maybe ProcessId)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ProcessId
mPid)
]
whereisRemoteAsync :: NodeId -> String -> Process ()
whereisRemoteAsync :: NodeId -> String -> Process ()
whereisRemoteAsync NodeId
nid String
label = do
NodeId
here <- Process NodeId
getSelfNode
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (if NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
here then Maybe NodeId
forall a. Maybe a
Nothing else NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (String -> ProcessSignal
WhereIs String
label)
nsend :: Serializable a => String -> a -> Process ()
nsend :: forall a. Serializable a => String -> a -> Process ()
nsend String
label a
msg = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let msg' :: Message
msg' = a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage a
msg
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus (LocalProcess -> LocalNode
processNode LocalProcess
proc))
(String -> ProcessId -> Message -> MxEvent
MxSentToName String
label (LocalProcess -> ProcessId
processId LocalProcess
proc) Message
msg')
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (String -> Message -> ProcessSignal
NamedSend String
label Message
msg')
unsafeNSend :: Serializable a => String -> a -> Process ()
unsafeNSend :: forall a. Serializable a => String -> a -> Process ()
unsafeNSend = String -> a -> Process ()
forall a. Serializable a => String -> a -> Process ()
Unsafe.nsend
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
nsendRemote :: forall a. Serializable a => NodeId -> String -> a -> Process ()
nsendRemote NodeId
nid String
label a
msg = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let us :: ProcessId
us = LocalProcess -> ProcessId
processId LocalProcess
proc
let node :: LocalNode
node = LocalProcess -> LocalNode
processNode LocalProcess
proc
if LocalNode -> NodeId
localNodeId LocalNode
node NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nid
then String -> a -> Process ()
forall a. Serializable a => String -> a -> Process ()
nsend String
label a
msg
else let lbl :: String
lbl = String
label String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"@" String -> String -> String
forall a. [a] -> [a] -> [a]
++ NodeId -> String
forall a. Show a => a -> String
show NodeId
nid in do
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node)
(String -> ProcessId -> Message -> MxEvent
MxSentToName String
lbl ProcessId
us (a -> Message
forall a. Serializable a => a -> Message
wrapMessage a
msg))
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (String -> Message -> ProcessSignal
NamedSend String
label (a -> Message
forall a. Serializable a => a -> Message
createMessage a
msg))
unsafeNSendRemote :: Serializable a => NodeId -> String -> a -> Process ()
unsafeNSendRemote :: forall a. Serializable a => NodeId -> String -> a -> Process ()
unsafeNSendRemote = NodeId -> String -> a -> Process ()
forall a. Serializable a => NodeId -> String -> a -> Process ()
Unsafe.nsendRemote
unStatic :: Typeable a => Static a -> Process a
unStatic :: forall a. Typeable a => Static a -> Process a
unStatic Static a
static = do
RemoteTable
rtable <- LocalNode -> RemoteTable
remoteTable (LocalNode -> RemoteTable)
-> (LocalProcess -> LocalNode) -> LocalProcess -> RemoteTable
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalProcess -> LocalNode
processNode (LocalProcess -> RemoteTable)
-> Process LocalProcess -> Process RemoteTable
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
case RemoteTable -> Static a -> Either String a
forall a. Typeable a => RemoteTable -> Static a -> Either String a
Static.unstatic RemoteTable
rtable Static a
static of
Left String
err -> String -> Process a
forall a. String -> Process a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> Process a) -> String -> Process a
forall a b. (a -> b) -> a -> b
$ String
"Could not resolve static value: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
err
Right a
x -> a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x
unClosure :: Typeable a => Closure a -> Process a
unClosure :: forall a. Typeable a => Closure a -> Process a
unClosure Closure a
closure = do
RemoteTable
rtable <- LocalNode -> RemoteTable
remoteTable (LocalNode -> RemoteTable)
-> (LocalProcess -> LocalNode) -> LocalProcess -> RemoteTable
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalProcess -> LocalNode
processNode (LocalProcess -> RemoteTable)
-> Process LocalProcess -> Process RemoteTable
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
case RemoteTable -> Closure a -> Either String a
forall a. Typeable a => RemoteTable -> Closure a -> Either String a
Static.unclosure RemoteTable
rtable Closure a
closure of
Left String
err -> String -> Process a
forall a. String -> Process a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> Process a) -> String -> Process a
forall a b. (a -> b) -> a -> b
$ String
"Could not resolve closure: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
err
Right a
x -> a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x
reconnect :: ProcessId -> Process ()
reconnect :: ProcessId -> Process ()
reconnect ProcessId
them = do
ProcessId
us <- Process ProcessId
getSelfPid
LocalNode
node <- LocalProcess -> LocalNode
processNode (LocalProcess -> LocalNode)
-> Process LocalProcess -> Process LocalNode
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> Identifier -> Identifier -> IO ()
disconnect LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
us) (ProcessId -> Identifier
ProcessIdentifier ProcessId
them)
reconnectPort :: SendPort a -> Process ()
reconnectPort :: forall a. SendPort a -> Process ()
reconnectPort SendPort a
them = do
ProcessId
us <- Process ProcessId
getSelfPid
LocalNode
node <- LocalProcess -> LocalNode
processNode (LocalProcess -> LocalNode)
-> Process LocalProcess -> Process LocalNode
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> Identifier -> Identifier -> IO ()
disconnect LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
us) (SendPortId -> Identifier
SendPortIdentifier (SendPort a -> SendPortId
forall a. SendPort a -> SendPortId
sendPortId SendPort a
them))
sendLocal :: (Serializable a) => ProcessId -> a -> Process ()
sendLocal :: forall a. Serializable a => ProcessId -> a -> Process ()
sendLocal ProcessId
to a
msg =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> Message -> ProcessSignal
LocalSend ProcessId
to (a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage a
msg)
sendChanLocal :: (Serializable a) => SendPortId -> a -> Process ()
sendChanLocal :: forall a. Serializable a => SendPortId -> a -> Process ()
sendChanLocal SendPortId
spId a
msg =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ SendPortId -> Message -> ProcessSignal
LocalPortSend SendPortId
spId (a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage a
msg)
getMonitorRefFor :: Identifier -> Process MonitorRef
getMonitorRefFor :: Identifier -> Process MonitorRef
getMonitorRefFor Identifier
ident = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
IO MonitorRef -> Process MonitorRef
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO MonitorRef -> Process MonitorRef)
-> IO MonitorRef -> Process MonitorRef
forall a b. (a -> b) -> a -> b
$ StrictMVar LocalProcessState
-> (LocalProcessState -> IO (LocalProcessState, MonitorRef))
-> IO MonitorRef
forall a b. StrictMVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) ((LocalProcessState -> IO (LocalProcessState, MonitorRef))
-> IO MonitorRef)
-> (LocalProcessState -> IO (LocalProcessState, MonitorRef))
-> IO MonitorRef
forall a b. (a -> b) -> a -> b
$ \LocalProcessState
st -> do
let counter :: Int32
counter = LocalProcessState
st LocalProcessState -> T LocalProcessState Int32 -> Int32
forall r a. r -> T r a -> a
^. T LocalProcessState Int32
monitorCounter
(LocalProcessState, MonitorRef)
-> IO (LocalProcessState, MonitorRef)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( T LocalProcessState Int32
monitorCounter T LocalProcessState Int32
-> (Int32 -> Int32) -> LocalProcessState -> LocalProcessState
forall r a. T r a -> (a -> a) -> r -> r
^: (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
1) (LocalProcessState -> LocalProcessState)
-> LocalProcessState -> LocalProcessState
forall a b. (a -> b) -> a -> b
$ LocalProcessState
st
, Identifier -> Int32 -> MonitorRef
MonitorRef Identifier
ident Int32
counter
)
getSpawnRef :: Process SpawnRef
getSpawnRef :: Process SpawnRef
getSpawnRef = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
IO SpawnRef -> Process SpawnRef
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO SpawnRef -> Process SpawnRef)
-> IO SpawnRef -> Process SpawnRef
forall a b. (a -> b) -> a -> b
$ StrictMVar LocalProcessState
-> (LocalProcessState -> IO (LocalProcessState, SpawnRef))
-> IO SpawnRef
forall a b. StrictMVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) ((LocalProcessState -> IO (LocalProcessState, SpawnRef))
-> IO SpawnRef)
-> (LocalProcessState -> IO (LocalProcessState, SpawnRef))
-> IO SpawnRef
forall a b. (a -> b) -> a -> b
$ \LocalProcessState
st -> do
let counter :: Int32
counter = LocalProcessState
st LocalProcessState -> T LocalProcessState Int32 -> Int32
forall r a. r -> T r a -> a
^. T LocalProcessState Int32
spawnCounter
(LocalProcessState, SpawnRef) -> IO (LocalProcessState, SpawnRef)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( T LocalProcessState Int32
spawnCounter T LocalProcessState Int32
-> (Int32 -> Int32) -> LocalProcessState -> LocalProcessState
forall r a. T r a -> (a -> a) -> r -> r
^: (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
1) (LocalProcessState -> LocalProcessState)
-> LocalProcessState -> LocalProcessState
forall a b. (a -> b) -> a -> b
$ LocalProcessState
st
, Int32 -> SpawnRef
SpawnRef Int32
counter
)
monitor' :: Identifier -> Process MonitorRef
monitor' :: Identifier -> Process MonitorRef
monitor' Identifier
ident = do
MonitorRef
monitorRef <- Identifier -> Process MonitorRef
getMonitorRefFor Identifier
ident
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ MonitorRef -> ProcessSignal
Monitor MonitorRef
monitorRef
MonitorRef -> Process MonitorRef
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return MonitorRef
monitorRef
link' :: Identifier -> Process ()
link' :: Identifier -> Process ()
link' = Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (Identifier -> ProcessSignal) -> Identifier -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Link