Description

Effectful effects for hw-kafka-client.

kafka-effectful

Effectful effects and interpreters for hw-kafka-client, a Haskell binding to Apache Kafka via librdkafka.

Provides typed, composable KafkaProducer and KafkaConsumer effects for the effectful ecosystem.

Status: experimental. This package is on its first release. The API may change in breaking ways in subsequent 0.x versions. Pin to an exact version in production until 1.0 is tagged.

Features

  • KafkaProducer -- send messages and flush the producer queue
  • KafkaConsumer -- poll messages (single or batch), manage offsets, assign/pause/resume/seek partitions, and query committed offsets, positions, assignments, and subscriptions
  • Resource-safe interpreters that acquire and release Kafka handles via bracket
  • Errors surfaced through Effectful.Error.Static (Error KafkaError)
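As a minimal sketch of what surfacing errors through Effectful.Error.Static means in practice, the snippet below throws a KafkaError and recovers from it without contacting a broker. It assumes KafkaError and its KafkaBadSpecification constructor are imported directly from hw-kafka-client's Kafka.Types, and that your effectful version provides runErrorNoCallStack. The names failing, recovered, and demo are illustrative and not part of this package.

```haskell
{-# LANGUAGE FlexibleContexts, OverloadedStrings, TypeOperators #-}

import Effectful (Eff, runPureEff, (:>))
import Effectful.Error.Static (Error, catchError, runErrorNoCallStack, throwError)
import Kafka.Types (KafkaError (..))  -- assumption: taken from hw-kafka-client

-- Hypothetical computation that always fails with a KafkaError.
failing :: (Error KafkaError :> es) => Eff es Int
failing = throwError (KafkaBadSpecification "demo failure")

-- Recover inside Eff: the handler receives the call stack and the error.
recovered :: (Error KafkaError :> es) => Eff es Int
recovered = failing `catchError` (\_callStack _err -> pure 0)

-- Discharge the Error effect; IOE is not needed because nothing here does IO.
demo :: (Either KafkaError Int, Either KafkaError Int)
demo =
  ( runPureEff (runErrorNoCallStack failing)
  , runPureEff (runErrorNoCallStack recovered)
  )
```

Here the first component of demo is a Left carrying the error and the second is Right 0. The same catchError pattern should apply around producer or consumer operations, since their failures travel through the same Error KafkaError effect.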

Usage

Producer

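import Effectful (Eff, IOE, (:>))
import Effectful.Error.Static (Error)
import Kafka.Effectful

-- producerProps and record are placeholders for your own values.
example :: (IOE :> es, Error KafkaError :> es) => Eff es ()
example =
  runKafkaProducer producerProps $ do
    produceMessage record  -- send the record
    flushProducer          -- flush the producer queue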

Consumer

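import Effectful (Eff, IOE, (:>))
import Effectful.Error.Static (Error)
import Kafka.Effectful

-- consumerProps and subscription are placeholders for your own values.
example :: (IOE :> es, Error KafkaError :> es) => Eff es ()
example =
  runKafkaConsumer consumerProps subscription loop
  where
    loop = do
      -- Wait up to the given timeout for the next message.
      mbMsg <- pollMessage (Timeout 1000)
      case mbMsg of
        Nothing  -> loop  -- timeout elapsed, poll again
        Just msg -> do
          -- Process msg here, then commit its offset.
          commitOffsetMessage OffsetCommit msg
          loop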

pollMessage returns Nothing when the timeout elapses without a message arriving; non-timeout failures are thrown via the Error KafkaError effect.
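The consumer example leaves consumerProps and subscription undefined. A minimal sketch of how they might be built is shown here, assuming the builders are imported directly from hw-kafka-client's Kafka.Consumer module (Kafka.Effectful.Consumer may re-export them, but that is not confirmed here). The broker address, group id, and topic name are hypothetical.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Kafka.Consumer  -- assumption: builders imported straight from hw-kafka-client

-- Hypothetical broker address and group id; replace with your own.
consumerProps :: ConsumerProperties
consumerProps =
  brokersList [BrokerAddress "localhost:9092"]
    <> groupId (ConsumerGroupId "example-group")
    <> noAutoCommit  -- offsets are committed explicitly by the caller

-- Hypothetical topic name; start from the earliest offset when none is committed.
subscription :: Subscription
subscription =
  topics [TopicName "example-topic"]
    <> offsetReset Earliest
```

Disabling auto-commit is a design choice that matches a loop calling commitOffsetMessage itself; drop noAutoCommit if you prefer librdkafka to commit offsets automatically.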

Running it

The effect handlers runKafkaProducer and runKafkaConsumer require IOE and Error KafkaError in the effect stack. A complete program wires them with runEff and runError:

{-# LANGUAGE TypeApplications #-}

import Effectful
import Effectful.Error.Static (runError)
import Kafka.Effectful

main :: IO ()
main = do
  result <- runEff . runError @KafkaError $ runProgram
  case result of
    Left (_, err) -> putStrLn ("Kafka error: " <> show err)
    Right ()      -> pure ()
  where
    runProgram =
      runKafkaProducer producerProps $ do
        produceMessage record
        flushProducer

Replace producerProps and record with your own ProducerProperties and ProducerRecord values (see the Kafka.Effectful.Producer module for the available builders).
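For illustration, the two placeholders could be defined roughly as follows. This is a sketch that assumes the builders and the ProducerRecord type come directly from hw-kafka-client's Kafka.Producer module, and that the installed hw-kafka-client is a 5.x release in which ProducerRecord has a prHeaders field (remove that field on older versions). The broker address, topic, and payload are hypothetical.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Kafka.Producer  -- assumption: builders imported straight from hw-kafka-client

-- Hypothetical broker address; replace with your own.
producerProps :: ProducerProperties
producerProps =
  brokersList [BrokerAddress "localhost:9092"]
    <> logLevel KafkaLogInfo

-- Hypothetical topic and payload, sent without a key.
record :: ProducerRecord
record =
  ProducerRecord
    { prTopic     = TopicName "example-topic"
    , prPartition = UnassignedPartition  -- leave partition choice to the partitioner
    , prKey       = Nothing
    , prValue     = Just "hello"
    , prHeaders   = mempty  -- assumption: field exists (hw-kafka-client 5.x)
    }
```

Properties combine with <>, so further settings can be appended to producerProps without restructuring the definition.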

Module Structure

Module                                 Description
Kafka.Effectful                        Convenience re-export of both effects and common types
Kafka.Effectful.Producer               Producer effect, interpreter, and types
Kafka.Effectful.Consumer               Consumer effect, interpreter, and types
Kafka.Effectful.Producer.Effect        KafkaProducer effect definition and operations
Kafka.Effectful.Producer.Interpreter   runKafkaProducer interpreter
Kafka.Effectful.Consumer.Effect        KafkaConsumer effect definition and operations
Kafka.Effectful.Consumer.Interpreter   runKafkaConsumer interpreter

Requirements

  • GHC >= 9.12
  • librdkafka (system dependency)

License

MIT.

Metadata

Version

0.1.0.0

Platforms (80)

    Darwin
    FreeBSD
    Genode
    GHCJS
    Linux
    MMIXware
    NetBSD
    none
    OpenBSD
    Redox
    Solaris
    uefi
    WASI
    Windows
  • aarch64-darwin
  • aarch64-freebsd
  • aarch64-genode
  • aarch64-linux
  • aarch64-netbsd
  • aarch64-none
  • aarch64-uefi
  • aarch64-windows
  • aarch64_be-none
  • arc-linux
  • arm-none
  • armv5tel-linux
  • armv6l-linux
  • armv6l-netbsd
  • armv6l-none
  • armv7a-linux
  • armv7a-netbsd
  • armv7l-linux
  • armv7l-netbsd
  • avr-none
  • i686-cygwin
  • i686-freebsd
  • i686-genode
  • i686-linux
  • i686-netbsd
  • i686-none
  • i686-openbsd
  • i686-windows
  • javascript-ghcjs
  • loongarch64-linux
  • m68k-linux
  • m68k-netbsd
  • m68k-none
  • microblaze-linux
  • microblaze-none
  • microblazeel-linux
  • microblazeel-none
  • mips-linux
  • mips-none
  • mips64-linux
  • mips64-none
  • mips64el-linux
  • mipsel-linux
  • mipsel-netbsd
  • mmix-mmixware
  • msp430-none
  • or1k-none
  • powerpc-linux
  • powerpc-netbsd
  • powerpc-none
  • powerpc64-linux
  • powerpc64le-linux
  • powerpcle-none
  • riscv32-linux
  • riscv32-netbsd
  • riscv32-none
  • riscv64-linux
  • riscv64-netbsd
  • riscv64-none
  • rx-none
  • s390-linux
  • s390-none
  • s390x-linux
  • s390x-none
  • sh4-linux
  • vc4-none
  • wasm32-wasi
  • wasm64-wasi
  • x86_64-cygwin
  • x86_64-darwin
  • x86_64-freebsd
  • x86_64-genode
  • x86_64-linux
  • x86_64-netbsd
  • x86_64-none
  • x86_64-openbsd
  • x86_64-redox
  • x86_64-solaris
  • x86_64-uefi
  • x86_64-windows