{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric      #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Distributed.Process.Extras.Monitoring
-- Copyright   :  (c) Tim Watson 2013 - 2017
-- License     :  BSD3 (see the file LICENSE)
--
-- Maintainer  :  Tim Watson <watson.timothy@gmail.com>
-- Stability   :  experimental
-- Portability :  non-portable (requires concurrency)
--
-- This module provides a primitive node monitoring capability, implemented as
-- a /distributed-process Management Agent/. Once the 'nodeMonitor' agent is
-- started, calling 'monitorNodes' will ensure that whenever the local node
-- detects a new network-transport connection (from another cloud haskell node),
-- the caller will receive a 'NodeUp' message in its mailbox. If a node
-- disconnects, a corollary 'NodeDown' message will be delivered as well.
--
-----------------------------------------------------------------------------

module Control.Distributed.Process.Extras.Monitoring
  (
    NodeUp(..)
  , NodeDown(..)
  , nodeMonitorAgentId
  , nodeMonitor
  , monitorNodes
  , unmonitorNodes
  ) where

import Control.DeepSeq (NFData)
import Control.Distributed.Process  -- NB: requires NodeId(..) to be exported!
import Control.Distributed.Process.Management
  ( MxEvent(MxConnected, MxDisconnected)
  , MxAgentId(..)
  , mxAgent
  , mxSink
  , mxReady
  , liftMX
  , mxGetLocal
  , mxSetLocal
  , mxNotify
  )
import Control.Distributed.Process.Extras (deliver)
import Data.Binary
import qualified Data.Foldable as Foldable
import Data.HashSet (HashSet)
import qualified Data.HashSet as Set

import Data.Typeable (Typeable)
import GHC.Generics

data Register = Register !ProcessId
  deriving (Typeable, (forall x. Register -> Rep Register x)
-> (forall x. Rep Register x -> Register) -> Generic Register
forall x. Rep Register x -> Register
forall x. Register -> Rep Register x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. Register -> Rep Register x
from :: forall x. Register -> Rep Register x
$cto :: forall x. Rep Register x -> Register
to :: forall x. Rep Register x -> Register
Generic)
instance Binary Register where
instance NFData Register where

data UnRegister = UnRegister !ProcessId
  deriving (Typeable, (forall x. UnRegister -> Rep UnRegister x)
-> (forall x. Rep UnRegister x -> UnRegister) -> Generic UnRegister
forall x. Rep UnRegister x -> UnRegister
forall x. UnRegister -> Rep UnRegister x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. UnRegister -> Rep UnRegister x
from :: forall x. UnRegister -> Rep UnRegister x
$cto :: forall x. Rep UnRegister x -> UnRegister
to :: forall x. Rep UnRegister x -> UnRegister
Generic)
instance Binary UnRegister where
instance NFData UnRegister where

-- | Sent to subscribing processes when a connection
-- (from a remote node) is detected.
--
data NodeUp = NodeUp !NodeId
  deriving (Typeable, (forall x. NodeUp -> Rep NodeUp x)
-> (forall x. Rep NodeUp x -> NodeUp) -> Generic NodeUp
forall x. Rep NodeUp x -> NodeUp
forall x. NodeUp -> Rep NodeUp x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. NodeUp -> Rep NodeUp x
from :: forall x. NodeUp -> Rep NodeUp x
$cto :: forall x. Rep NodeUp x -> NodeUp
to :: forall x. Rep NodeUp x -> NodeUp
Generic, Int -> NodeUp -> ShowS
[NodeUp] -> ShowS
NodeUp -> String
(Int -> NodeUp -> ShowS)
-> (NodeUp -> String) -> ([NodeUp] -> ShowS) -> Show NodeUp
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> NodeUp -> ShowS
showsPrec :: Int -> NodeUp -> ShowS
$cshow :: NodeUp -> String
show :: NodeUp -> String
$cshowList :: [NodeUp] -> ShowS
showList :: [NodeUp] -> ShowS
Show)
instance Binary NodeUp where
instance NFData NodeUp where

-- | Sent to subscribing processes when a dis-connection
-- (from a remote node) is detected.
--
data NodeDown = NodeDown !NodeId
  deriving (Typeable, (forall x. NodeDown -> Rep NodeDown x)
-> (forall x. Rep NodeDown x -> NodeDown) -> Generic NodeDown
forall x. Rep NodeDown x -> NodeDown
forall x. NodeDown -> Rep NodeDown x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. NodeDown -> Rep NodeDown x
from :: forall x. NodeDown -> Rep NodeDown x
$cto :: forall x. Rep NodeDown x -> NodeDown
to :: forall x. Rep NodeDown x -> NodeDown
Generic, Int -> NodeDown -> ShowS
[NodeDown] -> ShowS
NodeDown -> String
(Int -> NodeDown -> ShowS)
-> (NodeDown -> String) -> ([NodeDown] -> ShowS) -> Show NodeDown
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> NodeDown -> ShowS
showsPrec :: Int -> NodeDown -> ShowS
$cshow :: NodeDown -> String
show :: NodeDown -> String
$cshowList :: [NodeDown] -> ShowS
showList :: [NodeDown] -> ShowS
Show)
instance Binary NodeDown where
instance NFData NodeDown where

-- | The @MxAgentId@ for the node monitoring agent.
nodeMonitorAgentId :: MxAgentId
nodeMonitorAgentId :: MxAgentId
nodeMonitorAgentId = String -> MxAgentId
MxAgentId String
"service.monitoring.nodes"

-- | Start monitoring node connection/disconnection events. When a
-- connection event occurs, the calling process will receive a message
-- @NodeUp NodeId@ in its mailbox. When a disconnect occurs, the
-- corollary @NodeDown NodeId@ message will be delivered instead.
--
-- No guaranatee is made about the timeliness of the delivery, nor can
-- the receiver expect that the node (for which it is being notified)
-- is still up/connected or down/disconnected at the point when it receives
-- a message from the node monitoring agent.
--
monitorNodes :: Process ()
monitorNodes :: Process ()
monitorNodes = do
  ProcessId
us <- Process ProcessId
getSelfPid
  Register -> Process ()
forall a. Serializable a => a -> Process ()
mxNotify (Register -> Process ()) -> Register -> Process ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> Register
Register ProcessId
us

-- | Stop monitoring node connection/disconnection events. This does not
-- flush the caller's mailbox, nor does it guarantee that any/all node
-- up/down notifications will have been delivered before it is evaluated.
--
unmonitorNodes :: Process ()
unmonitorNodes :: Process ()
unmonitorNodes = do
  ProcessId
us <- Process ProcessId
getSelfPid
  UnRegister -> Process ()
forall a. Serializable a => a -> Process ()
mxNotify (UnRegister -> Process ()) -> UnRegister -> Process ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> UnRegister
UnRegister ProcessId
us

-- | Starts the node monitoring agent. No call to @monitorNodes@ and
-- @unmonitorNodes@ will have any effect unless the agent is already
-- running. Note that we make /no guarantees what-so-ever/ about the
-- timeliness or ordering semantics of node monitoring notifications.
--
nodeMonitor :: Process ProcessId
nodeMonitor :: Process ProcessId
nodeMonitor = do
  MxAgentId
-> HashSet ProcessId
-> [MxSink (HashSet ProcessId)]
-> Process ProcessId
forall s. MxAgentId -> s -> [MxSink s] -> Process ProcessId
mxAgent MxAgentId
nodeMonitorAgentId HashSet ProcessId
initState [
        ((Register -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId)
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((Register -> MxAgent (HashSet ProcessId) MxAction)
 -> MxSink (HashSet ProcessId))
-> (Register -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId)
forall a b. (a -> b) -> a -> b
$ \(Register ProcessId
pid) -> do
            HashSet ProcessId -> MxAgent (HashSet ProcessId) ()
forall s. s -> MxAgent s ()
mxSetLocal (HashSet ProcessId -> MxAgent (HashSet ProcessId) ())
-> (HashSet ProcessId -> HashSet ProcessId)
-> HashSet ProcessId
-> MxAgent (HashSet ProcessId) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId -> HashSet ProcessId -> HashSet ProcessId
forall a. (Eq a, Hashable a) => a -> HashSet a -> HashSet a
Set.insert ProcessId
pid (HashSet ProcessId -> MxAgent (HashSet ProcessId) ())
-> MxAgent (HashSet ProcessId) (HashSet ProcessId)
-> MxAgent (HashSet ProcessId) ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MxAgent (HashSet ProcessId) (HashSet ProcessId)
forall s. MxAgent s s
mxGetLocal
            MxAgent (HashSet ProcessId) MxAction
forall s. MxAgent s MxAction
mxReady)
      , ((UnRegister -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId)
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((UnRegister -> MxAgent (HashSet ProcessId) MxAction)
 -> MxSink (HashSet ProcessId))
-> (UnRegister -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId)
forall a b. (a -> b) -> a -> b
$ \(UnRegister ProcessId
pid) -> do
            HashSet ProcessId -> MxAgent (HashSet ProcessId) ()
forall s. s -> MxAgent s ()
mxSetLocal (HashSet ProcessId -> MxAgent (HashSet ProcessId) ())
-> (HashSet ProcessId -> HashSet ProcessId)
-> HashSet ProcessId
-> MxAgent (HashSet ProcessId) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId -> HashSet ProcessId -> HashSet ProcessId
forall a. (Eq a, Hashable a) => a -> HashSet a -> HashSet a
Set.delete ProcessId
pid (HashSet ProcessId -> MxAgent (HashSet ProcessId) ())
-> MxAgent (HashSet ProcessId) (HashSet ProcessId)
-> MxAgent (HashSet ProcessId) ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MxAgent (HashSet ProcessId) (HashSet ProcessId)
forall s. MxAgent s s
mxGetLocal
            MxAgent (HashSet ProcessId) MxAction
forall s. MxAgent s MxAction
mxReady)
      , ((MxEvent -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId)
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((MxEvent -> MxAgent (HashSet ProcessId) MxAction)
 -> MxSink (HashSet ProcessId))
-> (MxEvent -> MxAgent (HashSet ProcessId) MxAction)
-> MxSink (HashSet ProcessId)
forall a b. (a -> b) -> a -> b
$ \MxEvent
ev -> do
            let act :: MxAgent (HashSet ProcessId) ()
act =
                  case MxEvent
ev of
                    (MxConnected    ConnectionId
_ EndPointAddress
ep) -> NodeUp -> MxAgent (HashSet ProcessId) ()
forall {t :: * -> *} {a} {m}.
(Foldable t, Addressable a, Binary m, Typeable m) =>
m -> MxAgent (t a) ()
notify (NodeUp -> MxAgent (HashSet ProcessId) ())
-> NodeUp -> MxAgent (HashSet ProcessId) ()
forall a b. (a -> b) -> a -> b
$ EndPointAddress -> NodeUp
nodeUp EndPointAddress
ep
                    (MxDisconnected ConnectionId
_ EndPointAddress
ep) -> NodeDown -> MxAgent (HashSet ProcessId) ()
forall {t :: * -> *} {a} {m}.
(Foldable t, Addressable a, Binary m, Typeable m) =>
m -> MxAgent (t a) ()
notify (NodeDown -> MxAgent (HashSet ProcessId) ())
-> NodeDown -> MxAgent (HashSet ProcessId) ()
forall a b. (a -> b) -> a -> b
$ EndPointAddress -> NodeDown
nodeDown EndPointAddress
ep
                    MxEvent
_                     -> () -> MxAgent (HashSet ProcessId) ()
forall a. a -> MxAgent (HashSet ProcessId) a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            MxAgent (HashSet ProcessId) ()
act MxAgent (HashSet ProcessId) ()
-> MxAgent (HashSet ProcessId) MxAction
-> MxAgent (HashSet ProcessId) MxAction
forall a b.
MxAgent (HashSet ProcessId) a
-> MxAgent (HashSet ProcessId) b -> MxAgent (HashSet ProcessId) b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MxAgent (HashSet ProcessId) MxAction
forall s. MxAgent s MxAction
mxReady)
    ]
  where
    initState :: HashSet ProcessId
    initState :: HashSet ProcessId
initState = HashSet ProcessId
forall a. HashSet a
Set.empty

    notify :: m -> MxAgent (t a) ()
notify m
msg = (a -> MxAgent (t a) ()) -> t a -> MxAgent (t a) ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
Foldable.mapM_ (Process () -> MxAgent (t a) ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent (t a) ())
-> (a -> Process ()) -> a -> MxAgent (t a) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m -> a -> Process ()
forall a m. (Addressable a, Serializable m) => m -> a -> Process ()
deliver m
msg) (t a -> MxAgent (t a) ())
-> MxAgent (t a) (t a) -> MxAgent (t a) ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MxAgent (t a) (t a)
forall s. MxAgent s s
mxGetLocal

    nodeUp :: EndPointAddress -> NodeUp
nodeUp = NodeId -> NodeUp
NodeUp (NodeId -> NodeUp)
-> (EndPointAddress -> NodeId) -> EndPointAddress -> NodeUp
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EndPointAddress -> NodeId
NodeId
    nodeDown :: EndPointAddress -> NodeDown
nodeDown = NodeId -> NodeDown
NodeDown (NodeId -> NodeDown)
-> (EndPointAddress -> NodeId) -> EndPointAddress -> NodeDown
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EndPointAddress -> NodeId
NodeId