| License | Apache-2.0 |
|---|---|
| Maintainer | gabriel.volpe@chatroulette.com |
| Stability | experimental |
| Safe Haskell | None |
| Language | Haskell2010 |
Pulsar
Description
Consider the following imports (needs the async library).
import Control.Concurrent ( threadDelay ) import Control.Concurrent.Async ( concurrently_ ) import Control.Monad ( forever ) import Pulsar
A quick example of a consumer and producer running concurrently.
resources :: Pulsar (Consumer IO, Producer IO) resources = do ctx <- connect defaultConnectData consumer <- newConsumer ctx topic "test-sub" producer <- newProducer ctx topic return (consumer, producer)
A Pulsar connection, consumers, and producers are long-lived resources that are managed accordingly for you. Once the program exits, the resources will be released in the respective order (always opposite to the order of acquisition).
main :: IO ()
main = runPulsar resources $ (Consumer {..}, Producer {..}) ->
let c = forever $ fetch >>= (Message i m) -> print m >> ack i
p = forever $ threadDelay (5 * 1000000) >> produce "hello world"
in concurrently_ c p
Synopsis
- connect :: (MonadThrow m, MonadIO m, MonadManaged m) => ConnectData -> m PulsarCtx
- defaultConnectData :: ConnectData
- newConsumer :: (MonadManaged m, MonadIO f) => PulsarCtx -> Topic -> SubscriptionName -> m (Consumer f)
- newProducer :: (MonadManaged m, MonadIO f) => PulsarCtx -> Topic -> m (Producer f)
- runPulsar :: forall a b. Pulsar a -> (a -> IO b) -> IO b
- runPulsar' :: forall a b. LogOptions -> Pulsar a -> (a -> IO b) -> IO b
- data Consumer m = Consumer {}
- newtype Producer m = Producer {
- produce :: PulsarMessage -> m ()
- data Pulsar a
- data PulsarCtx
- data ConnectData
- data LogLevel
- data LogOptions = LogOptions {}
- data LogOutput
- data Topic = Topic {}
- defaultTopic :: String -> Topic
- data TopicType
- newtype Tenant = Tenant String
- newtype NameSpace = NameSpace String
- newtype TopicName = TopicName String
- newtype MsgId = MsgId MessageIdData
- data Message = Message MsgId ByteString
- newtype PulsarMessage = PulsarMessage ByteString
- newtype SubscriptionName = SubscriptionName Text
Documentation
connect :: (MonadThrow m, MonadIO m, MonadManaged m) => ConnectData -> m PulsarCtx Source #
Starts a Pulsar connection with the supplied ConnectData
defaultConnectData :: ConnectData Source #
Default connection data: "127.0.0.1:6650"
newConsumer :: (MonadManaged m, MonadIO f) => PulsarCtx -> Topic -> SubscriptionName -> m (Consumer f) Source #
Create a new Consumer by supplying a PulsarCtx (returned by connect), a Topic and a SubscriptionName.
newProducer :: (MonadManaged m, MonadIO f) => PulsarCtx -> Topic -> m (Producer f) Source #
runPulsar :: forall a b. Pulsar a -> (a -> IO b) -> IO b Source #
Runs a Pulsar computation with default logging to standard output
runPulsar' :: forall a b. LogOptions -> Pulsar a -> (a -> IO b) -> IO b Source #
Runs a Pulsar computation with the supplied logging options
An abstract Producer able to produce messages of type PulsarMessage.
Constructors
| Producer | |
Fields
| |
The main Pulsar monad, which abstracts over a Managed monad.
Instances
| Monad Pulsar Source # | |
| Functor Pulsar Source # | |
| Applicative Pulsar Source # | |
| MonadIO Pulsar Source # | |
Defined in Pulsar.Internal.Core | |
| MonadThrow Pulsar Source # | |
Defined in Pulsar.Internal.Core | |
| MonadManaged Pulsar Source # | |
Defined in Pulsar.Internal.Core | |
Internal Pulsar context. You will never need to access its content (not exported) but might need to take it as argument.
data ConnectData Source #
Connection details: host and port.
Instances
| Show ConnectData Source # | |
Defined in Pulsar.Connection Methods showsPrec :: Int -> ConnectData -> ShowS # show :: ConnectData -> String # showList :: [ConnectData] -> ShowS # | |
Internal logging level, part of LogOptions. Can be used together with runPulsar`.
data LogOptions Source #
Internal logging options. Can be used together with runPulsar`.
Constructors
| LogOptions | |
Instances
| Show LogOptions Source # | |
Defined in Pulsar.Internal.Core Methods showsPrec :: Int -> LogOptions -> ShowS # show :: LogOptions -> String # showList :: [LogOptions] -> ShowS # | |
Internal logging output, part of LogOptions. Can be used together with runPulsar`.
A Topic is in the form "type://tenant/namespace/topic-name", which is what the Show instance does.
Constructors
| Topic | |
defaultTopic :: String -> Topic Source #
A default Topic: "non-persistent://public/default/my-topic".
A topic can be either Persistent or NonPersistent.
Constructors
| Persistent | |
| NonPersistent |
A tenant can be any string value. Default value is "public".
A namespace can be any string value. Default value is "default".
A topic name can be any string value.
A consumed message, containing both MsgId and payload as bytestring.
Constructors
| Message MsgId ByteString |
newtype PulsarMessage Source #
A produced message, containing just a payload as bytestring.
Constructors
| PulsarMessage ByteString |
Instances
| Show PulsarMessage Source # | |
Defined in Pulsar.Types Methods showsPrec :: Int -> PulsarMessage -> ShowS # show :: PulsarMessage -> String # showList :: [PulsarMessage] -> ShowS # | |
| IsString PulsarMessage Source # | |
Defined in Pulsar.Types Methods fromString :: String -> PulsarMessage # | |
newtype SubscriptionName Source #
A subscription name can be any string value.
Constructors
| SubscriptionName Text |
Instances
| Show SubscriptionName Source # | |
Defined in Pulsar.Types Methods showsPrec :: Int -> SubscriptionName -> ShowS # show :: SubscriptionName -> String # showList :: [SubscriptionName] -> ShowS # | |
| IsString SubscriptionName Source # | |
Defined in Pulsar.Types Methods fromString :: String -> SubscriptionName # | |