{-# LANGUAGE OverloadedStrings, DoAndIfThenElse, FlexibleContexts, CPP #-}
module IHaskell.IPython.ZeroMQ (
ZeroMQInterface(..),
ZeroMQStdin(..),
serveProfile,
serveStdin,
ZeroMQEphemeralPorts,
withEphemeralPorts,
) where
import Control.Concurrent
import Control.Exception
import Control.Monad
import qualified Crypto.Hash as Hash
import Crypto.Hash.Algorithms (SHA256)
import qualified Crypto.MAC.HMAC as HMAC
import Data.Aeson
import qualified Data.ByteArray.Encoding as Encoding
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as Char
import qualified Data.ByteString.Lazy as LBS
import Data.Char
import Data.Monoid ((<>))
import qualified Data.Text.Encoding as Text
import Foreign.C.Error (Errno(..), eNOTSOCK)
import System.ZMQ4 as ZMQ4
import Text.Read (readMaybe)
import Text.Parsec (runParserT, manyTill, anyToken, (<|>), eof, tokenPrim, incSourceColumn)
import IHaskell.IPython.Message.Parser
import IHaskell.IPython.Types
-- | Channel-based view of the kernel sockets: the socket loops in this module
-- move 'Message' values between these channels and the ZeroMQ sockets.
data ZeroMQInterface =
  Channels
    { shellRequestChannel :: Chan (Chan Message, Message)
      -- ^ Messages received on the shell socket, each paired with the
      -- channel the shell loop reads the reply from.
    , shellReplyChannel :: Chan Message
      -- ^ Replies that the shell loop sends back over the shell socket.
    , controlRequestChannel :: Chan (Chan Message, Message)
      -- ^ Messages received on the control socket, each paired with the
      -- channel the control loop reads the reply from.
    , controlReplyChannel :: Chan Message
      -- ^ Replies that the control loop sends back over the control socket.
    , iopubChannel :: Chan Message
      -- ^ Messages that the iopub loop publishes on the Pub socket.
    , hmacKey :: ByteString
      -- ^ Key used to sign every outgoing message.
    }
-- | Channel pair used to talk to the frontend over the stdin socket.
data ZeroMQStdin =
  StdinChannel
    { stdinRequestChannel :: Chan Message
      -- ^ Messages read from here are sent out on the stdin socket.
    , stdinReplyChannel :: Chan Message
      -- ^ Messages received on the stdin socket are written here.
    }
-- | Allocate the channels of a 'ZeroMQInterface' and store the signing key.
--
-- The control request channel is created with 'dupChan' from the shell
-- request channel, so anything written to either one from then on is
-- available from both.
newZeroMQInterface :: ByteString -> IO ZeroMQInterface
newZeroMQInterface key = do
  shellRequests <- newChan
  shellReplies <- newChan
  controlRequests <- dupChan shellRequests
  controlReplies <- newChan
  iopubMessages <- newChan
  let interface =
        Channels
          { shellRequestChannel = shellRequests
          , shellReplyChannel = shellReplies
          , controlRequestChannel = controlRequests
          , controlReplyChannel = controlReplies
          , iopubChannel = iopubMessages
          , hmacKey = key
          }
  -- Evaluate the record constructor before handing it back.
  interface `seq` pure interface
-- | Start serving the heartbeat, control, shell and iopub sockets described
-- by a profile and return the channel interface connected to them.
--
-- All sockets are served from background threads sharing one ZeroMQ
-- context, so this function returns right after the channels are created.
serveProfile :: Profile            -- ^ Addresses, ports and signature key to serve on.
             -> Bool               -- ^ Print debug output for messages?
             -> IO ZeroMQInterface -- ^ Channels connected to the sockets.
serveProfile profile debug = do
  channels <- newZeroMQInterface (signatureKey profile)
  let address = ip profile
      inBackground = void . forkIO
  inBackground . withContext $ \ctxt -> do
    inBackground $ serveSocket ctxt Rep address (hbPort profile) (heartbeat channels)
    inBackground $ serveSocket ctxt Router address (controlPort profile) (control debug channels)
    inBackground $ serveSocket ctxt Router address (shellPort profile) (shell debug channels)
    -- The iopub socket is served on the thread that owns the context, which
    -- keeps the context open for as long as that loop runs.
    serveSocket ctxt Pub address (iopubPort profile) (iopub debug channels)
  pure channels
-- | Ports chosen when binding the kernel sockets to ephemeral local ports,
-- together with the key used to sign messages.
data ZeroMQEphemeralPorts =
  ZeroMQEphemeralPorts
    { ephHbPort :: !Port
      -- ^ Port of the heartbeat socket.
    , ephControlPort :: !Port
      -- ^ Port of the control socket.
    , ephShellPort :: !Port
      -- ^ Port of the shell socket.
    , ephIOPubPort :: !Port
      -- ^ Port of the iopub socket.
    , ephSignatureKey :: !ByteString
      -- ^ Key used to sign messages.
    }
-- | Render the ports as a JSON object with the keys @ip@, @transport@,
-- @control_port@, @hb_port@, @shell_port@, @iopub_port@ and @key@.
instance ToJSON ZeroMQEphemeralPorts where
  toJSON ports =
    object $
      [ "ip" .= ("127.0.0.1" :: String)
      , "transport" .= TCP
      ]
      ++ [ name .= getPort ports | (name, getPort) <- portFields ]
      -- NOTE(review): decodeUtf8 throws on bytes that are not valid UTF-8;
      -- presumably the key is always ASCII -- confirm with callers.
      ++ [ "key" .= Text.decodeUtf8 (ephSignatureKey ports) ]
    where
      -- JSON key of every port field, in the order they are emitted.
      portFields =
        [ ("control_port", ephControlPort)
        , ("hb_port", ephHbPort)
        , ("shell_port", ephShellPort)
        , ("iopub_port", ephIOPubPort)
        ]
-- | Read the run of numeric characters at the end of a string as an 'Int'.
--
-- Yields 'Nothing' when the string does not end in something 'readMaybe'
-- accepts as an 'Int' (for example when it ends in a non-numeric character
-- or is empty).
parsePort :: String -> Maybe Int
parsePort = readMaybe . trailingNumber
  where
    -- Longest suffix made only of characters satisfying 'isNumber'.
    trailingNumber = reverse . takeWhile isNumber . reverse
-- | Bind a socket to @tcp:\/\/127.0.0.1:*@ and return the port number parsed
-- from the socket's last endpoint.
--
-- Fails in 'IO' when no port number can be parsed from that endpoint.
bindLocalEphemeralPort :: Socket a -> IO Int
bindLocalEphemeralPort sock = do
  bind sock "tcp://127.0.0.1:*"
  lastEndpoint sock >>= maybe (fail unparsable) pure . parsePort
  where
    unparsable :: String
    unparsable =
      "internalError: IHaskell.IPython.ZeroMQ.bindLocalEphemeralPort encountered a port index that could not be interpreted as an int."
-- | Bind the heartbeat, control, shell and iopub sockets to ephemeral local
-- ports, serve them from background threads, and run the callback with the
-- chosen ports and the channel interface.
--
-- The sockets and the context only live for the duration of the callback.
-- The worker threads are killed when the callback finishes (normally or by
-- exception), so that none of them keeps using a socket after 'withSocket'
-- has closed it.
withEphemeralPorts :: ByteString
                      -- ^ HMAC key used to sign messages.
                   -> Bool
                      -- ^ Print debug output for messages?
                   -> (ZeroMQEphemeralPorts -> ZeroMQInterface -> IO a)
                      -- ^ Action to run while the sockets are being served.
                   -> IO a
withEphemeralPorts key debug callback = do
  channels <- newZeroMQInterface key
  withContext $ \ctxt ->
    withSocket ctxt Rep $ \heartbeatSocket ->
      withSocket ctxt Router $ \controlportSocket ->
        withSocket ctxt Router $ \shellportSocket ->
          withSocket ctxt Pub $ \iopubSocket -> do
            -- Bind every socket and remember which port was picked for it.
            hbPt <- bindLocalEphemeralPort heartbeatSocket
            controlPt <- bindLocalEphemeralPort controlportSocket
            shellPt <- bindLocalEphemeralPort shellportSocket
            iopubPt <- bindLocalEphemeralPort iopubSocket
            let ports = ZeroMQEphemeralPorts hbPt controlPt shellPt iopubPt key
            -- One worker per socket, relaying between socket and channels.
            workers <- mapM forkIO
              [ forever $ heartbeat channels heartbeatSocket
              , forever $ control debug channels controlportSocket
              , forever $ shell debug channels shellportSocket
              , checkedIOpub debug channels iopubSocket
              ]
            -- Stop the workers before the enclosing 'withSocket's close the
            -- sockets they are using.
            callback ports channels `finally` mapM_ killThread workers
-- | Serve the stdin socket of a profile from a background thread.
--
-- Each round of the loop takes one message from the request channel, sends
-- it on the socket signed with the profile's signature key, then receives
-- one message from the socket and writes it to the reply channel.
serveStdin :: Profile -> IO ZeroMQStdin
serveStdin profile = do
  requests <- newChan
  replies <- newChan
  void . forkIO . withContext $ \ctxt ->
    serveSocket ctxt Router (ip profile) (stdinPort profile) $ \sock -> do
      outgoing <- readChan requests
      sendMessage False (signatureKey profile) sock outgoing
      incoming <- receiveMessage False sock
      writeChan replies incoming
  pure (StdinChannel requests replies)
-- | Open a socket of the given type, bind it to @tcp:\/\/\<ip\>:\<port\>@ and
-- run the action on it over and over.
--
-- The loop has no exit of its own: this only returns by way of an exception.
serveSocket :: SocketType a => Context -> a -> IP -> Port -> (Socket a -> IO b) -> IO ()
serveSocket ctxt socketType ipAddress port action =
  withSocket ctxt socketType $ \sock -> do
    bind sock (concat ["tcp://", ipAddress, ":", show port])
    forever (action sock)
-- | Answer one heartbeat: receive a frame and send the same bytes back.
-- The channel interface argument is not used.
heartbeat :: ZeroMQInterface -> Socket Rep -> IO ()
heartbeat _ sock = receive sock >>= send sock []
-- | Handle one request on the shell socket.
--
-- The received message is written to the shell request channel together
-- with the shell reply channel; the next message read from that reply
-- channel is signed and sent back on the socket.
shell :: Bool -> ZeroMQInterface -> Socket Router -> IO ()
shell debug channels sock = do
  request <- receiveMessage debug sock
  writeChan (shellRequestChannel channels) (replies, request)
  reply <- readChan replies
  sendMessage debug (hmacKey channels) sock reply
  where
    replies = shellReplyChannel channels
-- | Handle one request on the control socket.
--
-- The received message is written to the control request channel together
-- with the control reply channel; the next message read from that reply
-- channel is signed and sent back on the socket.
control :: Bool -> ZeroMQInterface -> Socket Router -> IO ()
control debug channels sock = do
  request <- receiveMessage debug sock
  writeChan (controlRequestChannel channels) (replies, request)
  reply <- readChan replies
  sendMessage debug (hmacKey channels) sock reply
  where
    replies = controlReplyChannel channels
-- | Publish one message: take the next message from the iopub channel and
-- send it, signed, on the Pub socket.
iopub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
iopub debug channels sock = do
  outgoing <- readChan (iopubChannel channels)
  sendMessage debug (hmacKey channels) sock outgoing
-- | Send a message, reporting whether it could be sent.
--
-- Returns 'True' after a successful send and 'False' when ZeroMQ reports
-- that the socket is no longer a socket (ENOTSOCK). Any other 'ZMQError'
-- is rethrown. The first argument is not used.
trySendMessage :: Sender a => String -> Bool -> ByteString -> Socket a -> Message -> IO Bool
trySendMessage _ debug hmackey sock msg =
  (sendMessage debug hmackey sock msg >> return True) `catch` zmqErrorHandler
  where
    -- ENOTSOCK as defined for the platform this is compiled on.
    notSocket :: Int
    notSocket = case eNOTSOCK of
      Errno code -> fromIntegral code

    -- The literal 38 is ENOTSOCK on BSD/macOS only (on Linux it is ENOSYS and
    -- ENOTSOCK is 88). It is kept so that behaviour does not change on any
    -- platform, while 'notSocket' makes the check work everywhere.
    ignoredErrnos :: [Int]
    ignoredErrnos = [38, notSocket]

    zmqErrorHandler :: ZMQError -> IO Bool
    zmqErrorHandler e
      | errno e `elem` ignoredErrnos = return False
      | otherwise = throwIO e
-- | Publish messages from the iopub channel until one cannot be sent.
--
-- Unlike 'iopub' this loops by itself, and it stops as soon as
-- 'trySendMessage' returns 'False'.
checkedIOpub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
checkedIOpub debug channels sock = publishNext
  where
    publishNext = do
      outgoing <- readChan (iopubChannel channels)
      delivered <- trySendMessage "io" debug (hmacKey channels) sock outgoing
      when delivered publishNext
-- | Receive one multipart message from a socket and decode it.
--
-- The frames are read in this order: any number of identifier frames, the
-- delimiter @\<IDS|MSG\>@, signature, header, parent header, metadata,
-- content, and then any remaining frames as buffers. The signature frame is
-- consumed but not checked here.
--
-- Fails in 'IO' with a "Malformed Wire Protocol message" error when the
-- frames do not have that shape. With the debug flag set, the raw header
-- and content frames are printed.
receiveMessage :: Receiver a => Bool -> Socket a -> IO Message
receiveMessage debug sock = do
  frames <- receiveMulti sock
  parsed <- runParserT wireFrames () "" frames
  case parsed of
    Left parseerr ->
      fail ("Malformed Wire Protocol message: " <> show parseerr)
    Right (idents, headerData, parentHeader, metaData, content, buffers) -> do
      when debug $ do
        putStr "Header: "
        Char.putStrLn headerData
        putStr "Content: "
        Char.putStrLn content
      return (parseMessage idents headerData parentHeader metaData content buffers)
  where
    -- Parser over the list of frames; the signature frame is dropped.
    wireFrames =
      (,,,,,)
        <$> manyTill anyToken (frameMatching (== "<IDS|MSG>"))
        <* required "No signature"
        <*> required "No headerData"
        <*> required "No parentHeader"
        <*> required "No metaData"
        <*> required "No contents"
        <*> manyTill anyToken eof

    -- Take the next frame, or fail with the given message if there is none.
    required label = anyToken <|> fail label

    -- Accept a single frame for which the predicate holds.
    frameMatching accept =
      tokenPrim Char.unpack (\pos _ _ -> incSourceColumn pos 1) $ \frame ->
        if accept frame then Just frame else Nothing
-- | Encode a message and send it on a socket as one multipart message.
--
-- Frames go out in this order: the header's identifiers, the delimiter
-- @\<IDS|MSG\>@, the signature, header, parent header, metadata, content,
-- and then the header's buffers. Every frame but the last is sent with
-- 'SendMore'. 'SendNothing' sends nothing at all.
--
-- With the debug flag set, the message and its encoded content are printed
-- before anything is sent.
sendMessage :: Sender a => Bool -> ByteString -> Socket a -> Message -> IO ()
sendMessage _ _ _ SendNothing = return ()
sendMessage debug hmackey sock msg = do
  when debug $ do
    putStr "Message: "
    print msg
    putStr "Sent: "
    print content

  mapM_ sendPiece idents
  mapM_ sendPiece ["<IDS|MSG>", signature, headStr, parentHeaderStr, metadata]
  -- The content is the final frame unless buffers follow it.
  sendTrailing (content : mhBuffers hdr)
  where
    -- Send the remaining frames, marking only the very last one as final.
    sendTrailing [] = pure ()
    sendTrailing [final] = sendLast final
    sendTrailing (piece:rest) = sendPiece piece >> sendTrailing rest

    sendPiece = send sock [SendMore]
    sendLast = send sock []

    -- JSON-encode a value into a strict bytestring.
    encodeStrict :: ToJSON a => a -> ByteString
    encodeStrict value = LBS.toStrict (encode value)

    -- Signature over header, parent header, metadata and content, in that order.
    signature :: ByteString
    signature = hmac (mconcat [headStr, parentHeaderStr, metadata, content])

    -- Hex-encoded (Base16) HMAC-SHA256 of a bytestring under the given key.
    hmac :: ByteString -> ByteString
    hmac payload =
      let digest = HMAC.hmacGetDigest (HMAC.hmac hmackey payload) :: Hash.Digest SHA256
      in Encoding.convertToBase Encoding.Base16 digest

    -- Encoded pieces of the message.
    hdr = header msg
    headStr = encodeStrict hdr
    parentHeaderStr = maybe "{}" encodeStrict (mhParentHeader hdr)
    idents = mhIdentifiers hdr
    metadata = case mhMetadata hdr of
      Metadata mdobject -> encodeStrict mdobject
    content = encodeStrict msg