{-# LANGUAGE DerivingVia, DeriveGeneric, RankNTypes, ScopedTypeVariables, MultiParamTypeClasses, OverloadedStrings, GeneralizedNewtypeDeriving, CPP, ExistentialQuantification, StandaloneDeriving, GADTs, UnboxedTuples, BangPatterns #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Network.RPC.Curryer.Server where
import qualified Streamly.Data.Stream.Prelude as SP
import Streamly.Data.Stream as Stream hiding (foldr)
import Streamly.Internal.Data.Array (pinnedCreateOf)
import Streamly.Network.Socket as SSock
import Network.Socket as Socket
import Network.Socket.ByteString as Socket
import Streamly.Data.Parser as P
import Codec.Winery
import Codec.Winery.Internal (varInt, decodeVarInt, getBytes)
import GHC.Generics
import GHC.Fingerprint
import Data.Typeable
import Control.Concurrent.MVar (MVar, newMVar, withMVar)
import Control.Exception
import Data.Function ((&))
import Data.Word
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.ByteString.FastBuilder as BB
import Streamly.Data.Fold as FL hiding (foldr)
import qualified Streamly.Data.Stream.Prelude as P
import qualified Streamly.External.ByteString as StreamlyBS
import Streamly.Data.Stream.Prelude as Stream hiding (foldr)
import Streamly.Internal.Data.Binary.Parser (word32be)
import qualified Data.Binary as B
import qualified Data.UUID as UUIDBase
import qualified Data.UUID.V4 as UUIDBase
import Control.Monad
import Data.Functor
import Control.Applicative
import qualified Network.RPC.Curryer.StreamlyAdditions as SA
import Data.Hashable
import System.Timeout
import qualified Network.ByteOrder as BO
#define CURRYER_SHOW_BYTES 0
#define CURRYER_PASS_SCHEMA 0
#if CURRYER_SHOW_BYTES == 1
import Debug.Trace
#endif
-- | Optionally trace the raw bytes of a message.
--
-- When @CURRYER_SHOW_BYTES@ is 1 the label, the byte count and the bytes
-- themselves are emitted through 'traceShowM'. In every other configuration
-- both arguments are ignored and the call does nothing.
traceBytes :: Applicative f => String -> BS.ByteString -> f ()
#if CURRYER_SHOW_BYTES == 1
traceBytes msg bs = traceShowM (msg, BS.length bs, bs)
#else
traceBytes _ _ = pure ()
#endif
-- | Serialise a value into the bytes carried as an envelope payload.
--
-- This is a single switch point between two winery encoders: 'serialise'
-- when @CURRYER_PASS_SCHEMA@ is 1, 'serialiseOnly' otherwise. It has to stay
-- in step with 'msgDeserialise', which makes the mirror-image choice.
-- NOTE(review): the macro name suggests the first variant embeds the winery
-- schema in the payload -- confirm against the winery documentation.
msgSerialise :: Serialise a => a -> BS.ByteString
#if CURRYER_PASS_SCHEMA == 1
msgSerialise = serialise
#else
msgSerialise = serialiseOnly
#endif
-- | Decode an envelope payload back into a value.
--
-- Counterpart of 'msgSerialise': winery's 'deserialise' when
-- @CURRYER_PASS_SCHEMA@ is 1, otherwise the schema-less 'deserialiseOnly''
-- defined in this module. A decoder that cannot be obtained is reported as
-- 'Left'.
msgDeserialise :: forall s. Serialise s => BS.ByteString -> Either WineryException s
#if CURRYER_PASS_SCHEMA == 1
msgDeserialise = deserialise
#else
msgDeserialise = deserialiseOnly'
#endif
-- | A value paired with an @MVar ()@ that serves as its mutex.
--
-- The value itself is stored unguarded; mutual exclusion only holds for
-- callers that go through 'withLock'.
data Locking a = Locking (MVar ()) a

-- | Wrap a value together with a freshly created lock.
newLock :: a -> IO (Locking a)
newLock x = (\lock -> Locking lock x) <$> newMVar ()

-- | Run an action on the wrapped value while holding its lock.
--
-- Built on 'withMVar', so the lock is taken before the action runs and put
-- back when the action finishes or throws.
withLock :: Locking a -> (a -> IO b) -> IO b
withLock (Locking mvar v) m =
  withMVar mvar $ \_ -> m v

-- | Read the wrapped value without taking the lock.
lockless :: Locking a -> a
lockless (Locking _ a) = a
-- | A 32-bit unsigned timeout value. No other definition visible in this
-- module refers to it.
type Timeout = Word32
-- | Raw serialised message bytes; this is the type of an envelope's payload.
type BinaryMessage = BS.ByteString
-- | The unit exchanged on the wire: a typed, identified, serialised message.
--
-- All fields are strict.
data Envelope = Envelope {
  -- | Type fingerprint of the serialised payload; 'openEnvelope' compares it
  -- with the fingerprint of the type a handler expects.
  envFingerprint :: !Fingerprint,
  -- | Whether this is a request or one of the response kinds.
  envMessageType :: !MessageType,
  -- | Message identifier; responses reuse the identifier of their request.
  envMsgId :: !UUID,
  -- | The serialised message itself.
  envPayload :: !BinaryMessage
  }
  deriving (Generic, Show)
-- | Timeout attached to a 'RequestMessage'. The server hands a non-zero
-- value to 'System.Timeout.timeout' and treats zero as "no timeout".
type TimeoutMicroseconds = Int
-- Orphan instances for 'Fingerprint' (see the -fno-warn-orphans pragma).
-- On base older than 4.15 a 'Generic' instance is derived here; for newer
-- base the branch is empty, presumably because base already provides one.
#if MIN_VERSION_base(4,15,0)
#else
deriving instance Generic Fingerprint
#endif
-- winery serialisation for 'Fingerprint', borrowed from 'WineryVariant'.
deriving via WineryVariant Fingerprint instance Serialise Fingerprint
-- | Marks what kind of message an 'Envelope' carries.
data MessageType
  = RequestMessage TimeoutMicroseconds
    -- ^ A request, with the timeout the server applies to its handler.
  | ResponseMessage
    -- ^ A handler's return value.
  | TimeoutResponseMessage
    -- ^ The handler did not produce a value in time.
  | ExceptionResponseMessage
    -- ^ Handling the request threw an exception.
  deriving (Generic, Show)
  deriving Serialise via WineryVariant MessageType
-- | A list of 'RequestHandler's, tried in order for each incoming request.
type RequestHandlers serverState = [RequestHandler serverState]

-- | A server-side handler for one request type. The request and response
-- types are hidden inside the constructor; only the server state type is
-- visible.
data RequestHandler serverState where
  -- | A handler whose result is serialised and sent back to the client.
  RequestHandler :: forall a b serverState. (Serialise a, Serialise b) => (ConnectionState serverState -> a -> IO b) -> RequestHandler serverState
  -- | A handler that returns nothing; no response is sent for it.
  AsyncRequestHandler :: forall a serverState. Serialise a => (ConnectionState serverState -> a -> IO ()) -> RequestHandler serverState
-- | What a request handler receives alongside the decoded request.
data ConnectionState a = ConnectionState {
  -- | The server state value that was passed to 'serve'.
  connectionServerState :: a,
  -- | The connection's socket together with its send lock; usable with
  -- 'sendMessage'.
  connectionSocket :: Locking Socket
  }
-- | Send a message over a locked socket.
--
-- The value is wrapped in a 'RequestMessage' envelope with a timeout of 0
-- and a freshly generated random message id, then written with
-- 'sendEnvelope'.
sendMessage :: Serialise a => Locking Socket -> a -> IO ()
sendMessage lockSock msg = do
  requestID <- UUID <$> UUIDBase.nextRandom
  sendEnvelope (envelopeFor requestID) lockSock
  where
    -- No timeout is attached to these messages.
    timeout' = 0
    envelopeFor requestID =
      Envelope (fingerprint msg) (RequestMessage timeout') requestID (msgSerialise msg)
-- | Local wrapper around 'UUIDBase.UUID', so that this module can give it a
-- 'Serialise' instance of its own.
newtype UUID = UUID { _unUUID :: UUIDBase.UUID }
  deriving (Show, Eq, B.Binary, Hashable)
-- | winery serialisation for 'UUID'.
--
-- The schema is a byte string tagged @"Data.UUID"@. The encoded form is the
-- 'B.Binary' encoding of the UUID, prefixed with its length as a varint.
instance Serialise UUID where
  schemaGen _ = pure (STag (TagStr "Data.UUID") SBytes)
  toBuilder uuid =
    let bytes = BSL.toStrict (B.encode uuid)
    in varInt (BS.length bytes) <> BB.byteString bytes
  {-# INLINE toBuilder #-}
  -- Only the exact schema produced by 'schemaGen' is accepted: any other
  -- schema hits 'error', and any non-bytes term throws 'InvalidTerm'.
  extractor = mkExtractor $ \schema' ->
    case schema' of
      STag (TagStr "Data.UUID") SBytes ->
        pure $ \term ->
          case term of
            TBytes bs -> B.decode (BSL.fromStrict bs)
            term' -> throw (InvalidTerm term')
      x -> error $ "invalid schema element " <> show x
  -- Mirror of 'toBuilder': read the varint length, take that many bytes,
  -- then decode them with 'B.decode'.
  decodeCurrent = B.decode . BSL.fromStrict <$> (decodeVarInt >>= getBytes)
-- | Errors that can be reported for a remote call.
data ConnectionError
  = CodecError String
    -- ^ A serialisation problem, carried as text.
  | TimeoutError
    -- ^ The call timed out; the server fingerprints this value in its
    -- timeout response.
  | ExceptionError String
    -- ^ An exception, carried as text.
  deriving (Generic, Show, Eq)
  deriving Serialise via WineryVariant ConnectionError
-- | Exception that the server's request handling catches and converts into
-- a timeout response, in the same way as an elapsed timeout.
data TimeoutException = TimeoutException
  deriving Show

instance Exception TimeoutException
-- | A Streamly parser that consumes bytes ('Word8') in 'IO'; the type of the
-- wire-format parsers in this module.
type BParser a = Parser Word8 IO a
-- | An IPv4 address as four octets, in the form 'tupleToHostAddress' accepts.
type HostAddressTuple = (Word8, Word8, Word8, Word8)
-- | An IPv6 address as eight 16-bit groups, in the form
-- 'tupleToHostAddress6' accepts.
type HostAddressTuple6 = (Word16, Word16, Word16, Word16, Word16, Word16, Word16, Word16)
-- | Ready-made IPv4 addresses: 'allHostAddrs' is @0.0.0.0@ and
-- 'localHostAddr' is @127.0.0.1@.
allHostAddrs,localHostAddr :: HostAddressTuple
allHostAddrs = (0, 0, 0, 0)
localHostAddr = (127, 0, 0, 1)
-- | The IPv6 address @::1@ (seven zero groups followed by 1).
localHostAddr6 :: HostAddressTuple6
localHostAddr6 = (0, 0, 0, 0, 0, 0, 0, 1)
-- | Parse the message-type section of an envelope.
--
-- The section starts with a one-byte tag: 0 is a request and is followed by
-- a 32-bit timeout read with 'word32P'; 1, 2 and 3 are the plain, timeout
-- and exception responses and carry nothing further. Any other tag fails
-- the parse. This mirrors the encoding in 'encodeEnvelope'.
msgTypeP :: BParser MessageType
msgTypeP =
      (tag 0 *> (RequestMessage . fromIntegral <$> word32P))
  <|> (tag 1 $> ResponseMessage)
  <|> (tag 2 $> TimeoutResponseMessage)
  <|> (tag 3 $> ExceptionResponseMessage)
  where
    -- Accept exactly one byte equal to the expected tag.
    tag :: Word8 -> BParser Word8
    tag expected = P.satisfy (== expected)
-- | Parse one complete envelope: fingerprint, message type, message id and
-- then the payload, which is prefixed by its length as a 32-bit big-endian
-- word.
envelopeP :: BParser Envelope
envelopeP =
  Envelope <$> fingerprintP <*> msgTypeP <*> uuidP <*> lenPrefixedByteStringP
  where
    lenPrefixedByteStringP :: BParser BS.ByteString
    lenPrefixedByteStringP = do
      c <- fromIntegral <$> word32be
      -- An empty payload is answered directly instead of running
      -- 'P.takeEQ' with a count of zero.
      -- NOTE(review): presumably takeEQ does not cope with 0 -- confirm.
      if c == 0
        then pure mempty
        else do
          ps <- P.takeEQ c (pinnedCreateOf c)
          -- The bang forces the array-to-ByteString conversion here rather
          -- than leaving a thunk in the envelope.
          let !bs = StreamlyBS.fromArray ps
          pure bs
-- | Render an envelope in the wire format that 'envelopeP' parses.
--
-- Layout, in order:
--
-- * the fingerprint as two 64-bit words;
-- * a one-byte message-type tag (0 request, 1 response, 2 timeout response,
--   3 exception response); a request tag is followed by its timeout as a
--   32-bit word;
-- * the message id as the four 32-bit words of the UUID;
-- * the payload length as a 32-bit word, then the payload bytes.
--
-- The timeout and the payload length are narrowed to 'Word32' with
-- 'fromIntegral', so values that do not fit in 32 bits wrap around.
encodeEnvelope :: Envelope -> BS.ByteString
encodeEnvelope (Envelope (Fingerprint fp1 fp2) msgType msgId bs) =
  fingerprintBs <> msgTypeBs <> msgIdBs <> lenPrefixedBs
  where
    fingerprintBs = BO.bytestring64 fp1 <> BO.bytestring64 fp2
    msgTypeBs = case msgType of
      RequestMessage timeoutms -> BS.singleton 0 <> BO.bytestring32 (fromIntegral timeoutms)
      ResponseMessage -> BS.singleton 1
      TimeoutResponseMessage -> BS.singleton 2
      ExceptionResponseMessage -> BS.singleton 3
    msgIdBs =
      case UUIDBase.toWords (_unUUID msgId) of
        (u1, u2, u3, u4) -> BS.concat (BO.bytestring32 <$> [u1, u2, u3, u4])
    lenPrefixedBs = BO.bytestring32 payloadLen <> bs
    payloadLen :: Word32
    payloadLen = fromIntegral (BS.length bs)
-- | Parse a 'Fingerprint' as two consecutive 64-bit words.
fingerprintP :: BParser Fingerprint
fingerprintP = Fingerprint <$> word64P <*> word64P
-- | Parse exactly eight bytes and decode them as a 'Word64' with
-- 'BO.word64'. Fewer than eight available bytes fail the parse.
word64P :: BParser Word64
word64P = BO.word64 . BS.pack <$> P.takeEQ 8 FL.toList
-- | Parse exactly four bytes and decode them as a 'Word32' with
-- 'BO.word32'. Fewer than four available bytes fail the parse.
word32P :: BParser Word32
word32P = BO.word32 . BS.pack <$> P.takeEQ 4 FL.toList
-- | Parse a 'UUID' from four consecutive 32-bit words, the inverse of the
-- 'UUIDBase.toWords' encoding written by 'encodeEnvelope'.
uuidP :: BParser UUID
uuidP = do
  u1 <- word32P
  u2 <- word32P
  u3 <- word32P
  u4 <- word32P
  pure (UUID (UUIDBase.fromWords u1 u2 u3 u4))
-- | An action that may produce a message. No definition visible in this
-- module uses it.
type NewConnectionHandler msg = IO (Maybe msg)
-- | A function from a request to an action yielding its response. No
-- definition visible in this module uses it.
type NewMessageHandler req resp = req -> IO resp
-- | Socket options used for the TCP listeners ('serveIPv4' and
-- 'serveIPv6'): 'ReuseAddr' and 'NoDelay', both set to 1.
defaultSocketOptions :: [(SocketOption, Int)]
defaultSocketOptions = [(ReuseAddr, 1), (NoDelay, 1)]
-- | Serve requests on an IPv4 address and port.
--
-- Builds an 'AF_INET' stream-socket specification with
-- 'defaultSocketOptions' and delegates to 'serve'. The
-- @Maybe (MVar SockAddr)@ argument is passed through untouched.
-- NOTE(review): presumably it is filled with the bound address once the
-- listener is up -- confirm in "Network.RPC.Curryer.StreamlyAdditions".
serveIPv4 :: RequestHandlers s -> s -> HostAddressTuple -> PortNumber -> Maybe (MVar SockAddr) -> IO Bool
serveIPv4 handlers state hostaddr port mSockLock =
  serve handlers state sockSpec sockAddr mSockLock
  where
    sockAddr = SockAddrInet port (tupleToHostAddress hostaddr)
    sockSpec = SockSpec { sockFamily = AF_INET,
                          sockType = Stream,
                          sockProto = 0,
                          sockOpts = defaultSocketOptions
                        }
-- | Serve requests on an IPv6 address and port.
--
-- Builds an 'AF_INET6' stream-socket specification with
-- 'defaultSocketOptions' and delegates to 'serve'. Flow info and scope id
-- of the socket address are both 0. The @Maybe (MVar SockAddr)@ argument is
-- passed through untouched.
serveIPv6 :: RequestHandlers s -> s -> HostAddressTuple6 -> PortNumber -> Maybe (MVar SockAddr) -> IO Bool
serveIPv6 handlers state hostaddr port mSockLock =
  serve handlers state sockSpec sockAddr mSockLock
  where
    flowInfo = 0
    scopeInfo = 0
    sockAddr = SockAddrInet6 port flowInfo (tupleToHostAddress6 hostaddr) scopeInfo
    sockSpec = SockSpec { sockFamily = AF_INET6,
                          sockType = Stream,
                          sockProto = 0,
                          sockOpts = defaultSocketOptions
                        }
-- | Serve requests on a Unix domain socket at the given path.
--
-- Builds an 'AF_UNIX' stream-socket specification with no socket options
-- and delegates to 'serve'. The @Maybe (MVar SockAddr)@ argument is passed
-- through untouched.
serveUnixDomain :: RequestHandlers s -> s -> FilePath -> Maybe (MVar SockAddr) -> IO Bool
serveUnixDomain handlers state socketPath mSockLock =
  serve handlers state sockSpec sockAddr mSockLock
  where
    sockAddr = SockAddrUnix socketPath
    sockSpec = SockSpec { sockFamily = AF_UNIX,
                          sockType = Stream,
                          sockProto = 0,
                          sockOpts = []
                        }
-- | Accept connections on the given socket specification and address, and
-- answer requests on each of them with the supplied handlers.
--
-- Accepted sockets come from 'SA.acceptorOnSockSpec' and are handled
-- concurrently. Every connection gets its own send lock and has its
-- incoming envelopes fed to 'serverEnvelopeHandler'. Each connection
-- handler runs under 'SSock.forSocketM', which closes the socket when the
-- handler finishes or throws, so descriptors are released deterministically
-- rather than whenever the socket happens to be collected.
--
-- Returns 'True' once the stream of accepted connections has ended.
serve ::
  RequestHandlers s ->
  s ->
  SockSpec ->
  SockAddr ->
  Maybe (MVar SockAddr) ->
  IO Bool
serve userMsgHandlers serverState sockSpec sockAddr mSockLock = do
  Stream.unfold (SA.acceptorOnSockSpec sockSpec mSockLock) sockAddr
    & Stream.parMapM id (SSock.forSocketM handleSock)
    & Stream.fold FL.drain
  pure True
  where
    -- Per-connection loop: reads use the bare socket, writes go through the
    -- lock so concurrently produced responses are not interleaved.
    handleSock sock = do
      lockingSocket <- newLock sock
      drainSocketMessages sock (serverEnvelopeHandler lockingSocket userMsgHandlers serverState)
-- | Try to decode an envelope's payload as a value of type @s@.
--
-- Yields 'Nothing' when the envelope's fingerprint is not that of @s@, or
-- when 'msgDeserialise' reports an error; the error itself is discarded.
openEnvelope :: forall s. (Serialise s, Typeable s) => Envelope -> Maybe s
openEnvelope (Envelope eprint _ _ bytes)
  | eprint == expected =
      case msgDeserialise bytes of
        Left _e -> Nothing
        Right decoded -> Just decoded
  | otherwise = Nothing
  where
    -- 'fingerprint' only inspects the type of its argument, so the
    -- 'undefined' is never evaluated.
    expected = fingerprint (undefined :: s)
-- | Decode bytes that were written without an embedded schema.
--
-- The decoder is obtained from the current schema of @s@; failing to obtain
-- it gives 'Left'. The decoding itself is the pure 'evalDecoder', so the
-- 'Right' value is only computed when it is demanded.
deserialiseOnly' :: forall s. Serialise s => BS.ByteString -> Either WineryException s
deserialiseOnly' bytes =
  (\dec -> evalDecoder dec bytes) <$> getDecoder (schema (Proxy :: Proxy s))
-- | Check whether an envelope carries the request type a handler expects.
--
-- On success the handler is returned unchanged together with the decoded
-- request; otherwise (fingerprint mismatch or decode error, see
-- 'openEnvelope') the result is 'Nothing'.
matchEnvelope :: forall a b s. (Serialise a, Serialise b, Typeable b) =>
                 Envelope ->
                 (ConnectionState s -> a -> IO b) ->
                 Maybe (ConnectionState s -> a -> IO b, a)
matchEnvelope envelope dispatchf =
  (\decoded -> (dispatchf, decoded)) <$> (openEnvelope envelope :: Maybe a)
-- | Handle one incoming envelope on the server side.
--
-- Response-type envelopes (plain, timeout, exception) are ignored. For a
-- request, the handlers are tried in order and the first one whose request
-- type matches the envelope is run:
--
-- * a 'RequestHandler' runs under the request's timeout (none when the
--   timeout is 0) and its result is sent back as a 'ResponseMessage'; if
--   the timeout elapses, or the handler throws 'TimeoutException', a
--   'TimeoutResponseMessage' with an empty payload is sent instead;
-- * an 'AsyncRequestHandler' is simply run; nothing is sent back.
--
-- Any exception escaping this process is caught and reported to the client
-- as an 'ExceptionResponseMessage' whose payload is the shown exception.
-- If no handler matches, nothing is sent.
serverEnvelopeHandler ::
                     Locking Socket
                     -> RequestHandlers s
                     -> s
                     -> Envelope
                     -> IO ()
serverEnvelopeHandler _ _ _ (Envelope _ TimeoutResponseMessage _ _) = pure ()
serverEnvelopeHandler _ _ _ (Envelope _ ExceptionResponseMessage _ _) = pure ()
serverEnvelopeHandler _ _ _ (Envelope _ ResponseMessage _ _) = pure ()
serverEnvelopeHandler sockLock msgHandlers serverState envelope@(Envelope _ (RequestMessage timeoutms) msgId _) = do
  eExc <- try $ foldM_ (flip firstMatcher) Nothing msgHandlers :: IO (Either SomeException ())
  case eExc of
    Left exc ->
      let env = Envelope (fingerprint (show exc)) ExceptionResponseMessage msgId (msgSerialise (show exc)) in
      sendEnvelope env sockLock
    Right () -> pure ()
  where
    -- Run a handler under the request's timeout; 'Nothing' means "timed
    -- out", whether the clock ran out or the handler threw
    -- 'TimeoutException' itself.
    runTimeout :: IO b -> IO (Maybe b)
    runTimeout m =
      if timeoutms == 0 then
        (Just <$> m) `catch` timeoutExcHandler
      else
        timeout timeoutms m `catch` timeoutExcHandler

    timeoutExcHandler :: TimeoutException -> IO (Maybe b)
    timeoutExcHandler _ = pure Nothing

    sState = ConnectionState {
      connectionServerState = serverState,
      connectionSocket = sockLock
      }

    -- Envelope sent back for a synchronous handler's outcome.
    responseEnvelope :: Serialise b => Maybe b -> Envelope
    responseEnvelope (Just response) =
      Envelope (fingerprint response) ResponseMessage msgId (msgSerialise response)
    responseEnvelope Nothing =
      Envelope (fingerprint TimeoutError) TimeoutResponseMessage msgId BS.empty

    -- Fold step over the handler list. The accumulator is 'Nothing' until a
    -- handler has matched; after that the remaining handlers are skipped.
    firstMatcher (RequestHandler msghandler) Nothing =
      case matchEnvelope envelope msghandler of
        Nothing -> pure Nothing
        Just (dispatchf, decoded) -> do
          mResponse <- runTimeout (dispatchf sState decoded)
          sendEnvelope (responseEnvelope mResponse) sockLock
          pure (Just ())
    firstMatcher (AsyncRequestHandler msghandler) Nothing =
      case matchEnvelope envelope msghandler of
        Nothing -> pure Nothing
        Just (dispatchf, decoded) -> do
          _ <- dispatchf sState decoded
          pure (Just ())
    firstMatcher _ acc = pure acc
-- | Callback invoked for each envelope parsed off a socket.
type EnvelopeHandler = Envelope -> IO ()
-- | Read a socket until its byte stream ends, passing every envelope found
-- to the handler.
--
-- Bytes are parsed repeatedly with 'envelopeP'. Parse failures are dropped
-- ('SP.catRights') and are not reported anywhere. Handlers run
-- concurrently and unordered (@SP.ordered False@), so they may finish in a
-- different order than their envelopes arrived.
drainSocketMessages :: Socket -> EnvelopeHandler -> IO ()
drainSocketMessages sock envelopeHandler =
  SP.unfold SSock.reader sock
    & P.parseMany envelopeP
    & SP.catRights
    & SP.parMapM (SP.ordered False) envelopeHandler
    & SP.fold FL.drain
-- | Encode an envelope and write it to the socket.
--
-- The write happens while holding the socket's lock, so that the bytes of
-- envelopes sent from different threads are not interleaved. Afterwards the
-- encoded bytes are handed to 'traceBytes'.
sendEnvelope :: Envelope -> Locking Socket -> IO ()
sendEnvelope envelope sockLock = do
  let envelopebytes = encodeEnvelope envelope
  withLock sockLock $ \socket' ->
    Socket.sendAll socket' envelopebytes
  traceBytes "sendEnvelope" envelopebytes
-- | The fingerprint of a value's type.
--
-- Only the type matters: the argument is handed to 'typeOf' and its value
-- is never inspected, so any two values of one type give the same result.
fingerprint :: Typeable a => a -> Fingerprint
fingerprint = typeRepFingerprint . typeOf