CloudEvents Haskell SDK
An unofficial Haskell SDK for the CloudEvents specification, particularly for version 1.0.2 of the spec. (JSON Schema)
Overview
CloudEvents is a vendor-neutral specification for defining the format of event data. This SDK provides a type-safe and idiomatic way to work with CloudEvents in Haskell, making it easier to produce and consume CloudEvents across different services and platforms.
The CloudEvents Haskell SDK allows you to:
- Create CloudEvents with strongly typed event data and extensions
- Validate CloudEvents against the specification
- Encode and decode CloudEvents in JSON format
- Integrate with message brokers like Apache Kafka
Installation
Add the package to your project's dependencies:
Using Cabal
# In your .cabal file
build-depends: cloudevents-haskell
Using Stack
# In your package.yaml file
dependencies:
- cloudevents-haskell
Key Features
- Type-safe CloudEvent representation: events modeled as native Haskell types with proper validation
- Flexible event data: support for arbitrary event data types via polymorphism
- Extension mechanism: define and use custom extension attributes
- Protocol bindings: integration with Apache Kafka (binary and structured modes)
- JSON encoding/decoding: automatic serialization with Aeson via Autodocodec
- Lenses: convenient access and modification of CloudEvent fields
- Validation: validate CloudEvents against the JSON schema
Quick Start Examples
Creating a Simple CloudEvent
{-# LANGUAGE OverloadedStrings #-}
import CloudEvent.V1.Event.Data
import Data.Text (Text)
-- Create a CloudEvent with a Text payload and no extensions.
-- The string literals below rely on OverloadedStrings.
simpleEvent :: CloudEvent EmptyExtension Text
simpleEvent =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "1234-1234-1234"
      , ceSource = -- ... (Iri value)
      , ceEventType = "com.example.event.created"
      , ceDataContentType = Just "text/plain"
      , ceDataSchema = Nothing
      , ceSubject = Just "resource123"
      , ceTime = -- ... (UTCTime value)
      }
    "This is the event payload as plain text"
    EmptyExtension -- No extensions
Working with Structured Data
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
import CloudEvent.V1.Event.Data
import Data.Aeson (FromJSON, ToJSON)
import Data.Text (Text)
import GHC.Generics (Generic)
import Autodocodec
-- Define custom event data type
data UserCreatedEvent = UserCreatedEvent
  { userId :: Text
  , username :: Text
  , email :: Text
  }
  deriving stock (Eq, Show, Generic)
  deriving (FromJSON, ToJSON) via (Autodocodec UserCreatedEvent)
-- The codec drives both JSON encoding and decoding
instance HasCodec UserCreatedEvent where
  codec =
    object "UserCreatedEvent" $
      UserCreatedEvent
        <$> requiredField' "userId" .= (.userId)
        <*> requiredField' "username" .= (.username)
        <*> requiredField' "email" .= (.email)
-- Create an event with structured data
userCreatedEvent :: CloudEvent EmptyExtension UserCreatedEvent
userCreatedEvent =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "user-123-456"
      , ceSource = -- ... (Iri value)
      , ceEventType = "com.example.user.created"
      , ceDataContentType = Just "application/json"
      , ceDataSchema = Nothing
      , ceSubject = Just "resource123"
      , ceTime = -- ... (UTCTime value)
      }
    UserCreatedEvent
      { userId = "123"
      , username = "johndoe"
      , email = "john@example.com"
      }
    EmptyExtension
Using Custom Extensions
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
import CloudEvent.V1.Event.Data
import Data.Aeson (FromJSON, ToJSON)
import Data.Text (Text)
import GHC.Generics (Generic)
import Autodocodec
-- Define custom extension attributes
data TraceExtension = TraceExtension
  { traceId :: Text
  , spanId :: Text
  }
  deriving stock (Eq, Show, Generic)
  deriving (FromJSON, ToJSON) via (Autodocodec TraceExtension)
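instance HasCodec TraceExtension where
  codec = object "TraceExtension" objectCodec
-- This instance is required for extension types.
-- The JSON keys are lower-case because the CloudEvents spec only allows
-- lower-case ASCII letters and digits in attribute names.
instance HasObjectCodec TraceExtension where
  objectCodec =
    TraceExtension
      <$> requiredField' "traceid" .= (.traceId)
      <*> requiredField' "spanid" .= (.spanId)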
-- Create an event with custom extensions
eventWithTracing :: CloudEvent TraceExtension Text
eventWithTracing =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "evt-789-abc"
      , ceSource = -- ... (Iri value)
      , ceEventType = "com.example.order.processed"
      , ceDataContentType = Just "text/plain"
      , ceDataSchema = Nothing
      , ceSubject = Just "order/456"
      , ceTime = -- ... (UTCTime value)
      }
    "Order 456 has been processed successfully"
    TraceExtension
      { traceId = "trace-123456789"
      , spanId = "span-abcdef"
      }
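When choosing names for extension attributes, note that the CloudEvents specification requires attribute names to consist only of lower-case ASCII letters and digits, and recommends keeping them to 20 characters or fewer. The following is a minimal, self-contained sketch of such a check. It does not use this SDK, and `isValidAttrName` and `withinSuggestedLength` are hypothetical helper names introduced only for illustration.

```haskell
import Data.Char (isAsciiLower, isDigit)

-- Hypothetical helper: True when the name uses only the characters the
-- CloudEvents spec allows in attribute names ('a'-'z' and '0'-'9').
isValidAttrName :: String -> Bool
isValidAttrName name =
  not (null name) && all (\c -> isAsciiLower c || isDigit c) name

-- Hypothetical helper: the spec says names SHOULD NOT exceed 20 characters.
withinSuggestedLength :: String -> Bool
withinSuggestedLength name = length name <= 20
```

Under this check a key such as `traceid` passes, while `traceId` or `trace_id` would be rejected.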
Validating CloudEvents
The SDK provides functionality to validate CloudEvents against the specification:
{-# LANGUAGE TypeApplications #-}
import CloudEvent.V1.Event.Data
import CloudEvent.V1.Event.Validation
import Data.Aeson (Value, toJSON)
-- Validate a CloudEvent JSON representation.
-- MyEventData and MyExtensions stand for your own payload and extension types.
validateJsonCloudEvent :: Value -> Bool
validateJsonCloudEvent jsonValue =
  isValidCloudEvent @MyEventData @MyExtensions jsonValue
-- Example usage:
-- let eventJson = toJSON myCloudEvent
-- if validateJsonCloudEvent eventJson
--   then putStrLn "Valid CloudEvent"
--   else putStrLn "Invalid CloudEvent"
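To make concrete what checking an event against the specification involves, here is a minimal, self-contained sketch that looks only at the four context attributes CloudEvents 1.0 marks as required (`id`, `source`, `specversion`, `type`), each of which must be a non-empty string. It is not how this SDK validates events (the SDK works against the JSON schema). `Attrs`, `missingRequired`, and `looksValid` are hypothetical names, and the association list is a simplified stand-in for a JSON object.

```haskell
-- Hypothetical, simplified view of an event: attribute name -> string value.
type Attrs = [(String, String)]

-- The four context attributes CloudEvents 1.0 marks as REQUIRED.
requiredAttrs :: [String]
requiredAttrs = ["id", "source", "specversion", "type"]

-- Names of required attributes that are absent or empty.
missingRequired :: Attrs -> [String]
missingRequired attrs =
  [ name
  | name <- requiredAttrs
  , maybe True null (lookup name attrs)
  ]

-- Partial check: nothing required is missing and specversion is "1.0".
looksValid :: Attrs -> Bool
looksValid attrs =
  null (missingRequired attrs) && lookup "specversion" attrs == Just "1.0"
```

Returning the list of missing names, rather than only a `Bool`, makes it easier to report why an event was rejected.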
Kafka Integration
The SDK provides integration with Apache Kafka through the CloudEvent.V1.Bindings.Kafka module:
{-# LANGUAGE OverloadedStrings #-}
import CloudEvent.V1.Bindings.Kafka
import CloudEvent.V1.Event.Data
import Data.Text (Text)
import Kafka.Producer (ProducerRecord, TopicName)
-- Create a CloudEvent with Kafka extension
kafkaEvent :: CloudEvent KafkaCloudEventExtAttributes Text
kafkaEvent =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "evt-kafka-123"
      , ceSource = -- ... (Iri value)
      , ceEventType = "com.example.inventory.updated"
      , ceDataContentType = Just "text/plain"
      , ceDataSchema = Nothing
      , ceSubject = Just "product/123"
      , ceTime = -- ... (UTCTime value)
      }
    "Product 123 inventory updated to 42 units"
    KafkaCloudEventExtAttributes{partitionKey = "product-123"}
-- Send using binary content mode
sendBinary :: TopicName -> CloudEvent KafkaCloudEventExtAttributes Text -> ProducerRecord
sendBinary topic event = toBinaryKafkaMessage topic event
-- Use with a Kafka producer, e.g. hw-kafka-client: produceMessage producer producerRecord
-- Send using structured content mode
sendStructured :: TopicName -> CloudEvent KafkaCloudEventExtAttributes Text -> ProducerRecord
sendStructured topic event = toStructuredKafkaMessage topic event
-- Use with a Kafka producer, e.g. hw-kafka-client: produceMessage producer producerRecord
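For readers unfamiliar with the two content modes: in the CloudEvents Kafka protocol binding, binary mode places the event data in the message value and carries each context attribute as a header prefixed with `ce_` (with `datacontenttype` mapped to `content-type`), while structured mode places the whole encoded event in the value. The sketch below illustrates only the binary-mode header mapping. It is not this SDK's implementation; `Message` and `toBinarySketch` are hypothetical, and plain strings stand in for the byte strings a real producer would use.

```haskell
-- Hypothetical, simplified Kafka message; real clients use byte strings.
data Message = Message
  { msgKey     :: Maybe String
  , msgHeaders :: [(String, String)]
  , msgValue   :: String
  }
  deriving (Eq, Show)

-- Sketch of binary content mode: attributes become headers, data is the value.
toBinarySketch :: [(String, String)] -> String -> Message
toBinarySketch attrs payload =
  Message
    { -- The partitionkey extension, when present, is used as the record key.
      msgKey = lookup "partitionkey" attrs
    , msgHeaders = map toHeader attrs
    , msgValue = payload
    }
  where
    -- datacontenttype maps to the content-type header; every other
    -- attribute gets the ce_ prefix. Keeping partitionkey as a header too
    -- is a choice of this sketch, not a statement about the SDK.
    toHeader ("datacontenttype", v) = ("content-type", v)
    toHeader (k, v) = ("ce_" ++ k, v)
```

Binary mode keeps the payload untouched, which suits consumers that are not CloudEvents-aware; structured mode keeps the event in one piece, which is simpler to forward across protocols.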
Using Lenses
The SDK provides lenses for convenient access and modification of CloudEvent fields:
import CloudEvent.V1.Event.Data
import CloudEvent.V1.Event.Lens
import Control.Lens
import Data.Text (Text)
-- Access fields using lenses
getEventId :: CloudEvent ext eventData -> Text
getEventId event = event ^. ceId
-- Modify a CloudEvent using lenses
updateSubject :: Text -> CloudEvent ext eventData -> CloudEvent ext eventData
updateSubject newSubject event =
  event & ceSubject .~ Just newSubject
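If the operators above are unfamiliar: `^.` reads the field a lens focuses on, and `.~` returns a copy of the record with that field replaced. The following self-contained sketch shows the idea using the van Laarhoven encoding and only `base`. It is a simplification, not the `lens` library's or this SDK's actual definitions; `Event`, `evSubject`, `view`, and `set` here are local, hypothetical stand-ins.

```haskell
{-# LANGUAGE RankNTypes #-}
import Data.Functor.Const (Const (..))
import Data.Functor.Identity (Identity (..))

-- A simplified van Laarhoven lens from a structure s to a field a.
type Lens' s a = forall f. Functor f => (a -> f a) -> s -> f s

-- Read the focused field (what ^. does).
view :: Lens' s a -> s -> a
view l = getConst . l Const

-- Replace the focused field (what .~ does).
set :: Lens' s a -> a -> s -> s
set l x = runIdentity . l (const (Identity x))

-- Hypothetical stand-in for an event record.
data Event = Event
  { _evId      :: String
  , _evSubject :: Maybe String
  }
  deriving (Eq, Show)

-- Lens focusing on the subject field.
evSubject :: Lens' Event (Maybe String)
evSubject f e = fmap (\s -> e { _evSubject = s }) (f (_evSubject e))
```

With these definitions, `set evSubject (Just "order/1") event` leaves every other field unchanged, which is the same behaviour `updateSubject` relies on.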
Complete Example
Here's a more complete example showing how to create, validate, and send a CloudEvent to Kafka:
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}
module Example where
import Autodocodec
import CloudEvent.V1.Bindings.Kafka
import CloudEvent.V1.Event.Data
import CloudEvent.V1.Event.Lens
import CloudEvent.V1.Event.Validation
import Control.Lens
import Data.Aeson (FromJSON, ToJSON, toJSON)
import Data.Text (Text)
import GHC.Generics (Generic)
import Kafka.Producer
-- Define our event data type
data OrderCreatedEvent = OrderCreatedEvent
  { orderId :: Text
  , customerId :: Text
  , amount :: Double
  , items :: [Text]
  }
  deriving stock (Eq, Show, Generic)
  deriving (FromJSON, ToJSON) via (Autodocodec OrderCreatedEvent)
instance HasCodec OrderCreatedEvent where
  codec =
    object "OrderCreatedEvent" $
      OrderCreatedEvent
        <$> requiredField' "orderId" .= (.orderId)
        <*> requiredField' "customerId" .= (.customerId)
        <*> requiredField' "amount" .= (.amount)
        <*> requiredField' "items" .= (.items)
-- Define our Kafka-compatible extension
data OrderExtension = OrderExtension
  { partitionKey :: Text -- For Kafka routing
  , region :: Text -- Custom extension field
  }
  deriving stock (Eq, Show, Generic)
instance HasCodec OrderExtension where
  codec = object "OrderExtension" objectCodec
-- CloudEvents attribute names must be lower-case ASCII letters or digits,
-- so the JSON key is "partitionkey" (the name used by the Kafka binding spec).
instance HasObjectCodec OrderExtension where
  objectCodec =
    OrderExtension
      <$> requiredField' "partitionkey" .= (.partitionKey)
      <*> requiredField' "region" .= (.region)
-- Create our CloudEvent
createOrderEvent :: Text -> OrderCreatedEvent -> OrderExtension -> CloudEvent OrderExtension OrderCreatedEvent
createOrderEvent orderId orderData ext =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "order-" <> orderId
      , ceSource = -- ... (Iri value)
      , ceEventType = "com.example.order.created"
      , ceDataContentType = Just "application/json"
      , ceDataSchema = Nothing
      , ceSubject = Just ("order/" <> orderId)
      , ceTime = -- ... (UTCTime value)
      }
    orderData
    ext
-- Example usage in a function that processes and sends an order event
processOrder :: Text -> Text -> Double -> [Text] -> Text -> IO ()
processOrder orderId customerId amount items region = do
  -- Create the order data
  let orderData =
        OrderCreatedEvent
          { orderId = orderId
          , customerId = customerId
          , amount = amount
          , items = items
          }
      -- Create extension with Kafka partition key and region
      extension =
        OrderExtension
          { partitionKey = orderId -- Use orderId as partition key
          , region = region
          }
      -- Create the CloudEvent
      event = createOrderEvent orderId orderData extension
      -- Define Kafka topic
      topic = "orders"
      -- Create Kafka producer record in binary mode
      producerRecord = toBinaryKafkaMessage topic event
  -- Validate the event before sending
  -- In real code, you would decode the JSON first
  let isValid = isValidCloudEvent @OrderCreatedEvent @OrderExtension (toJSON event)
  if isValid
    then do
      putStrLn "Event is valid, sending to Kafka..."
      -- In a real application, you would send to Kafka, for example with
      -- hw-kafka-client (newProducer returns an Either KafkaError KafkaProducer):
      --   Right producer <- newProducer producerProperties
      --   _ <- produceMessage producer producerRecord
      --   closeProducer producer
      pure ()
    else
      putStrLn "Event validation failed!"
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
This project is licensed under the BSD-3-Clause License - see the LICENSE file for details.