{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
module Control.Distributed.Process.Extras.Monitoring
(
NodeUp(..)
, NodeDown(..)
, nodeMonitorAgentId
, nodeMonitor
, monitorNodes
, unmonitorNodes
) where
import Control.DeepSeq (NFData)
import Control.Distributed.Process
import Control.Distributed.Process.Management
( MxEvent(MxConnected, MxDisconnected)
, MxAgentId(..)
, mxAgent
, mxSink
, mxReady
, liftMX
, mxGetLocal
, mxSetLocal
, mxNotify
)
import Control.Distributed.Process.Extras (deliver)
import Data.Binary
import qualified Data.Foldable as Foldable
import Data.HashSet (HashSet)
import qualified Data.HashSet as Set
import Data.Typeable (Typeable)
import GHC.Generics
data Register = Register !ProcessId
deriving (Typeable, (forall x. Register -> Rep Register x)
-> (forall x. Rep Register x -> Register) -> Generic Register
forall x. Rep Register x -> Register
forall x. Register -> Rep Register x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. Register -> Rep Register x
from :: forall x. Register -> Rep Register x
$cto :: forall x. Rep Register x -> Register
to :: forall x. Rep Register x -> Register
Generic)
instance Binary Register where
instance NFData Register where
data UnRegister = UnRegister !ProcessId
deriving (Typeable, (forall x. UnRegister -> Rep UnRegister x)
-> (forall x. Rep UnRegister x -> UnRegister) -> Generic UnRegister
forall x. Rep UnRegister x -> UnRegister
forall x. UnRegister -> Rep UnRegister x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. UnRegister -> Rep UnRegister x
from :: forall x. UnRegister -> Rep UnRegister x
$cto :: forall x. Rep UnRegister x -> UnRegister
to :: forall x. Rep UnRegister x -> UnRegister
Generic)
instance Binary UnRegister where
instance NFData UnRegister where
data NodeUp = NodeUp !NodeId
deriving (Typeable, (forall x. NodeUp -> Rep NodeUp x)
-> (forall x. Rep NodeUp x -> NodeUp) -> Generic NodeUp
forall x. Rep NodeUp x -> NodeUp
forall x. NodeUp -> Rep NodeUp x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. NodeUp -> Rep NodeUp x
from :: forall x. NodeUp -> Rep NodeUp x
$cto :: forall x. Rep NodeUp x -> NodeUp
to :: forall x. Rep NodeUp x -> NodeUp
Generic, Int -> NodeUp -> ShowS
[NodeUp] -> ShowS
NodeUp -> String
(Int -> NodeUp -> ShowS)
-> (NodeUp -> String) -> ([NodeUp] -> ShowS) -> Show NodeUp
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> NodeUp -> ShowS
showsPrec :: Int -> NodeUp -> ShowS
$cshow :: NodeUp -> String
show :: NodeUp -> String
$cshowList :: [NodeUp] -> ShowS
showList :: [NodeUp] -> ShowS
Show)
instance Binary NodeUp where
instance NFData NodeUp where
data NodeDown = NodeDown !NodeId
deriving (Typeable, (forall x. NodeDown -> Rep NodeDown x)
-> (forall x. Rep NodeDown x -> NodeDown) -> Generic NodeDown
forall x. Rep NodeDown x -> NodeDown
forall x. NodeDown -> Rep NodeDown x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. NodeDown -> Rep NodeDown x
from :: forall x. NodeDown -> Rep NodeDown x
$cto :: forall x. Rep NodeDown x -> NodeDown
to :: forall x. Rep NodeDown x -> NodeDown
Generic, Int -> NodeDown -> ShowS
[NodeDown] -> ShowS
NodeDown -> String
(Int -> NodeDown -> ShowS)
-> (NodeDown -> String) -> ([NodeDown] -> ShowS) -> Show NodeDown
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> NodeDown -> ShowS
showsPrec :: Int -> NodeDown -> ShowS
$cshow :: NodeDown -> String
show :: NodeDown -> String
$cshowList :: [NodeDown] -> ShowS
showList :: [NodeDown] -> ShowS
Show)
instance Binary NodeDown where
instance NFData NodeDown where
nodeMonitorAgentId :: MxAgentId
nodeMonitorAgentId :: MxAgentId
nodeMonitorAgentId = String -> MxAgentId
MxAgentId String
"service.monitoring.nodes"
monitorNodes :: Process ()
monitorNodes :: Process ()
monitorNodes = do
ProcessId
us <- Process ProcessId
getSelfPid
Register -> Process ()
forall a. Serializable a => a -> Process ()
mxNotify (Register -> Process ()) -> Register -> Process ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> Register
Register ProcessId
us
unmonitorNodes :: Process ()
unmonitorNodes :: Process ()
unmonitorNodes = do
ProcessId
us <- Process ProcessId
getSelfPid
UnRegister -> Process ()
forall a. Serializable a => a -> Process ()
mxNotify (UnRegister -> Process ()) -> UnRegister -> Process ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> UnRegister
UnRegister ProcessId
us
nodeMonitor :: Process ProcessId
nodeMonitor :: Process ProcessId
nodeMonitor = do
MxAgentId
-> HashSet ProcessId
-> [MxSink (HashSet ProcessId)]
-> Process ProcessId
forall s. MxAgentId -> s -> [MxSink s] -> Process ProcessId
mxAgent MxAgentId
nodeMonitorAgentId HashSet ProcessId
initState [
((Register -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId)
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((Register -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId))
-> (Register -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId)
forall a b. (a -> b) -> a -> b
$ \(Register ProcessId
pid) -> do
HashSet ProcessId -> MxAgent (HashSet ProcessId) ()
forall s. s -> MxAgent s ()
mxSetLocal (HashSet ProcessId -> MxAgent (HashSet ProcessId) ())
-> (HashSet ProcessId -> HashSet ProcessId)
-> HashSet ProcessId
-> MxAgent (HashSet ProcessId) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId -> HashSet ProcessId -> HashSet ProcessId
forall a. (Eq a, Hashable a) => a -> HashSet a -> HashSet a
Set.insert ProcessId
pid (HashSet ProcessId -> MxAgent (HashSet ProcessId) ())
-> MxAgent (HashSet ProcessId) (HashSet ProcessId)
-> MxAgent (HashSet ProcessId) ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MxAgent (HashSet ProcessId) (HashSet ProcessId)
forall s. MxAgent s s
mxGetLocal
MxAgent (HashSet ProcessId) MxAction
forall s. MxAgent s MxAction
mxReady)
, ((UnRegister -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId)
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((UnRegister -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId))
-> (UnRegister -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId)
forall a b. (a -> b) -> a -> b
$ \(UnRegister ProcessId
pid) -> do
HashSet ProcessId -> MxAgent (HashSet ProcessId) ()
forall s. s -> MxAgent s ()
mxSetLocal (HashSet ProcessId -> MxAgent (HashSet ProcessId) ())
-> (HashSet ProcessId -> HashSet ProcessId)
-> HashSet ProcessId
-> MxAgent (HashSet ProcessId) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId -> HashSet ProcessId -> HashSet ProcessId
forall a. (Eq a, Hashable a) => a -> HashSet a -> HashSet a
Set.delete ProcessId
pid (HashSet ProcessId -> MxAgent (HashSet ProcessId) ())
-> MxAgent (HashSet ProcessId) (HashSet ProcessId)
-> MxAgent (HashSet ProcessId) ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MxAgent (HashSet ProcessId) (HashSet ProcessId)
forall s. MxAgent s s
mxGetLocal
MxAgent (HashSet ProcessId) MxAction
forall s. MxAgent s MxAction
mxReady)
, ((MxEvent -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId)
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((MxEvent -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId))
-> (MxEvent -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId)
forall a b. (a -> b) -> a -> b
$ \MxEvent
ev -> do
let act :: MxAgent (HashSet ProcessId) ()
act =
case MxEvent
ev of
(MxConnected ConnectionId
_ EndPointAddress
ep) -> NodeUp -> MxAgent (HashSet ProcessId) ()
forall {t :: * -> *} {a} {m}.
(Foldable t, Addressable a, Binary m, Typeable m) =>
m -> MxAgent (t a) ()
notify (NodeUp -> MxAgent (HashSet ProcessId) ())
-> NodeUp -> MxAgent (HashSet ProcessId) ()
forall a b. (a -> b) -> a -> b
$ EndPointAddress -> NodeUp
nodeUp EndPointAddress
ep
(MxDisconnected ConnectionId
_ EndPointAddress
ep) -> NodeDown -> MxAgent (HashSet ProcessId) ()
forall {t :: * -> *} {a} {m}.
(Foldable t, Addressable a, Binary m, Typeable m) =>
m -> MxAgent (t a) ()
notify (NodeDown -> MxAgent (HashSet ProcessId) ())
-> NodeDown -> MxAgent (HashSet ProcessId) ()
forall a b. (a -> b) -> a -> b
$ EndPointAddress -> NodeDown
nodeDown EndPointAddress
ep
MxEvent
_ -> () -> MxAgent (HashSet ProcessId) ()
forall a. a -> MxAgent (HashSet ProcessId) a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
MxAgent (HashSet ProcessId) ()
act MxAgent (HashSet ProcessId) ()
-> MxAgent (HashSet ProcessId) MxAction
-> MxAgent (HashSet ProcessId) MxAction
forall a b.
MxAgent (HashSet ProcessId) a
-> MxAgent (HashSet ProcessId) b -> MxAgent (HashSet ProcessId) b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MxAgent (HashSet ProcessId) MxAction
forall s. MxAgent s MxAction
mxReady)
]
where
initState :: HashSet ProcessId
initState :: HashSet ProcessId
initState = HashSet ProcessId
forall a. HashSet a
Set.empty
notify :: m -> MxAgent (t a) ()
notify m
msg = (a -> MxAgent (t a) ()) -> t a -> MxAgent (t a) ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
Foldable.mapM_ (Process () -> MxAgent (t a) ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent (t a) ())
-> (a -> Process ()) -> a -> MxAgent (t a) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m -> a -> Process ()
forall a m. (Addressable a, Serializable m) => m -> a -> Process ()
deliver m
msg) (t a -> MxAgent (t a) ())
-> MxAgent (t a) (t a) -> MxAgent (t a) ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MxAgent (t a) (t a)
forall s. MxAgent s s
mxGetLocal
nodeUp :: EndPointAddress -> NodeUp
nodeUp = NodeId -> NodeUp
NodeUp (NodeId -> NodeUp)
-> (EndPointAddress -> NodeId) -> EndPointAddress -> NodeUp
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EndPointAddress -> NodeId
NodeId
nodeDown :: EndPointAddress -> NodeDown
nodeDown = NodeId -> NodeDown
NodeDown (NodeId -> NodeDown)
-> (EndPointAddress -> NodeId) -> EndPointAddress -> NodeDown
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EndPointAddress -> NodeId
NodeId