{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Async.Worker.Broker.Redis
( RedisBroker
, BrokerInitParams(..)
, RedisWithMsgId(..) )
where
import Async.Worker.Broker.Types (MessageBroker(..), Queue, SerializableMessage, TimeoutS(..), renderQueue)
import Control.Monad (void)
import Data.Aeson qualified as Aeson
import Data.Aeson (FromJSON(..), ToJSON(..), (.:), (.=), withObject, object, withScientific)
import Data.ByteString.Char8 qualified as BSC
import Data.ByteString.Lazy qualified as BSL
import Data.Maybe (catMaybes, fromMaybe)
import Data.Scientific (floatingOrInteger)
import Data.UnixTime (getUnixTime, UnixTime(..))
import Database.Redis qualified as Redis
import Foreign.C.Types (CTime(..))
import Text.Read (readMaybe)
-- | Type-level tag that selects the Redis-backed 'MessageBroker' instance.
-- It has no constructors and is only ever used as a type index.
data RedisBroker
-- | 'MessageBroker' implementation on top of Redis.
--
-- Redis structures used per queue (key names come from the key helpers of
-- this module):
--
-- * 'queueKey': a list holding the encoded ids of the messages in the queue
--   (@LPUSH@ / @BLPOP@ / @LREM@ / @LRANGE@).
-- * 'messageKey': one string key per message holding the JSON encoding of
--   the 'RedisWithMsgId' payload (@SET@ / @DEL@).
-- * @messageTimeoutKey@: a hash from encoded id to a timestamp
--   (@HSET@ / @HGET@ / @HDEL@).
-- * 'archiveKey': a set of archived ids (@SADD@ / @SISMEMBER@).
-- * 'idKey': a counter used to allocate ids (@INCR@).
--
-- Write operations discard the Redis reply (@void@), so Redis-level errors
-- on those paths are not reported to the caller.
instance (SerializableMessage a, Show a) => MessageBroker RedisBroker a where
  -- Broker handle: wraps the Redis connection used by every operation.
  data Broker RedisBroker a =
    RedisBroker' {
        conn :: Redis.Connection
      }
  -- A message as stored in Redis, i.e. the payload together with its id.
  data BrokerMessage RedisBroker a =
    RedisBM (RedisWithMsgId a)
    deriving (Show)
  data Message RedisBroker a = RedisM a
  data MessageId RedisBroker = RedisMid Int
    deriving (Eq, Show, Ord)
  data BrokerInitParams RedisBroker a = RedisBrokerInitParams Redis.ConnectInfo

  messageId (RedisBM (RedisWithMsgId { rmidId = ident })) = RedisMid ident

  getMessage (RedisBM (RedisWithMsgId { rmida = payload })) = RedisM payload

  toMessage = RedisM

  toA (RedisM payload) = payload

  -- 'Redis.checkedConnect' is used for connecting, so connection problems
  -- surface here rather than on first use.
  initBroker (RedisBrokerInitParams connInfo) = do
    connection <- Redis.checkedConnect connInfo
    pure (RedisBroker' { conn = connection })

  deinitBroker (RedisBroker' { conn = connection }) = Redis.disconnect connection

  -- Nothing to do: no Redis command is issued when a queue is created.
  createQueue _broker _queue = pure ()

  -- Deletes only the id list of the queue.
  -- NOTE(review): per-message keys, the timeout hash, the archive set and the
  -- id sequence are left untouched here -- confirm this is intended.
  dropQueue (RedisBroker' { conn = connection }) queue =
    void $ Redis.runRedis connection $ Redis.del [queueKey queue]

  -- Reading is implemented as popping: the id is removed from the list.
  readMessageWaiting = popMessageWaiting

  -- Pops one id with @BLPOP@ (timeout 0) and loads the matching message.
  -- Every failure raises 'error' with a message naming the failing step
  -- (previously a bare 'undefined').
  popMessageWaiting broker@(RedisBroker' { conn = connection }) queue = do
    ePopped <- Redis.runRedis connection $ Redis.blpop [queueKey queue] 0
    case ePopped of
      Left reply ->
        error $ context <> "BLPOP failed: " <> show reply
      Right Nothing ->
        error $ context <> "BLPOP returned no element although the timeout is 0"
      Right (Just (_listKey, rawId)) ->
        case bsToId rawId of
          Nothing ->
            error $ context <> "cannot decode message id " <> show rawId
          Just msgId -> do
            mMsg <- getRedisMessage broker queue msgId
            maybe
              (error $ context <> "getRedisMessage returned Nothing for " <> show msgId)
              return
              mMsg
    where
      context =
        "Async.Worker.Broker.Redis.popMessageWaiting (queue "
          <> renderQueue queue <> "): "

  -- Stores "<now seconds + timeout>.<now microseconds>" in the timeout hash
  -- under the message id. The microsecond part is written with 'show', i.e.
  -- without zero padding; 'getMessageTimeout' reads it back as a plain
  -- integer, so the two stay consistent.
  setMessageTimeout (RedisBroker' { conn = connection }) queue msgId (TimeoutS timeoutS) = do
    now <- getUnixTime
    let CTime nowSeconds = utSeconds now
        micros = fromIntegral (utMicroSeconds now) :: Integer
        deadline = toInteger nowSeconds + fromIntegral timeoutS
        encoded = BSC.pack (show deadline <> "." <> show micros)
    void $ Redis.runRedis connection $
      Redis.hset (messageTimeoutKey queue) (idToBS msgId) encoded

  -- Allocates an id, stores the JSON-encoded message under its own key and
  -- pushes the id onto the queue list.
  -- NOTE(review): ids are pushed with LPUSH and popped with BLPOP, both of
  -- which work on the head of the list, so the most recently sent message is
  -- popped first -- confirm this ordering is intended.
  sendMessage broker@(RedisBroker' { conn = connection }) queue (RedisM payload) = do
    mId <- nextId broker queue
    case mId of
      Nothing ->
        -- 'nextId' yields Nothing when INCR returned an error reply.
        error $ "Async.Worker.Broker.Redis.sendMessage (queue "
          <> renderQueue queue <> "): could not allocate a message id"
      Just ident -> do
        let msgId = RedisMid ident
            stored = RedisWithMsgId { rmidId = ident, rmida = payload }
        void $ Redis.runRedis connection $ do
          _ <- Redis.set (messageKey queue msgId) (BSL.toStrict $ Aeson.encode stored)
          Redis.lpush (queueKey queue) [idToBS msgId]
        return msgId

  -- NOTE(review): the delay argument is ignored; the message is sent
  -- immediately.
  sendMessageDelayed broker queue message _delay = sendMessage broker queue message

  -- Removes the id from the queue list, the message body and the timeout
  -- entry.
  deleteMessage (RedisBroker' { conn = connection }) queue msgId = do
    let encodedId = idToBS msgId
    void $ Redis.runRedis connection $ do
      void $ Redis.lrem (queueKey queue) 1 encodedId
      void $ Redis.del [messageKey queue msgId]
      Redis.hdel (messageTimeoutKey queue) [encodedId]

  -- Moves the id from the queue list to the archive set and drops its
  -- timeout entry; the message body key is kept.
  archiveMessage (RedisBroker' { conn = connection }) queue msgId = do
    let encodedId = idToBS msgId
    void $ Redis.runRedis connection $ do
      void $ Redis.lrem (queueKey queue) 1 encodedId
      void $ Redis.sadd (archiveKey queue) [encodedId]
      Redis.hdel (messageTimeoutKey queue) [encodedId]

  -- Size is the number of ids reported by 'listPendingMessageIds'.
  getQueueSize broker queue = length <$> listPendingMessageIds broker queue

  -- Returns the message only when its id is a member of the archive set;
  -- an error reply from SISMEMBER is treated like "not archived".
  getArchivedMessage broker@(RedisBroker' { conn = connection }) queue msgId = do
    eIsMember <- Redis.runRedis connection $
      Redis.sismember (archiveKey queue) (idToBS msgId)
    case eIsMember of
      Right True -> getRedisMessage broker queue msgId
      _ -> return Nothing

  -- Lists the ids in the queue list whose stored timeout is absent or not
  -- later than the current time. Ids that fail to decode are dropped, and an
  -- error reply from LRANGE yields the empty list. One HGET is issued per id.
  listPendingMessageIds broker@(RedisBroker' { conn = connection }) queue = do
    eRawIds <- Redis.runRedis connection $ Redis.lrange (queueKey queue) 0 (-1)
    case eRawIds of
      Left _ -> return []
      Right rawIds -> do
        let candidates = catMaybes (bsToId <$> rawIds)
        now <- getUnixTime
        timeouts <- mapM (getMessageTimeout broker queue) candidates
        pure
          [ msgId
          | (msgId, mTimeout) <- zip candidates timeouts
          , fromMaybe now mTimeout <= now
          ]

  getMessageById broker queue msgId = getRedisMessage broker queue msgId
-- | Look up the timestamp stored for a message in the queue's timeout hash.
--
-- The stored value is expected to look like @"<seconds>.<microseconds>"@,
-- both parts being plain integers (this is what @setMessageTimeout@ writes).
--
-- Returns 'Nothing' when there is no entry for the id or when the stored
-- value cannot be parsed. An error reply from Redis raises 'error' with the
-- reply included in the message (previously a bare 'undefined').
getMessageTimeout :: Broker RedisBroker a -> Queue -> MessageId RedisBroker -> IO (Maybe UnixTime)
getMessageTimeout (RedisBroker' { conn = connection }) queue msgId = do
  eStored <- Redis.runRedis connection $ Redis.hget timeoutK (idToBS msgId)
  case eStored of
    Left reply ->
      error $ "Async.Worker.Broker.Redis.getMessageTimeout (queue "
        <> renderQueue queue <> "): HGET failed: " <> show reply
    Right mTimeoutBs -> pure (mTimeoutBs >>= parseTimeout)
  where
    timeoutK = messageTimeoutKey queue
    -- Split at the first '.', then read both halves as integers; the
    -- 'BSC.drop' removes the separator itself.
    parseTimeout timeoutBs =
      let (secondsBs, rest) = BSC.break (== '.') timeoutBs
      in UnixTime . CTime
           <$> readMaybe (BSC.unpack secondsBs)
           <*> readMaybe (BSC.unpack (BSC.drop 1 rest))
-- | Encode a message id as a strict 'BSC.ByteString' using the JSON encoding
-- of the wrapped 'Int'.
idToBS :: MessageId RedisBroker -> BSC.ByteString
idToBS (RedisMid ident) = (BSL.toStrict . Aeson.encode) ident
-- | Decode a message id from its JSON-encoded bytes (inverse of 'idToBS').
-- Yields 'Nothing' when the bytes do not decode to an 'Int'.
bsToId :: BSC.ByteString -> Maybe (MessageId RedisBroker)
bsToId = fmap RedisMid . Aeson.decode . BSL.fromStrict
-- | Common prefix of the Redis keys built by 'idKey' and 'queueKey'.
beePrefix :: String
beePrefix = "bee-"
-- | Redis key of the counter from which 'nextId' allocates message ids for
-- the given queue: the prefix, then @"sequence-"@, then the rendered queue
-- name.
idKey :: Queue -> BSC.ByteString
idKey queue = BSC.pack (concat [beePrefix, "sequence-", renderQueue queue])
-- | Allocate the next message id for a queue by running @INCR@ on the
-- queue's 'idKey'.
--
-- Returns 'Nothing' when Redis answers with an error reply.
nextId :: Broker RedisBroker a -> Queue -> IO (Maybe Int)
nextId (RedisBroker' { conn = connection }) queue =
  either (const Nothing) (Just . fromInteger)
    <$> Redis.runRedis connection (Redis.incr (idKey queue))
-- | Redis key under which a single message is stored: the queue's
-- 'queueKey' followed by @"-message-"@ and the decimal message id.
messageKey :: Queue -> MessageId RedisBroker -> BSC.ByteString
messageKey queue (RedisMid n) = BSC.append (queueKey queue) suffix
  where
    suffix = BSC.pack ("-message-" <> show n)
-- | Redis key for a queue:
-- @beePrefix ++ "queue-" ++ renderQueue queue@.
queueKey :: Queue -> BSC.ByteString
queueKey queue = BSC.pack (concat [beePrefix, "queue-", renderQueue queue])
-- | Redis key for a queue's archive:
-- @beePrefix ++ "archive-" ++ renderQueue queue@.
archiveKey :: Queue -> BSC.ByteString
archiveKey queue = BSC.pack (concat [beePrefix, "archive-", renderQueue queue])
-- | Redis key for a queue's message-timeout data:
-- @beePrefix ++ "timeout-" ++ renderQueue queue@.
messageTimeoutKey :: Queue -> BSC.ByteString
messageTimeoutKey queue =
  BSC.pack (concat [beePrefix, "timeout-", renderQueue queue])
-- | Fetch and decode the message stored under 'messageKey' for the given
-- queue and message id.
--
-- The result is 'Nothing' when Redis returns an error reply, when the key
-- holds no value, or when the stored bytes do not JSON-decode to a
-- 'RedisWithMsgId'; otherwise the decoded value is wrapped in 'RedisBM'.
getRedisMessage :: FromJSON a
                => Broker RedisBroker a
                -> Queue
                -> MessageId RedisBroker
                -> IO (Maybe (BrokerMessage RedisBroker a))
getRedisMessage (RedisBroker' { conn = connection }) queue msgId = do
  reply <- Redis.runRedis connection (Redis.get (messageKey queue msgId))
  -- Every failure mode collapses to Nothing, so the Maybe monad expresses
  -- the whole chain: error reply -> missing key -> undecodable payload.
  pure $ do
    raw <- either (const Nothing) id reply
    RedisBM <$> Aeson.decode (BSL.fromStrict raw)
-- | A payload of type @a@ stored together with an 'Int' identifier.
-- This is the shape that gets JSON-encoded for storage (see the
-- 'FromJSON' / 'ToJSON' instances).
data RedisWithMsgId a = RedisWithMsgId
  { rmida  :: a
    -- ^ The wrapped payload.
  , rmidId :: Int
    -- ^ The identifier carried alongside the payload.
  }
  deriving (Show, Eq)
-- | Decodes a JSON object with the keys @"rmida"@ (the payload) and
-- @"rmidId"@ (the 'Int' identifier).
instance FromJSON a => FromJSON (RedisWithMsgId a) where
  parseJSON = withObject "RedisWithMsgId" $ \obj ->
    RedisWithMsgId
      <$> obj .: "rmida"
      <*> obj .: "rmidId"
-- | Encodes as a JSON object with the keys @"rmida"@ (the payload) and
-- @"rmidId"@ (the 'Int' identifier).
instance ToJSON a => ToJSON (RedisWithMsgId a) where
  toJSON (RedisWithMsgId { rmida = payload, rmidId = ident }) =
    object
      [ "rmida" .= payload
      , "rmidId" .= ident
      ]
-- | A message id is encoded as the bare JSON encoding of its 'Int'.
instance ToJSON (MessageId RedisBroker) where
  toJSON (RedisMid n) = toJSON n
-- | Parses a message id from a JSON number.
--
-- * Non-numeric values are rejected by 'withScientific'.
-- * Numbers with a fractional part fail with @"Integer expected: …"@.
-- * Integral numbers that do not fit in an 'Int' fail with
--   @"Integer out of range: …"@ instead of silently wrapping around.
instance FromJSON (MessageId RedisBroker) where
  parseJSON = withScientific "RedisMid" $ \n ->
    case floatingOrInteger n of
      Right i
        -- 'floatingOrInteger' converts with wrap-around semantics when the
        -- target is a bounded type, so confirm the 'Int' still denotes the
        -- same number before accepting it as an id.
        | fromIntegral i == n -> pure $ RedisMid i
        | otherwise -> fail $ "Integer out of range: " <> show n
      Left (f :: Double) -> fail $ "Integer expected: " <> show f