MyNixOS website logo
Description

Unofficial Haskell SDK for the CloudEvents specification.

CloudEvents Haskell SDK

An unofficial Haskell SDK for the CloudEvents specification, particularly for version 1.0.2 of the spec. (JSON Schema)

Overview

CloudEvents is a vendor-neutral specification for defining the format of event data. This SDK provides a type-safe and idiomatic way to work with CloudEvents in Haskell, making it easier to produce and consume CloudEvents across different services and platforms.

The CloudEvents Haskell SDK allows you to:

  • Create CloudEvents with strongly typed event data and extensions
  • Validate CloudEvents against the specification
  • Encode and decode CloudEvents in JSON format
  • Integrate with message brokers like Apache Kafka

Installation

Add the package to your project's dependencies:

Using Cabal

# In your .cabal file
build-depends: cloudevents-haskell

Using Stack

# In your package.yaml file
dependencies:
  - cloudevents-haskell

Key Features

  • Type-safe CloudEvent representation - Events modeled as native Haskell types with proper validation
  • Flexible event data - Support for arbitrary event data types via polymorphism
  • Extension mechanism - Define and use custom extension attributes
  • Protocol bindings - Integration with Apache Kafka (binary and structured modes)
  • JSON encoding/decoding - Automatic serialization with Aeson via Autodocodec
  • Lenses - Convenient access and modification of CloudEvent fields
  • Validation - Validate CloudEvents against the JSON schema

Quick Start Examples

Creating a Simple CloudEvent

import CloudEvent.V1.Event.Data
import Data.Text (Text)

-- Create a CloudEvent with Text payload and no extensions
simpleEvent :: CloudEvent EmptyExtension Text
simpleEvent =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "1234-1234-1234"
      , ceSource = -- ... (Iri value)
      , ceEventType = "com.example.event.created"
      , ceDataContentType = Just "text/plain"
      , ceDataSchema = Nothing
      , ceSubject = Just "resource123"
      , ceTime = -- ... (UTCTime value)
      }
    "This is the event payload as plain text"
    EmptyExtension -- No extensions

Working with Structured Data

import CloudEvent.V1.Event.Data
import Data.Aeson (FromJSON, ToJSON)
import Data.Text (Text)
import GHC.Generics (Generic)
import Autodocodec

-- Define custom event data type
data UserCreatedEvent = UserCreatedEvent
  { userId :: Text
  , username :: Text
  , email :: Text
  }
  deriving stock (Eq, Show, Generic)
  deriving (FromJSON, ToJSON) via (Autodocodec UserCreatedEvent)

instance HasCodec UserCreatedEvent where
  codec =
    object "UserCreatedEvent" $
      UserCreatedEvent
        <$> requiredField' "userId" .= (.userId)
        <*> requiredField' "username" .= (.username)
        <*> requiredField' "email" .= (.email)

-- Create an event with structured data
userCreatedEvent :: CloudEvent EmptyExtension UserCreatedEvent
userCreatedEvent =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "user-123-456"
      , ceSource = -- ... (Iri value)
      , ceEventType = "com.example.user.created"
      , ceDataContentType = Just "application/json"
      , ceDataSchema = Nothing
      , ceSubject = Just "resource123"
      , ceTime = -- ... (UTCTime value)
      }
    UserCreatedEvent
      { userId = "123"
      , username = "johndoe"
      , email = "[email protected]"
      }
    EmptyExtension

Using Custom Extensions

import CloudEvent.V1.Event.Data
import Data.Aeson (FromJSON, ToJSON)
import Data.Text (Text)
import GHC.Generics (Generic)
import Autodocodec

-- Define custom extension attributes
data TraceExtension = TraceExtension
  { traceId :: Text
  , spanId :: Text
  }
  deriving stock (Eq, Show, Generic)
  deriving (FromJSON, ToJSON) via (Autodocodec TraceExtension)

instance HasCodec TraceExtension where
  codec = object "TraceExtension" objectCodec

-- This is required
instance HasObjectCodec TraceExtension where
  objectCodec =
    TraceExtension
      <$> requiredField' "traceId" .= (.traceId)
      <*> requiredField' "spanId" .= (.spanId)

