{-# LANGUAGE OverloadedStrings, DoAndIfThenElse, FlexibleContexts, CPP #-}

-- | Description : Low-level ZeroMQ communication wrapper.
--
-- The "ZeroMQ" module abstracts away the low-level 0MQ based interface with IPython, replacing it
-- instead with a Haskell Channel based interface. The `serveProfile` function takes an IPython
-- profile specification and returns the channel interface to use.
module IHaskell.IPython.ZeroMQ (
    ZeroMQInterface(..),
    ZeroMQStdin(..),
    serveProfile,
    serveStdin,
    ZeroMQEphemeralPorts,
    withEphemeralPorts,
    ) where

import           Control.Concurrent
import           Control.Exception
import           Control.Monad
import qualified Crypto.Hash as Hash
import           Crypto.Hash.Algorithms (SHA256)
import qualified Crypto.MAC.HMAC as HMAC
import           Data.Aeson
import qualified Data.ByteArray.Encoding as Encoding
import           Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as Char
import qualified Data.ByteString.Lazy as LBS
import           Data.Char
import           Data.Monoid ((<>))
import qualified Data.Text.Encoding as Text
import           Foreign.C.Error (Errno(..), eNOTSOCK)
import           System.ZMQ4 as ZMQ4
import           Text.Parsec (runParserT, manyTill, anyToken, (<|>), eof, tokenPrim, incSourceColumn)
import           Text.Read (readMaybe)

import           IHaskell.IPython.Message.Parser
import           IHaskell.IPython.Types

-- | The channel interface to the ZeroMQ sockets. All communication is done via Messages, which are
-- encoded and decoded into a lower level form before being transmitted to IPython. These channels
-- should functionally serve as high-level sockets which speak Messages instead of ByteStrings.
data ZeroMQInterface =
       Channels
         {
         -- | A channel populated with requests from the frontend.
         -- Shared between the shell and control channels.
         -- The channel content is a tuple (replyChannel, message):
         --
         --   * If the message comes from the shell channel, 'replyChannel' is the shell reply channel.
         --
         --   * If the message comes from the control channel, 'replyChannel' is the control reply channel.
         shellRequestChannel :: Chan (Chan Message, Message)
         -- | Writing to this channel causes a reply to be sent to the frontend.
         , shellReplyChannel :: Chan Message
         -- | This channel is a duplicate of the shell request channel, though using a different backend
         -- socket.
         , controlRequestChannel :: Chan (Chan Message, Message)
         -- | This channel is the control counterpart of the shell reply channel: writing to it sends a
         -- reply to the frontend over the control socket.
         , controlReplyChannel :: Chan Message
         -- | Writing to this channel sends an iopub message to the frontend.
         , iopubChannel :: Chan Message
         -- | Key used to sign messages.
         , hmacKey :: ByteString
         }

-- | The channel interface to the stdin socket, as created by 'serveStdin'.
data ZeroMQStdin =
       StdinChannel
         {
         -- | Messages written to this channel are sent to the frontend over the stdin socket.
         stdinRequestChannel :: Chan Message
         -- | Messages received from the frontend over the stdin socket are written to this channel.
         , stdinReplyChannel :: Chan Message
         }

-- | Create new channels for a ZeroMQInterface.
--
-- The control request channel is created with 'dupChan' from the shell request channel, so a
-- request written to either of them is visible to readers of both. All other channels are
-- independent. The given key is stored unchanged as 'hmacKey'.
newZeroMQInterface :: ByteString -> IO ZeroMQInterface
newZeroMQInterface key = do
  shellReqChan <- newChan
  shellRepChan <- newChan
  controlReqChan <- dupChan shellReqChan
  controlRepChan <- newChan
  iopubChan <- newChan
  -- Force the record constructor before handing it out.
  return $! Channels
    { shellRequestChannel = shellReqChan
    , shellReplyChannel = shellRepChan
    , controlRequestChannel = controlReqChan
    , controlReplyChannel = controlRepChan
    , iopubChannel = iopubChan
    , hmacKey = key
    }

-- | Start responding on all ZeroMQ channels used to communicate with IPython via the provided
-- profile. Return a set of channels which can be used to communicate with IPython in a more
-- structured manner.
--
-- The heartbeat, control and shell sockets are each served from their own thread; the iopub socket
-- is served from the thread that owns the ZeroMQ context. This function returns as soon as the
-- context thread has been forked.
serveProfile :: Profile            -- ^ The profile specifying which address and ports to use.
             -> Bool               -- ^ Print debug output
             -> IO ZeroMQInterface -- ^ The Message-channel based interface to the sockets.
serveProfile profile debug = do
  channels <- newZeroMQInterface (signatureKey profile)

  -- Create the context in a separate thread that never finishes. If withContext or withSocket
  -- complete, the context or socket become invalid.
  _ <- forkIO $ withContext $ \ctxt -> do
    -- Serve on all sockets.
    _ <- forkIO $ serveSocket ctxt Rep (ip profile) (hbPort profile) $ heartbeat channels
    _ <- forkIO $ serveSocket ctxt Router (ip profile) (controlPort profile) $ control debug channels
    _ <- forkIO $ serveSocket ctxt Router (ip profile) (shellPort profile) $ shell debug channels

    -- The ctxt is reference counted in this thread only. Thus, the last serveSocket cannot be
    -- asynchronous, because otherwise ctxt would be garbage collectable - since it would only be
    -- used in other threads. Thus, keep the last serveSocket in this thread.
    serveSocket ctxt Pub (ip profile) (iopubPort profile) $ iopub debug channels

  return channels

-- | Describes ports used when creating an ephemeral ZeroMQ session. Used to generate the ipython
-- JSON config file.
data ZeroMQEphemeralPorts =
       ZeroMQEphemeralPorts
         {
         -- | Port the heartbeat socket is bound to.
         ephHbPort :: !Port
         -- | Port the control socket is bound to.
         , ephControlPort :: !Port
         -- | Port the shell socket is bound to.
         , ephShellPort :: !Port
         -- | Port the iopub socket is bound to.
         , ephIOPubPort :: !Port
         -- | Key used to sign messages; serialised as the @key@ field of the JSON config.
         , ephSignatureKey :: !ByteString
         }

-- | Serialise the ephemeral session as a JSON object with the keys @ip@, @transport@,
-- @control_port@, @hb_port@, @shell_port@, @iopub_port@ and @key@.
instance ToJSON ZeroMQEphemeralPorts where
  toJSON ports =
    object
      [ -- Ephemeral sockets are bound to the loopback address (see 'bindLocalEphemeralPort').
        "ip" .= ("127.0.0.1" :: String)
      , "transport" .= TCP
      , "control_port" .= ephControlPort ports
      , "hb_port" .= ephHbPort ports
      , "shell_port" .= ephShellPort ports
      , "iopub_port" .= ephIOPubPort ports
        -- NOTE(review): decodeUtf8 throws on input that is not valid UTF-8; this assumes the
        -- signature key is always UTF-8 text -- confirm against callers.
      , "key" .= Text.decodeUtf8 (ephSignatureKey ports)
      ]

-- | Extract the port number from the end of an endpoint string such as
-- @"tcp://127.0.0.1:5555"@.
--
-- The longest run of ASCII digits at the end of the string is parsed as an 'Int'. Returns
-- 'Nothing' when the string does not end in a digit.
parsePort :: String -> Maybe Int
parsePort = readMaybe . trailingDigits
  where
    -- Only ASCII digits can be read back as an Int, so test with isDigit rather than the broader
    -- isNumber (which also accepts characters such as superscripts and fractions).
    trailingDigits = reverse . takeWhile isDigit . reverse

-- | Bind the socket to an ephemeral TCP port on the loopback interface and return the port that
-- was chosen.
--
-- The port is recovered by parsing the socket's last endpoint string with 'parsePort'; if that
-- fails the action fails (via 'fail') with an internal-error message that includes the endpoint.
bindLocalEphemeralPort :: Socket a -> IO Int
bindLocalEphemeralPort sock = do
  -- The "*" port asks ZeroMQ to pick a free port.
  bind sock "tcp://127.0.0.1:*"
  endpointString <- lastEndpoint sock
  maybe (fail (unparsable endpointString)) return (parsePort endpointString)
  where
    unparsable :: String -> String
    unparsable endpoint =
      "internalError: IHaskell.IPython.ZeroMQ.bindLocalEphemeralPort encountered a port index that could not be interpreted as an int: "
      ++ show endpoint

-- | Run session for communicating with an IPython instance on ephemerally allocated ZMQ4 sockets.
-- The sockets will be closed when the callback returns.
--
-- NOTE(review): the worker threads forked here are not explicitly stopped when the callback
-- returns; only the sockets and the context are released.
withEphemeralPorts :: ByteString -- ^ HMAC signing key
                   -> Bool -- ^ Print debug output
                   -> (ZeroMQEphemeralPorts
                    -> ZeroMQInterface
                    -> IO a) -- ^ Callback that takes the interface to the sockets.
                   -> IO a
withEphemeralPorts key debug callback = do
  channels <- newZeroMQInterface key
  -- Create the ZMQ4 context
  withContext $ \ctxt ->
    -- Create the sockets to communicate with.
    withSocket ctxt Rep $ \heartbeatSocket ->
      withSocket ctxt Router $ \controlportSocket ->
        withSocket ctxt Router $ \shellportSocket ->
          withSocket ctxt Pub $ \iopubSocket -> do
            -- Bind each socket to a local port, getting the port chosen.
            hbPt <- bindLocalEphemeralPort heartbeatSocket
            controlPt <- bindLocalEphemeralPort controlportSocket
            shellPt <- bindLocalEphemeralPort shellportSocket
            iopubPt <- bindLocalEphemeralPort iopubSocket
            -- Create object to store ephemeral ports
            let ports = ZeroMQEphemeralPorts hbPt controlPt shellPt iopubPt key
            -- Launch actions to communicate between channels and sockets.
            _ <- forkIO $ forever $ heartbeat channels heartbeatSocket
            _ <- forkIO $ forever $ control debug channels controlportSocket
            _ <- forkIO $ forever $ shell debug channels shellportSocket
            -- checkedIOpub loops by itself and stops once sending reports a closed socket.
            _ <- forkIO $ checkedIOpub debug channels iopubSocket
            -- Run callback function; provide it with both ports and channels.
            callback ports channels

-- | Start serving the stdin socket described by the profile and return the channels used to talk
-- to it.
--
-- The serving thread repeatedly takes one message from the request channel, sends it on the
-- socket (signed with the profile's signature key), then waits for one message from the socket
-- and writes it to the reply channel. Debug output is always disabled for this socket.
serveStdin :: Profile -> IO ZeroMQStdin
serveStdin profile = do
  reqChannel <- newChan
  repChannel <- newChan

  -- Create the context in a separate thread that never finishes. If withContext or withSocket
  -- complete, the context or socket become invalid.
  _ <- forkIO $ withContext $ \ctxt ->
    -- Serve on the stdin socket.
    serveSocket ctxt Router (ip profile) (stdinPort profile) $ \sock -> do
      -- Read the request from the interface channel and send it.
      readChan reqChannel >>= sendMessage False (signatureKey profile) sock

      -- Receive a response and write it to the interface channel.
      receiveMessage False sock >>= writeChan repChannel

  return $ StdinChannel reqChannel repChannel

-- | Serve on a socket of the given type. Creates the socket in the given context, binds it to
-- @tcp:\/\/ip:port@ and then loops the provided action forever; the action should listen on the
-- socket and respond to any events.
--
-- This runs in the calling thread and only returns if binding or the action throws, so callers
-- that need to continue should fork it.
serveSocket :: SocketType a => Context -> a -> IP -> Port -> (Socket a -> IO b) -> IO ()
serveSocket ctxt socketType ipAddress port action =
  void $ withSocket ctxt socketType $ \sock -> do
    bind sock endpoint
    forever $ action sock
  where
    endpoint = "tcp://" ++ ipAddress ++ ":" ++ show port

-- | Listener on the heartbeat port. Echoes back any data it was sent.
--
-- Handles a single request; callers loop it. The interface argument is unused.
heartbeat :: ZeroMQInterface -> Socket Rep -> IO ()
heartbeat _ sock =
  -- Read some data and send it straight back.
  receive sock >>= send sock []

-- | Listener on the shell port. Reads a message and writes it, paired with the shell reply
-- channel, to the shell request channel. Then reads a response from the shell reply channel of
-- the interface and sends it back to the frontend, signed with the interface's HMAC key.
--
-- Handles a single request/reply exchange; callers loop it.
shell :: Bool -> ZeroMQInterface -> Socket Router -> IO ()
shell debug channels sock = do
  -- Receive a message and write it to the interface channel.
  message <- receiveMessage debug sock
  writeChan requestChannel (replyChannel, message)

  -- Read the reply from the interface channel and send it.
  reply <- readChan replyChannel
  sendMessage debug (hmacKey channels) sock reply
  where
    requestChannel = shellRequestChannel channels
    replyChannel = shellReplyChannel channels

-- | Listener on the control port. Reads a message and writes it, paired with the control reply
-- channel, to the control request channel. Then reads a response from the control reply channel
-- of the interface and sends it back to the frontend, signed with the interface's HMAC key.
--
-- Handles a single request/reply exchange; callers loop it.
control :: Bool -> ZeroMQInterface -> Socket Router -> IO ()
control debug channels sock = do
  -- Receive a message and write it to the interface channel.
  message <- receiveMessage debug sock
  writeChan requestChannel (replyChannel, message)

  -- Read the reply from the interface channel and send it.
  reply <- readChan replyChannel
  sendMessage debug (hmacKey channels) sock reply
  where
    requestChannel = controlRequestChannel channels
    replyChannel = controlReplyChannel channels

-- | Send messages via the iopub channel. This reads one message from the ZeroMQ iopub interface
-- channel and then writes it to the socket, signed with the interface's HMAC key.
--
-- Handles a single message; callers loop it.
iopub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
iopub debug channels sock = do
  msg <- readChan (iopubChannel channels)
  sendMessage debug (hmacKey channels) sock msg

-- | Attempt to send a message along the socket, returning true if successful.
--
-- Returns 'False' when sending fails because the socket is no longer usable (ENOTSOCK); any other
-- 'ZMQError' is rethrown. The first argument is a label that is currently ignored.
trySendMessage :: Sender a => String -> Bool -> ByteString -> Socket a -> Message -> IO Bool
trySendMessage _ debug hmackey sock msg =
  (sendMessage debug hmackey sock msg >> return True) `catch` zmqErrorHandler
  where
    Errno enotsock = eNOTSOCK

    -- The original check was the literal 38, which is ENOTSOCK on macOS/BSD but not on Linux.
    -- Keep 38 for backward compatibility and also accept the platform's real ENOTSOCK value.
    closedSocketErrnos :: [Int]
    closedSocketErrnos = [38, fromIntegral enotsock]

    zmqErrorHandler :: ZMQError -> IO Bool
    zmqErrorHandler e
      -- Ignore errors if we cannot send. We may want to forward this to the thread that tried put the
      -- message in the Chan initially.
      | errno e `elem` closedSocketErrnos = return False
      | otherwise = throwIO e

-- | Send messages via the iopub channel. This reads messages from the ZeroMQ iopub interface
-- channel and then writes the messages to the socket. This is a checked implementation which will
-- stop if the socket is closed.
--
-- Loops by calling itself for as long as 'trySendMessage' reports success.
checkedIOpub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
checkedIOpub debug channels sock = do
  msg <- readChan (iopubChannel channels)
  sent <- trySendMessage "io" debug (hmacKey channels) sock msg
  -- Only keep going while the socket still accepts messages.
  when sent $
    checkedIOpub debug channels sock

-- | Receive and parse a message from a socket.
receiveMessage :: Receiver a => Bool -> Socket a -> IO Message
receiveMessage :: forall a. Receiver a => Bool -> Socket a -> IO Message
receiveMessage Bool
debug Socket a
sock = do
  [ByteString]
blobs <- Socket a -> IO [ByteString]
forall a. Receiver a => Socket a -> IO [ByteString]
receiveMulti Socket a
sock
  ParsecT
  [ByteString]
  ()
  IO
  ([ByteString], ByteString, ByteString, ByteString, ByteString,
   [ByteString])
-> ()
-> IP
-> [ByteString]
-> IO
     (Either
        ParseError
        ([ByteString], ByteString, ByteString, ByteString, ByteString,
         [ByteString]))
forall s (m :: * -> *) t u a.
Stream s m t =>
ParsecT s u m a -> u -> IP -> s -> m (Either ParseError a)
runParserT ParsecT
  [ByteString]
  ()
  IO
  ([ByteString], ByteString, ByteString, ByteString, ByteString,
   [ByteString])
forall {u}.
ParsecT
  [ByteString]
  u
  IO
  ([ByteString], ByteString, ByteString, ByteString, ByteString,
   [ByteString])
parseBlobs () IP
"" [ByteString]
blobs IO
  (Either
     ParseError
     ([ByteString], ByteString, ByteString, ByteString, ByteString,
      [ByteString]))
-> (Either
      ParseError
      ([ByteString], ByteString, ByteString, ByteString, ByteString,
       [ByteString])
    -> IO Message)
-> IO Message
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Either
  ParseError
  ([ByteString], ByteString, ByteString, ByteString, ByteString,
   [ByteString])
r -> case Either
  ParseError
  ([ByteString], ByteString, ByteString, ByteString, ByteString,
   [ByteString])
r of
    Left ParseError
parseerr -> IP -> IO Message
forall a. IP -> IO a
forall (m :: * -> *) a. MonadFail m => IP -> m a
fail (IP -> IO Message) -> IP -> IO Message
forall a b. (a -> b) -> a -> b
$ IP
"Malformed Wire Protocol message: " IP -> IP -> IP
forall a. Semigroup a => a -> a -> a
<> ParseError -> IP
forall a. Show a => a -> IP
show ParseError
parseerr
    Right ([ByteString]
idents, ByteString
headerData, ByteString
parentHeader, ByteString
metaData, ByteString
content, [ByteString]
buffers) -> do
      Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
debug (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        IP -> IO ()
putStr IP
"Header: "
        ByteString -> IO ()
Char.putStrLn ByteString
headerData
        IP -> IO ()
putStr IP
"Content: "
        ByteString -> IO ()
Char.putStrLn ByteString
content
      Message -> IO Message
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Message -> IO Message) -> Message -> IO Message
forall a b. (a -> b) -> a -> b
$ [ByteString]
-> ByteString
-> ByteString
-> ByteString
-> ByteString
-> [ByteString]
-> Message
parseMessage [ByteString]
idents ByteString
headerData ByteString
parentHeader ByteString
metaData ByteString
content [ByteString]
buffers
  where
    -- Split the frames of one multipart message into its components. The token stream is the
    -- list of frames, consumed in this order:
    --
    --   1. zero or more identifier frames, terminated by the "<IDS|MSG>" delimiter frame
    --      (the delimiter itself is consumed but not returned);
    --   2. the signature frame, which is read and discarded by this parser;
    --   3. header, parent header, metadata and content, one frame each;
    --   4. every remaining frame up to end of input, returned as the buffers.
    --
    -- A missing mandatory frame makes the parser fail with a message naming that frame.
    parseBlobs = do
      -- A mandatory single frame; 'label' is the failure message used when input has run out.
      let frame label = anyToken <|> fail label
      idents       <- manyTill anyToken (satisfy (== "<IDS|MSG>"))
      _            <- frame "No signature"
      headerData   <- frame "No headerData"
      parentHeader <- frame "No parentHeader"
      metaData     <- frame "No metaData"
      content      <- frame "No contents"
      buffers      <- manyTill anyToken eof
      pure (idents, headerData, parentHeader, metaData, content, buffers)
    -- Accept the next frame only if it fulfils the predicate, returning the frame itself.
    -- Frames are rendered with 'Char.unpack' in parse errors, and the source column is advanced
    -- by one for every frame consumed.
    satisfy f = tokenPrim Char.unpack nextPos accept
      where
        nextPos pos _ _ = incSourceColumn pos 1
        accept c
          | f c = Just c
          | otherwise = Nothing

-- | Encode a message in the IPython ZeroMQ communication protocol and send it through the provided
-- socket. Sign it using HMAC with SHA-256 using the provided key.
--
-- The message is written as a sequence of frames, in this order: the identifiers taken from the
-- message header, the @\<IDS|MSG\>@ delimiter, the Base16-encoded signature, the JSON-encoded
-- header, parent header (@{}@ when there is none), metadata and content, followed by any buffers
-- attached to the header. Every frame except the last one is sent with the 'SendMore' flag.
--
-- The signature covers the concatenation of header, parent header, metadata and content, in that
-- order; identifiers and buffers are not part of it.
--
-- When the first argument is 'True', the message and its encoded content are printed to stdout
-- before anything is sent. Sending 'SendNothing' is a no-op.
sendMessage :: Sender a => Bool -> ByteString -> Socket a -> Message -> IO ()
sendMessage _ _ _ SendNothing = return ()
sendMessage debug hmackey sock msg = do
  when debug $ do
    putStr "Message: "
    print msg
    putStr "Sent: "
    print content

  -- Send all pieces of the message that are always followed by another frame.
  mapM_ sendPiece idents
  sendPiece "<IDS|MSG>"
  sendPiece signature
  sendPiece headStr
  sendPiece parentHeaderStr
  sendPiece metadata

  -- The content comes next, then the buffers (if any). Whichever frame ends up last
  -- concludes the transmission.
  sendFinalFrames (content : mhBuffers hdr)

  where
    -- Send a run of frames, flagging all but the final one with 'SendMore'.
    sendFinalFrames :: [ByteString] -> IO ()
    sendFinalFrames [] = pure ()
    sendFinalFrames [b] = sendLast b
    sendFinalFrames (b:bs) = sendPiece b >> sendFinalFrames bs

    -- Send a frame that will be followed by more frames of the same message.
    sendPiece :: ByteString -> IO ()
    sendPiece = send sock [SendMore]

    -- Send the frame that terminates the message.
    sendLast :: ByteString -> IO ()
    sendLast = send sock []

    -- Encode to a strict bytestring.
    encodeStrict :: ToJSON v => v -> ByteString
    encodeStrict = LBS.toStrict . encode

    -- Signature for the message using HMAC SHA-256.
    signature :: ByteString
    signature = hmac $ headStr <> parentHeaderStr <> metadata <> content

    -- Compute the HMAC SHA-256 signature of a bytestring message, Base16-encoded.
    hmac :: ByteString -> ByteString
    hmac = (Encoding.convertToBase Encoding.Base16 :: Hash.Digest SHA256 -> ByteString)
      . HMAC.hmacGetDigest
      . HMAC.hmac hmackey

    -- Pieces of the message.
    hdr = header msg

    parentHeaderStr :: ByteString
    parentHeaderStr = maybe "{}" encodeStrict $ mhParentHeader hdr

    idents :: [ByteString]
    idents = mhIdentifiers hdr

    metadata :: ByteString
    metadata = let Metadata mdobject = mhMetadata hdr in encodeStrict mdobject

    content :: ByteString
    content = encodeStrict msg

    headStr :: ByteString
    headStr = encodeStrict hdr