{-# LANGUAGE ScopedTypeVariables, DeriveDataTypeable #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Distributed.Process.Extras.Call
-- Copyright   :  (c) Parallel Scientific (Jeff Epstein) 2012
-- License     :  BSD3 (see the file LICENSE)
--
-- Maintainers :  Jeff Epstein, Tim Watson
-- Stability   :  experimental
-- Portability :  non-portable (requires concurrency)
--
-- This module provides a facility for Remote Procedure Call (rpc) style
-- interactions with Cloud Haskell processes.
--
-- Clients make synchronous calls to a running process (i.e., server) using the
-- 'callAt', 'callTimeout' and 'multicall' functions. Processes acting as the
-- server are constructed using Cloud Haskell's 'receive' family of primitives
-- and the 'callResponse' family of functions in this module.
-----------------------------------------------------------------------------

module Control.Distributed.Process.Extras.Call
  ( -- client API
    callAt
  , callTimeout
  , multicall
    -- server API
  , callResponse
  , callResponseIf
  , callResponseDefer
  , callResponseDeferIf
  , callForward
  , callResponseAsync
  ) where

import Control.Distributed.Process
import Control.Distributed.Process.Serializable (Serializable)
import Control.Monad (forM, forM_, join)
import Data.List (delete)
import qualified Data.Map as M
import Data.Maybe (listToMaybe)
import Data.Binary (Binary,get,put)
import Data.Typeable (Typeable)

import Control.Distributed.Process.Extras hiding (monitor, send)
import Control.Distributed.Process.Extras.Time

----------------------------------------------
-- * Multicall
----------------------------------------------

-- | Sends a message of type @a@ to the given process, to be handled by a
-- corresponding @callResponse...@ function, which will send back a message of
-- type @b@. The tag is a per-process unique identifier of the transaction.
--
-- Implemented as a 'multicall' to a single recipient. Returns 'Nothing' if
-- no reply was collected, i.e. the timeout expired or the target process
-- died (a monitor notification with a reason other than 'DiedNormal' was
-- received) before answering.
callTimeout :: (Serializable a, Serializable b)
            => ProcessId -> a -> Tag -> Timeout -> Process (Maybe b)
callTimeout pid msg tag time =
  do res <- multicall [pid] msg tag time
     -- 'multicall' yields one slot per recipient; flatten the single slot.
     return $ join (listToMaybe res)

-- | Like 'callTimeout', but with no timeout ('infiniteWait' is passed as the
-- timeout). Returns 'Nothing' if the target process dies.
callAt :: (Serializable a, Serializable b)
       => ProcessId -> a -> Tag -> Process (Maybe b)
callAt pid msg tag = callTimeout pid msg tag infiniteWait

-- | Like 'callTimeout', but sends the message to multiple
-- recipients and collects the results.
--
-- The result list has one entry per element of the recipient list, in the
-- same order: @Just reply@ for a recipient whose reply was collected and
-- 'Nothing' otherwise.
--
-- The work is delegated to a locally spawned worker process which monitors
-- the caller and every recipient, sends the request to each recipient, and
-- gathers replies until one of the following happens: every recipient has
-- replied or has been reported dead with a reason other than 'DiedNormal',
-- the caller dies, or a 'TimeoutNotification' carrying @tag@ arrives. The
-- worker then sends whatever it has gathered back to the caller.
--
-- Calls 'error' if the worker itself terminates with a reason other than
-- 'DiedNormal' before its answer has been received.
multicall :: forall a b.(Serializable a, Serializable b)
             => [ProcessId] -> a -> Tag -> Timeout -> Process [Maybe b]
multicall nodes msg tag time =
  do caller <- getSelfPid
     receiver <- spawnLocal $
         do receiver_pid <- getSelfPid
            mon_caller <- monitor caller
            -- Wait for the caller's go-ahead, which it sends only after it
            -- has started monitoring this worker.
            () <- expect
            monitortags <- forM nodes monitor
            forM_ nodes $ \node -> send node (Multicall, node,
                                              receiver_pid, tag, msg)
            -- 'recv1' stops on a TimeoutNotification carrying our tag;
            -- presumably 'timeout' arranges for one to be delivered to the
            -- worker -- confirm against Extras.Time.
            maybeTimeout time tag receiver_pid
            results <- recv nodes monitortags mon_caller
            send caller (MulticallResponse,tag,results)
     mon_receiver <- monitor receiver
     send receiver ()
     -- NOTE(review): the monitor on the worker is never removed, and a
     -- notification with reason DiedNormal is not matched below, so it
     -- looks like one such notification is left in the caller's mailbox
     -- after each call -- confirm whether that is intended.
     receiveWait [
         matchIf (\(MulticallResponse,mtag,_) -> mtag == tag)
                 (\(MulticallResponse,_,val) -> return val),
         matchIf (\(ProcessMonitorNotification ref _pid reason)
                  -> ref == mon_receiver && reason /= DiedNormal)
                 (\_ -> error "multicall: unexpected termination of worker")
       ]
  where
    -- Gather replies, then lay them out in the order of the recipient list.
    recv :: [ProcessId] -> [MonitorRef] -> MonitorRef -> Process [Maybe b]
    recv nodes' monitortags mon_caller = do
      resultmap <- recv1 mon_caller
                         (nodes', monitortags, M.empty) :: Process (M.Map ProcessId b)
      return $ map (\node -> M.lookup node resultmap) nodes'

    -- One step of the gathering loop. The state is (recipients still
    -- awaited, monitor references still awaited, replies gathered so far);
    -- the first argument is the monitor reference on the caller.
    recv1 :: MonitorRef
          -> ([ProcessId], [MonitorRef], M.Map ProcessId b)
          -> Process (M.Map ProcessId b)
    recv1 _   ([],_,results) = return results
    recv1 _   (_,[],results) = return results
    recv1 ref (nodesleft,monitortagsleft,results) =
          receiveWait [
              -- The caller died: stop gathering.
              matchIf (\(ProcessMonitorNotification ref' _ _)
                         -> ref' == ref)
                      (\_ -> return Nothing)
              -- An awaited recipient died abnormally: stop waiting for it.
            , matchIf (\(ProcessMonitorNotification ref' pid reason) ->
                      ref' `elem` monitortagsleft &&
                      pid `elem` nodesleft
                      && reason /= DiedNormal)
                      (\(ProcessMonitorNotification ref' pid _reason) ->
                        return $ Just (delete pid nodesleft,
                                       delete ref' monitortagsleft, results))
              -- A reply for this transaction: record it under the responder.
            , matchIf (\(MulticallResponse, mtag, _, _) -> mtag == tag)
                      (\(MulticallResponse, _, responder, msgx) ->
                        return $ Just (delete responder nodesleft,
                                       monitortagsleft,
                                       M.insert responder (msgx :: b) results))
              -- The timeout for this transaction fired: stop gathering.
            , matchIf (\(TimeoutNotification mtag) -> mtag == tag )
                      (\_ -> return Nothing)
          ]
          -- Nothing means "stop with what we have"; Just carries the next
          -- loop state.
          >>= maybe (return results) (recv1 ref)

-- | Decision taken by a server-side condition about an incoming call, as
-- interpreted by 'callResponseImpl':
--
-- * 'MulticallAccept': the handler is run for the message.
--
-- * 'MulticallForward': the whole request is re-sent to the given process
--   and the accompanying value is returned from the match.
--
-- * 'MulticallReject': the match does not fire for the message.
data MulticallResponseType a =
         MulticallAccept
       | MulticallForward ProcessId a
       | MulticallReject deriving Eq

-- Shared implementation of the server-side @callResponse...@ family.
--
-- Builds a 'Match' for request tuples of the form
-- @(Multicall, responder, sender, tag, msg)@. @cond@ is evaluated on @msg@
-- both to decide whether the match fires (anything but 'MulticallReject')
-- and, once it has fired, to decide what to do:
--
-- * 'MulticallAccept': run @proc@ on @msg@, handing it a function that
--   sends a reply tagged with @tag@ and @responder@ to @sender@.
-- * 'MulticallForward': re-send the whole request to the target and return
--   the accompanying value.
-- * 'MulticallReject': cannot happen unless @cond@ gives different answers
--   for the same message; reported via 'error'.
callResponseImpl :: (Serializable a,Serializable b)
                 => (a -> MulticallResponseType c) ->
                    (a -> (b -> Process())-> Process c) -> Match c
callResponseImpl cond proc =
  matchIf (\(Multicall,_responder,_,_,msg) ->
            case cond msg of
              MulticallReject -> False
              _               -> True)
          (\wholemsg@(Multicall,responder,sender,tag,msg) ->
            case cond msg of
              -- TODO: sender should get a ProcessMonitorNotification if
              -- our target dies, or we should link to it (?)
              MulticallForward target ret -> send target wholemsg >> return ret
              -- TODO: use `die Reason` when issue #110 is resolved
              MulticallReject -> error "multicallResponseImpl: Indecisive condition"
              MulticallAccept ->
                -- The annotations on tag and responder pin the otherwise
                -- unconstrained components of the request tuple.
                let resultSender tosend =
                      send sender (MulticallResponse,
                                   tag::Tag,
                                   responder::ProcessId,
                                   tosend)
                in proc msg resultSender)

-- | Produces a 'Match' that can be used with the 'receiveWait' family of
-- message-receiving functions. @callResponse@ will respond to a message of
-- type @a@ sent by 'callTimeout', and will respond with a value of type @b@.
--
-- Equivalent to 'callResponseIf' with a condition that accepts every
-- message.
callResponse :: (Serializable a,Serializable b)
                => (a -> Process (b,c)) -> Match c
callResponse = callResponseIf (const True)

-- | Produces a 'Match' that fires only for call messages satisfying the
-- given predicate. The handler receives the message together with a
-- function for sending the reply, so it decides itself when (and whether)
-- the reply is sent; the handler's result is the result of the match.
callResponseDeferIf  :: (Serializable a,Serializable b)
                     => (a -> Bool)
                     -> (a -> (b -> Process()) -> Process c)
                     -> Match c
callResponseDeferIf cond =
  callResponseImpl (\msg ->
                     if cond msg
                     then MulticallAccept
                     else MulticallReject)

-- | Like 'callResponseDeferIf', but accepts every message.
callResponseDefer  :: (Serializable a,Serializable b)
                   => (a -> (b -> Process())-> Process c) -> Match c
callResponseDefer = callResponseDeferIf (const True)

-- | Produces a 'Match' that can be used with the 'receiveWait' family of
-- message-receiving functions. When @callForward@ receives a message of type
-- @a@ from 'callTimeout' (and similar), it will forward the message to
-- another process, which will be responsible for responding to it. It is the
-- user's responsibility to ensure that the forwarding process is linked to
-- the destination process, so that if it fails, the sender will be notified.
--
-- The supplied function picks the destination process and the value that
-- the match returns.
callForward :: Serializable a => (a -> (ProcessId, c)) -> Match c
callForward proc =
   callResponseImpl
     (\msg -> let (pid, ret) = proc msg
              in MulticallForward pid ret )
     -- The condition above always forwards, so this handler is not expected
     -- to run. 'mention' is used only to fix the reply type to ().
     (\_ sender ->
       (sender::(() -> Process ())) `mention`
          error "multicallForward: Indecisive condition")

-- | Produces a 'Match' whose message handling code is started in a separate
-- process (via 'spawnLocal'); that process computes the reply and sends it
-- back. It's not automatically linked to the calling process, so if you want
-- it to be terminated when the message handling process dies, you'll need to
-- call link yourself.
--
-- The first argument decides whether a message is accepted: 'Nothing'
-- rejects it, @Just ret@ accepts it and makes @ret@ the result of the match.
callResponseAsync :: (Serializable a,Serializable b)
                  => (a -> Maybe c) -> (a -> Process b) -> Match c
callResponseAsync cond proc =
   callResponseImpl
         (\msg ->
            case cond msg of
              Nothing -> MulticallReject
              Just _ -> MulticallAccept)
         (\msg sender ->
            do _ <- spawnLocal $ -- TODO linkOnFailure to spawned process
                 do val <- proc msg
                    sender val
               -- The condition is evaluated again to obtain the value to
               -- return; Nothing here means it disagreed with itself.
               case cond msg of
                 Nothing -> error "multicallResponseAsync: Indecisive condition"
                 Just ret -> return ret )

-- | Like 'callResponse', but the match fires only for messages satisfying
-- the given predicate. The handler produces a pair: the first component is
-- sent back as the reply, the second is the result of the match.
callResponseIf :: (Serializable a,Serializable b)
               => (a -> Bool) -> (a -> Process (b,c)) -> Match c
callResponseIf cond proc =
    callResponseImpl
             (\msg ->
                 if cond msg
                 then MulticallAccept
                 else MulticallReject)
             (\msg sender ->
                 do (tosend,toreturn) <- proc msg
                    sender tosend
                    return toreturn)

-- Starts a timeout for the given tag and process via 'timeout' when a
-- duration is supplied; does nothing when the timeout is 'Nothing'.
maybeTimeout :: Timeout -> Tag -> ProcessId -> Process ()
maybeTimeout Nothing _ _ = return ()
maybeTimeout (Just time) tag p = timeout time tag p

----------------------------------------------
-- * Private types
----------------------------------------------

-- Returns its second argument unchanged and ignores the first. Used so that
-- an otherwise unused value can appear in an expression (for instance to
-- carry a type annotation) without affecting the result.
mention :: a -> b -> b
mention _a b = b

-- Marker placed first in every request tuple, so that requests can be told
-- apart from other messages by type.
data Multicall = Multicall
       deriving (Typeable)

-- The marker carries no payload: nothing is written and nothing is read.
instance Binary Multicall where
       get = return Multicall
       put _ = return ()
-- Marker placed first in every reply tuple, so that replies can be told
-- apart from other messages by type.
data MulticallResponse = MulticallResponse
       deriving (Typeable)

-- The marker carries no payload: nothing is written and nothing is read.
instance Binary MulticallResponse where
       get = return MulticallResponse
       put _ = return ()