{-# LANGUAGE DeriveAnyClass, DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings, RecordWildCards #-}

module Main where

import           Control.Concurrent             ( threadDelay )
import           Control.Concurrent.Async       ( concurrently_ )
import           Control.Monad                  ( forever )
import           Data.Aeson
import qualified Data.ByteString.Lazy.Char8    as CL
import           Data.Foldable                  ( traverse_ )
import           Data.Text                      ( Text )
import           GHC.Generics                   ( Generic )
import           Pulsar
import           Streamly                       ( asyncly
                                                , maxThreads
                                                )
import qualified Streamly.Prelude              as S

main :: IO ()
main = demo

data Msg = Msg
  { name :: Text
  , amount :: Int
  } deriving (Generic, FromJSON, ToJSON, Show)

messages :: [PulsarMessage]
messages =
  let msg = [Msg "foo" 2, Msg "bar" 5, Msg "taz" 1]
  in  PulsarMessage . encode <$> msg

msgDecoder :: CL.ByteString -> IO ()
msgDecoder bs =
  let msg = decode bs :: Maybe Msg
  in  putStrLn $ "-----------------> " <> show msg

topic :: Topic
topic = defaultTopic "app"

demo :: IO ()
demo = runPulsar resources $ \(Consumer {..}, Producer {..}) ->
  let c = forever $ fetch >>= \(Message i m) -> msgDecoder m >> ack i
      p = forever $ sleep 5 >> traverse_ produce messages
  in  concurrently_ c p

resources :: Pulsar (Consumer IO, Producer IO)
resources = do
  ctx      <- connect defaultConnectData
  consumer <- newConsumer ctx topic "test-sub"
  producer <- newProducer ctx topic
  return (consumer, producer)

sleep :: Int -> IO ()
sleep n = threadDelay (n * 1000000)

logOpts :: LogOptions
logOpts = LogOptions Info StdOut

streamDemo :: IO ()
streamDemo = runPulsar' logOpts resources $ \(Consumer {..}, Producer {..}) ->
  let c = forever $ fetch >>= \(Message i m) -> msgDecoder m >> ack i
      p = forever $ sleep 5 >> traverse_ produce messages
  in  S.drain . asyncly . maxThreads 10 $ S.yieldM c <> S.yieldM p