{-|
Module      : Async.Worker.Broker.Redis
Description : Redis broker functionality
Copyright   : (c) Gargantext, 2024-Present
License     : AGPL
Maintainer  : gargantext@iscpif.fr
Stability   : experimental
Portability : POSIX

Based on lists:
https://redis.io/glossary/redis-queue/

The design is as follows:
- for each queue we have an 'id counter'
- each queue is represented as a list of message ids
- each message is stored under unique key, derived from its id
- the above allows us to have an archive with messages
- deleting a message means removing its unique key from Redis

The queue itself is a list, the archive is a set (so that we can use
SISMEMBER).
-}


{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
    
module Async.Worker.Broker.Redis
  ( RedisBroker
  , BrokerInitParams(..)
  , RedisWithMsgId(..) )
where

import Async.Worker.Broker.Types (MessageBroker(..), Queue, SerializableMessage, TimeoutS(..), renderQueue)
-- import Control.Concurrent (threadDelay)
import Control.Monad (void)
import Data.Aeson qualified as Aeson
import Data.Aeson (FromJSON(..), ToJSON(..), (.:), (.=), withObject, object, withScientific)
import Data.ByteString.Char8 qualified as BSC
import Data.ByteString.Lazy qualified as BSL
import Data.Maybe (catMaybes, fromMaybe)
import Data.Scientific (floatingOrInteger)
import Data.UnixTime (getUnixTime, UnixTime(..))
import Database.Redis qualified as Redis
import Foreign.C.Types (CTime(..))
import Text.Read (readMaybe)


-- | Type-level tag that selects the Redis implementation of 'MessageBroker'.
-- It has no constructors and is only ever used as a type index.
data RedisBroker

-- | Redis-backed implementation of 'MessageBroker'.
--
-- Pending message ids live in a Redis list, archived ids in a Redis set,
-- message bodies under one key per message, and message timeouts in a hash
-- keyed by message id.
instance (SerializableMessage a, Show a) => MessageBroker RedisBroker a where
  -- Broker handle: just the open Redis connection.
  data Broker RedisBroker a =
    RedisBroker' {
        conn :: Redis.Connection
      }
  -- A message as read back from Redis: the payload together with its id.
  data BrokerMessage RedisBroker a =
    RedisBM (RedisWithMsgId a)
    deriving (Show)
  -- A payload that has not been assigned an id yet.
  data Message RedisBroker a = RedisM a
  -- Message ids are plain integers.
  data MessageId RedisBroker = RedisMid Int
    deriving (Eq, Show, Ord)
  data BrokerInitParams RedisBroker a = RedisBrokerInitParams Redis.ConnectInfo

  messageId (RedisBM (RedisWithMsgId { rmidId })) = RedisMid rmidId
  getMessage (RedisBM (RedisWithMsgId { rmida })) = RedisM rmida
  toMessage message = RedisM message
  toA (RedisM message) = message

  -- Opens the connection with 'Redis.checkedConnect'.
  initBroker (RedisBrokerInitParams connInfo) =
    RedisBroker' <$> Redis.checkedConnect connInfo
  deinitBroker (RedisBroker' { conn }) = Redis.disconnect conn

  -- No need to actually pre-create queues
  createQueue _broker _queue = pure ()

  -- Only the list of pending ids is deleted here; message bodies, the archive
  -- set and the timeout hash are left untouched.
  dropQueue (RedisBroker' { conn }) queue =
    void $ Redis.runRedis conn $ Redis.del [queueKey queue]

  -- TODO This is simplified: reading currently pops the message
  readMessageWaiting = popMessageWaiting

  -- Blocks until a message id can be popped from the queue list, then loads
  -- the corresponding message.
  --
  -- Raises an 'IOError' (built with 'userError') when Redis replies with an
  -- error, when the popped id cannot be parsed, or when no message is stored
  -- for the popped id.
  popMessageWaiting b@(RedisBroker' { conn }) queue = do
    -- 0 means block indefinitely
    -- https://redis.io/docs/latest/commands/blpop/
    eData <- Redis.runRedis conn $ Redis.blpop [queueK] 0
    case eData of
      Left reply ->
        failWith $ "BLPOP failed: " <> show reply
      Right Nothing ->
        failWith "BLPOP returned nothing despite the infinite timeout"
      Right (Just (_queueK, msgIdBS)) ->
        case bsToId msgIdBS of
          Nothing ->
            failWith $ "cannot parse message id " <> show msgIdBS
          Just msgId ->
            getRedisMessage b queue msgId
              >>= maybe (failWith $ "no message stored for " <> show msgId) pure
    where
      queueK = queueKey queue
      failWith reason =
        ioError . userError $
          "Async.Worker.Broker.Redis.popMessageWaiting ("
            <> renderQueue queue <> "): " <> reason

  -- Records, in the queue's timeout hash, the instant (now + timeout) up to
  -- which 'listPendingMessageIds' will not report the message as pending.
  setMessageTimeout (RedisBroker' { conn }) queue msgId (TimeoutS timeoutS) = do
    ut <- getUnixTime
    let CTime nowS = utSeconds ut
        micros = fromIntegral (utMicroSeconds ut) :: Integer
        deadlineS = toInteger nowS + fromIntegral timeoutS
        -- "<seconds>.<microseconds>": two integers separated by a dot (the
        -- microseconds are not zero-padded), as read back by
        -- 'getMessageTimeout'
        encoded = BSC.pack $ show deadlineS <> "." <> show micros
    void $ Redis.runRedis conn $ Redis.hset timeoutK (idToBS msgId) encoded
    where
      timeoutK = messageTimeoutKey queue

  -- Stores the message under its own key and pushes its id onto the queue
  -- list. Raises an 'IOError' when no id could be obtained from 'nextId'.
  --
  -- NOTE(review): LPUSH here combined with BLPOP in 'popMessageWaiting' makes
  -- the list behave as a stack (newest message first) -- confirm that this
  -- ordering is intended.
  sendMessage b@(RedisBroker' { conn }) queue (RedisM message) = do
    mId <- nextId b queue
    case mId of
      Nothing ->
        ioError . userError $
          "Async.Worker.Broker.Redis.sendMessage ("
            <> renderQueue queue <> "): nextId returned Nothing"
      Just id' -> do
        let msgId = RedisMid id'
            m = RedisWithMsgId { rmidId = id', rmida = message }
            msgK = messageKey queue msgId
            queueK = queueKey queue
        void $ Redis.runRedis conn $ do
          -- write the message itself under unique key
          _ <- Redis.set msgK (BSL.toStrict $ Aeson.encode m)
          -- add message id to the list
          Redis.lpush queueK [idToBS msgId]
        return msgId

  -- | TODO Not implemented: the delay is ignored, the message is sent at once
  sendMessageDelayed b queue message _t = sendMessage b queue message

  -- Removes the id from the queue list, deletes the stored message and drops
  -- its timeout entry. Redis replies are ignored.
  deleteMessage (RedisBroker' { conn }) queue msgId = do
    let queueK = queueKey queue
        messageK = messageKey queue msgId
        timeoutK = messageTimeoutKey queue
        msgIdBS = idToBS msgId
    void $ Redis.runRedis conn $ do
      void $ Redis.lrem queueK 1 msgIdBS
      void $ Redis.del [messageK]
      Redis.hdel timeoutK [msgIdBS]

  -- Moves the id from the queue list to the archive set and drops its timeout
  -- entry; the stored message itself is kept. The id is added to the archive
  -- regardless of what LREM reports, so it ends up there even when it was no
  -- longer in the queue list.
  archiveMessage (RedisBroker' { conn }) queue msgId = do
    let queueK = queueKey queue
        archiveK = archiveKey queue
        timeoutK = messageTimeoutKey queue
        msgIdBS = idToBS msgId
    void $ Redis.runRedis conn $ do
      void $ Redis.lrem queueK 1 msgIdBS
      void $ Redis.sadd archiveK [msgIdBS]
      Redis.hdel timeoutK [msgIdBS]

  -- Counts what 'listPendingMessageIds' reports, so messages whose timeout has
  -- not elapsed yet are excluded (a plain LLEN would count them).
  getQueueSize b queue = length <$> listPendingMessageIds b queue

  -- Returns the message only when its id is a member of the archive set; a
  -- Redis error is treated like "not archived".
  getArchivedMessage b@(RedisBroker' { conn }) queue msgId = do
    eIsMember <-
      Redis.runRedis conn $ Redis.sismember (archiveKey queue) (idToBS msgId)
    case eIsMember of
      Right True -> getRedisMessage b queue msgId
      _ -> return Nothing

  -- Ids from the queue list that either have no timeout recorded or whose
  -- recorded timeout is not in the future. Ids that fail to parse are skipped
  -- and a failing LRANGE yields the empty list.
  listPendingMessageIds b@(RedisBroker' { conn }) queue = do
    eMsgIds <- Redis.runRedis conn $ Redis.lrange (queueKey queue) 0 (-1)
    case eMsgIds of
      Left _ -> return []
      Right msgIds -> do
        let msgIds' = catMaybes (bsToId <$> msgIds)
        ut <- getUnixTime
        timeouts <- mapM (getMessageTimeout b queue) msgIds'
        pure
          [ msgId
          | (msgId, ts) <- zip msgIds' timeouts
          , fromMaybe ut ts <= ut
          ]

  getMessageById b queue msgId = getRedisMessage b queue msgId

-- | Look up the timeout recorded for a message in the queue's timeout hash.
--
-- Returns 'Nothing' when the hash has no entry for the id, or when the entry
-- is not of the form @<seconds>.<microseconds>@ (two integers separated by a
-- dot). Raises an 'IOError' (built with 'userError') when Redis replies with
-- an error.
getMessageTimeout :: Broker RedisBroker a -> Queue -> MessageId RedisBroker -> IO (Maybe UnixTime)
getMessageTimeout (RedisBroker' { conn }) queue msgId = do
  eData <- Redis.runRedis conn $ Redis.hget timeoutK (idToBS msgId)
  case eData of
    Left reply ->
      ioError . userError $
        "Async.Worker.Broker.Redis.getMessageTimeout ("
          <> renderQueue queue <> "): HGET failed: " <> show reply
    Right Nothing -> pure Nothing
    Right (Just timeoutBs) -> pure (parseTimeout timeoutBs)
  where
    timeoutK = messageTimeoutKey queue

    -- Split at the first dot; both halves must parse as integers.
    parseTimeout bs =
      let (secondsBS, rest) = BSC.break (== '.') bs
      in UnixTime
           <$> (CTime <$> readMaybe (BSC.unpack secondsBS))
           <*> readMaybe (BSC.unpack (BSC.drop 1 rest))

-- Helper functions for getting redis keys

-- | Turn a message id into the strict byte string form stored in Redis.
-- Ids come from an 'Int' counter, whereas Redis collections hold strings,
-- so the wrapped 'Int' is JSON-encoded and the lazy result made strict.
idToBS :: MessageId RedisBroker -> BSC.ByteString
idToBS (RedisMid n) = BSL.toStrict (Aeson.encode n)

-- | Read a message id back from its byte string form: the bytes are
-- decoded as a JSON 'Int' and wrapped in 'RedisMid'. Yields 'Nothing'
-- when the bytes do not decode.
bsToId :: BSC.ByteString -> Maybe (MessageId RedisBroker)
bsToId = fmap RedisMid . Aeson.decode . BSL.fromStrict

-- | Common prefix prepended to every Redis key built by this module.
beePrefix :: String
beePrefix = "bee-"

-- | Key of the per-queue Redis counter from which message ids are drawn.
-- Built as 'beePrefix', then @sequence-@, then the rendered queue name.
idKey :: Queue -> BSC.ByteString
idKey queue = BSC.pack (mconcat [beePrefix, "sequence-", renderQueue queue])

-- | Fetch the next message id for a queue by running @INCR@ on the
-- queue's counter key (see 'idKey').
--
-- Returns 'Just' the incremented counter converted to 'Int', or
-- 'Nothing' when Redis answers with an error reply.
nextId :: Broker RedisBroker a -> Queue -> IO (Maybe Int)
nextId (RedisBroker' { conn = redisConn }) queue =
  either (const Nothing) (Just . fromInteger)
    <$> Redis.runRedis redisConn (Redis.incr (idKey queue))

-- | Redis key holding a single message: the queue's key (see 'queueKey')
-- followed by @-message-@ and the decimal message id.
messageKey :: Queue -> MessageId RedisBroker -> BSC.ByteString
messageKey queue (RedisMid n) = queueKey queue <> suffix
  where
    suffix = BSC.pack ("-message-" <> show n)

-- | Redis key under which the ids of a queue's messages are kept.
-- Built as 'beePrefix', then @queue-@, then the rendered queue name.
queueKey :: Queue -> BSC.ByteString
queueKey queue = BSC.pack (mconcat [beePrefix, "queue-", renderQueue queue])

-- | Redis key under which the ids of a queue's archived messages are kept.
-- Built as 'beePrefix', then @archive-@, then the rendered queue name.
archiveKey :: Queue -> BSC.ByteString
archiveKey queue = BSC.pack (mconcat [beePrefix, "archive-", renderQueue queue])

-- | Redis key under which a queue's message timeouts are stored.
-- Built as 'beePrefix', then @timeout-@, then the rendered queue name.
messageTimeoutKey :: Queue -> BSC.ByteString
messageTimeoutKey queue = BSC.pack (mconcat [beePrefix, "timeout-", renderQueue queue])

-- | Load a stored message by id.
--
-- Issues a @GET@ on the message's key (see 'messageKey') and JSON-decodes
-- the payload into a 'RedisWithMsgId', wrapped in 'RedisBM'.
--
-- The result is 'Nothing' in each of these cases:
--
-- * Redis answered with an error reply,
-- * no value is stored under the key,
-- * the stored bytes do not decode.
getRedisMessage :: FromJSON a
                => Broker RedisBroker a
                -> Queue
                -> MessageId RedisBroker
                -> IO (Maybe (BrokerMessage RedisBroker a))
getRedisMessage (RedisBroker' { conn = redisConn }) queue msgId = do
  reply <- Redis.runRedis redisConn (Redis.get (messageKey queue msgId))
  -- '$!' makes the decode outcome get decided here, before returning,
  -- rather than being deferred to the caller.
  pure $! case reply of
    Right (Just raw) -> RedisBM <$> Aeson.decode (BSL.fromStrict raw)
    _                -> Nothing
    
-- | A message paired with its unique id.
-- The id is obtained through Redis 'INCR':
-- https://redis.io/docs/latest/commands/incr/
data RedisWithMsgId a = RedisWithMsgId
  { rmida  :: a    -- ^ the message payload
  , rmidId :: Int  -- ^ the id attached to the payload
  }
  deriving (Show, Eq)
-- | Parses a JSON object with the keys @rmida@ (payload) and @rmidId@ (id).
instance FromJSON a => FromJSON (RedisWithMsgId a) where
  parseJSON = withObject "RedisWithMsgId" $ \obj ->
    RedisWithMsgId
      <$> obj .: "rmida"
      <*> obj .: "rmidId"
-- | Renders as a JSON object with the keys @rmida@ (payload) and
-- @rmidId@ (id), mirroring the 'FromJSON' instance.
instance ToJSON a => ToJSON (RedisWithMsgId a) where
  toJSON (RedisWithMsgId payload ident) =
    object
      [ "rmida" .= payload
      , "rmidId" .= ident
      ]



-- | A message id is rendered exactly like the 'Int' it wraps.
instance ToJSON (MessageId RedisBroker) where
  toJSON mid = case mid of
    RedisMid n -> toJSON n
-- | Accepts a JSON number with an integral value and wraps it in
-- 'RedisMid'; a non-integral number fails with @Integer expected: ...@.
--
-- NOTE(review): integral values outside the 'Int' range are presumably
-- not rejected by 'floatingOrInteger' -- confirm against the scientific
-- documentation.
instance FromJSON (MessageId RedisBroker) where
  parseJSON = withScientific "RedisMid" $ \sci ->
    either nonIntegral (pure . RedisMid) (floatingOrInteger sci)
    where
      nonIntegral d = fail ("Integer expected: " <> show (d :: Double))