-- Create an event with custom extensions
eventWithTracing :: CloudEvent TraceExtension Text
eventWithTracing =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "evt-789-abc"
      , ceSource = -- ... (Iri value)
      , ceEventType = "com.example.order.processed"
      , ceDataContentType = Just "text/plain"
      , ceDataSchema = Nothing
      , ceSubject = Just "order/456"
      , ceTime = -- ... (UTCTime value)
      }
    "Order 456 has been processed successfully"
    TraceExtension
      { traceId = "trace-123456789"
      , spanId = "span-abcdef"
      }

Validating CloudEvents

The SDK provides functionality to validate CloudEvents against the specification:

import CloudEvent.V1.Event.Data
import CloudEvent.V1.Event.Validation
import Data.Aeson (Value, toJSON)

-- Validate a CloudEvent JSON representation
validateJsonCloudEvent :: Value -> Bool
validateJsonCloudEvent jsonValue =
  isValidCloudEvent @MyEventData @MyExtensions jsonValue

-- Example usage:
-- let eventJson = toJSON myCloudEvent
-- if validateJsonCloudEvent eventJson
--   then putStrLn "Valid CloudEvent"
--   else putStrLn "Invalid CloudEvent"

Kafka Integration

The SDK provides integration with Apache Kafka through the CloudEvent.V1.Bindings.Kafka module:

import CloudEvent.V1.Bindings.Kafka
import CloudEvent.V1.Event.Data
import Data.Text (Text)
import Kafka.Producer (TopicName)

-- Create a CloudEvent with Kafka extension
kafkaEvent :: CloudEvent KafkaCloudEventExtAttributes Text
kafkaEvent =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "evt-kafka-123"
      , ceSource = -- ... (Iri value)
      , ceEventType = "com.example.inventory.updated"
      , ceDataContentType = Just "text/plain"
      , ceDataSchema = Nothing
      , ceSubject = Just "product/123"
      , ceTime = -- ... (UTCTime value)
      }
    "Product 123 inventory updated to 42 units"
    KafkaCloudEventExtAttributes{partitionKey = "product-123"}

-- Send using binary content mode
sendBinary :: TopicName -> CloudEvent KafkaCloudEventExtAttributes Text -> ProducerRecord
sendBinary topic event = toBinaryKafkaMessage topic event
-- Use with Kafka producer: producerSend producer producerRecord

-- Send using structured content mode
sendStructured :: TopicName -> CloudEvent KafkaCloudEventExtAttributes Text -> ProducerRecord
sendStructured topic event = toStructuredKafkaMessage topic event
-- Use with Kafka producer: producerSend producer producerRecord

Using Lenses

The SDK provides lenses for convenient access and modification of CloudEvent fields:

import CloudEvent.V1.Event.Data
import CloudEvent.V1.Event.Lens
import Control.Lens
import Data.Text (Text)

-- Access fields using lenses
getEventId :: CloudEvent ext eventData -> Text
getEventId event = event ^. ceId

-- Modify a CloudEvent using lenses
updateSubject :: Text -> CloudEvent ext eventData -> CloudEvent ext eventData
updateSubject newSubject event =
  event & ceSubject .~ Just newSubject

Complete Example

Here's a more complete example showing how to create, validate, and send a CloudEvent to Kafka:

{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE OverloadedRecordDot #-}

module Example where

import Autodocodec
import CloudEvent.V1.Bindings.Kafka
import CloudEvent.V1.Event.Data
import CloudEvent.V1.Event.Lens
import CloudEvent.V1.Event.Validation
import Control.Lens
import Data.Aeson (FromJSON, ToJSON, toJSON)
import Data.Text (Text)
import GHC.Generics (Generic)
import Kafka.Producer

-- Define our event data type
data OrderCreatedEvent = OrderCreatedEvent
  { orderId :: Text
  , customerId :: Text
  , amount :: Double
  , items :: [Text]
  }
  deriving stock (Eq, Show, Generic)
  deriving (FromJSON, ToJSON) via (Autodocodec OrderCreatedEvent)

instance HasCodec OrderCreatedEvent where
  codec =
    object "OrderCreatedEvent" $
      OrderCreatedEvent
        <$> requiredField' "orderId" .= (.orderId)
        <*> requiredField' "customerId" .= (.customerId)
        <*> requiredField' "amount" .= (.amount)
        <*> requiredField' "items" .= (.items)

-- Define our Kafka-compatible extension
data OrderExtension = OrderExtension
  { partitionKey :: Text  -- For Kafka routing
  , region :: Text        -- Custom extension field
  }
  deriving stock (Eq, Show, Generic)

instance HasCodec OrderExtension where
  codec = object "OrderExtension" objectCodec

instance HasObjectCodec OrderExtension where
  objectCodec =
    OrderExtension
      <$> requiredField' "partitionKey" .= (.partitionKey)
      <*> requiredField' "region" .= (.region)

-- Create our CloudEvent
createOrderEvent :: Text -> OrderCreatedEvent -> OrderExtension -> CloudEvent OrderExtension OrderCreatedEvent
createOrderEvent orderId orderData ext =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "order-" <> orderId
      , ceSource = -- ... (Iri value)
      , ceEventType = "com.example.order.created"
      , ceDataContentType = Just "application/json"
      , ceDataSchema = Nothing
      , ceSubject = Just ("order/" <> orderId)
      , ceTime = -- ... (UTCTime value)
      }
    orderData
    ext

-- Example usage in a function that processes and sends an order event
processOrder :: Text -> Text -> Double -> [Text] -> Text -> IO ()
processOrder orderId customerId amount items region = do
  -- Create the order data
  let orderData = OrderCreatedEvent
        { orderId = orderId
        , customerId = customerId
        , amount = amount
        , items = items
        }

      -- Create extension with Kafka partition key and region
      extension = OrderExtension
        { partitionKey = orderId  -- Use orderId as partition key
        , region = region
        }

      -- Create the CloudEvent
      event = createOrderEvent orderId orderData extension

      -- Define Kafka topic
      topic = "orders"

      -- Create Kafka producer record in binary mode
      producerRecord = toBinaryKafkaMessage topic event

  -- Validate the event before sending
  -- In real code, you would decode the JSON first
  let isValid = isValidCloudEvent @OrderCreatedEvent @OrderExtension (toJSON event)

  if isValid
    then do
      putStrLn "Event is valid, sending to Kafka..."
      -- In a real application, you would send to Kafka:
      -- producer <- newProducer producerProperties
      -- sendMessageBatch producer [producerRecord]
      -- closeProducer producer
      pure ()
    else
      putStrLn "Event validation failed!"

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the BSD-3-Clause License - see the LICENSE file for details.

Metadata

Version

0.1.0.0

Platforms (75)

    Darwin
    FreeBSD
    Genode
    GHCJS
    Linux
    MMIXware
    NetBSD
    none
    OpenBSD
    Redox
    Solaris
    WASI
    Windows
Show all
  • aarch64-darwin
  • aarch64-freebsd
  • aarch64-genode
  • aarch64-linux
  • aarch64-netbsd
  • aarch64-none
  • aarch64-windows
  • aarch64_be-none
  • arm-none
  • armv5tel-linux
  • armv6l-linux
  • armv6l-netbsd
  • armv6l-none
  • armv7a-linux
  • armv7a-netbsd
  • armv7l-linux
  • armv7l-netbsd
  • avr-none
  • i686-cygwin
  • i686-freebsd
  • i686-genode
  • i686-linux
  • i686-netbsd
  • i686-none
  • i686-openbsd
  • i686-windows
  • javascript-ghcjs
  • loongarch64-linux
  • m68k-linux
  • m68k-netbsd
  • m68k-none
  • microblaze-linux
  • microblaze-none
  • microblazeel-linux
  • microblazeel-none
  • mips-linux
  • mips-none
  • mips64-linux
  • mips64-none
  • mips64el-linux
  • mipsel-linux
  • mipsel-netbsd
  • mmix-mmixware
  • msp430-none
  • or1k-none
  • powerpc-netbsd
  • powerpc-none
  • powerpc64-linux
  • powerpc64le-linux
  • powerpcle-none
  • riscv32-linux
  • riscv32-netbsd
  • riscv32-none
  • riscv64-linux
  • riscv64-netbsd
  • riscv64-none
  • rx-none
  • s390-linux
  • s390-none
  • s390x-linux
  • s390x-none
  • vc4-none
  • wasm32-wasi
  • wasm64-wasi
  • x86_64-cygwin
  • x86_64-darwin
  • x86_64-freebsd
  • x86_64-genode
  • x86_64-linux
  • x86_64-netbsd
  • x86_64-none
  • x86_64-openbsd
  • x86_64-redox
  • x86_64-solaris
  • x86_64-windows