Description

Streamly bindings for hw-kafka-client.

hw-kafka-streamly

Streamly streaming integration for hw-kafka-client — the Haskell binding to Apache Kafka via librdkafka.

hw-kafka-streamly exposes Kafka consumers as composable Streamly Streams and Kafka producers as Streamly Folds, so message processing can be expressed in the same vocabulary used elsewhere in a Streamly pipeline. Resource handling (consumer/producer creation, close, flush) is layered so the caller picks the amount of bracketing they want.

Installation

Add to your .cabal file:

build-depends:
  , hw-kafka-streamly  >=0.1 && <0.2
  , hw-kafka-client    >=5.3 && <6
  , streamly-core      >=0.4 && <0.5

Note: streamly-core 0.4 is not yet published to Hackage at the time of writing. Until it is, add a source-repository-package stanza for upstream Streamly to your cabal.project. The plan to land Streamly 0.4 on Hackage is tracked in the project's master plan (docs/masterplans/1-release-hw-kafka-streamly-0.1.0.0.md, EP-5/EP-6).
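
As a rough illustration of that interim setup, a cabal.project stanza could look like the fragment below. This is a sketch under assumptions: the location is the upstream Streamly repository on GitHub, the subdir assumes streamly-core lives in its core/ directory, and the tag is a placeholder you must replace with the revision you actually want to pin.

```cabal
-- cabal.project (sketch; the tag below is a placeholder, not a real revision)
packages: .

source-repository-package
  type: git
  location: https://github.com/composewell/streamly
  tag: <commit-sha-of-the-streamly-revision-you-need>
  subdir: core
```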

Modules

  • Kafka.Streamly.Source — consumer streams, error predicates and filters, value-mapping helpers built on Bifunctor/Bitraversable.
  • Kafka.Streamly.Sink — producer folds and a withKafkaProducer bracket helper.
  • Kafka.Streamly.Combinators — batching combinators and helpers that throw Left values as exceptions.
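
To make the ideas behind these modules concrete, here is a minimal sketch that uses only base. It models stream elements as Either values in a plain list and shows a non-fatal error filter, value mapping through Bifunctor and Bitraversable, and a helper that throws Left values as exceptions. Every name in it (ConsumeError, Record, isFatal, dropNonFatal, mapRecordValue, traverseValue, throwLeft) is a hypothetical stand-in, not the package's actual API.

```haskell
import Control.Exception (Exception, throwIO, try)
import Data.Bifunctor (second)
import Data.Bitraversable (bitraverse)

-- Hypothetical stand-in for a Kafka error type.
data ConsumeError = PollTimedOut | BrokerUnreachable String
  deriving (Show, Eq)

instance Exception ConsumeError

-- Hypothetical stand-in for a consumer record with a key and a value.
data Record k v = Record { recKey :: k, recValue :: v }
  deriving (Show, Eq)

-- Hypothetical error predicate: which errors should stop processing.
isFatal :: ConsumeError -> Bool
isFatal PollTimedOut          = False
isFatal (BrokerUnreachable _) = True

-- Drop non-fatal errors; keep fatal errors and all successful values.
-- A list stands in for a stream here.
dropNonFatal :: [Either ConsumeError a] -> [Either ConsumeError a]
dropNonFatal = filter keep
  where
    keep (Left e)  = isFatal e
    keep (Right _) = True

-- Map over the value of a consumed record, leaving errors untouched.
mapRecordValue :: (v -> v') -> Either e (Record k v) -> Either e (Record k v')
mapRecordValue f = second (\r -> r { recValue = f (recValue r) })

-- Effectful mapping over the Right side only, via Bitraversable.
traverseValue :: Applicative f => (a -> f b) -> Either e a -> f (Either e b)
traverseValue = bitraverse pure

-- Turn a Left into a thrown exception, returning the Right value otherwise.
throwLeft :: Exception e => Either e a -> IO a
throwLeft = either throwIO pure

main :: IO ()
main = do
  let ok = Right (Record "k1" "hello") :: Either ConsumeError (Record String String)
      polled = [ok, Left PollTimedOut, Left (BrokerUnreachable "localhost:9092")]
  mapM_ (print . mapRecordValue length) (dropNonFatal polled)
  _ <- traverseValue (putStrLn . recValue) ok
  outcome <- try (throwLeft (Left PollTimedOut :: Either ConsumeError ()))
  print (outcome :: Either ConsumeError ())
```

Keeping errors as Left values lets a pipeline decide late whether to skip, log, or throw, which is the design the module descriptions above suggest.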

Consuming

The source module ships three variants that differ only in how they manage the underlying KafkaConsumer:

  • kafkaSource — creates the consumer from ConsumerProperties and Subscription, closes it when the stream ends. Use this when the stream fully owns the consumer's lifecycle.
  • kafkaSourceAutoClose — wraps a caller-supplied KafkaConsumer and closes it on stream end. Use this when the consumer is created elsewhere but its lifetime matches the stream.
  • kafkaSourceNoClose — wraps a caller-supplied KafkaConsumer and leaves it open. Use this when the consumer outlives the stream.
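
{-# LANGUAGE ImportQualifiedPost, OverloadedStrings #-}
import Kafka.Consumer
import Kafka.Streamly.Source (kafkaSource, skipNonFatal)
import Streamly.Data.Fold qualified as Fold
import Streamly.Data.Stream qualified as Stream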

main :: IO ()
main = do
  let props = brokersList ["localhost:9092"]
           <> groupId "my-group"
           <> noAutoCommit
      sub  = topics ["events"] <> offsetReset Earliest
  Stream.fold (Fold.drainBy print)
    . skipNonFatal
    $ kafkaSource props sub (Timeout 1000)
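
The three ownership models behind kafkaSource, kafkaSourceAutoClose and kafkaSourceNoClose can be pictured with a small base-only sketch. It is an analogy, not the library's implementation: FakeConsumer and the run* helpers are hypothetical, and they use a callback where the real functions tie cleanup to stream evaluation.

```haskell
import Control.Exception (bracket, finally)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Hypothetical stand-in for a KafkaConsumer: just an "is open" flag.
newtype FakeConsumer = FakeConsumer (IORef Bool)

newFakeConsumer :: IO FakeConsumer
newFakeConsumer = FakeConsumer <$> newIORef True

closeFakeConsumer :: FakeConsumer -> IO ()
closeFakeConsumer (FakeConsumer ref) = writeIORef ref False

isOpen :: FakeConsumer -> IO Bool
isOpen (FakeConsumer ref) = readIORef ref

-- Owns the whole lifecycle: create, run, always close (cf. kafkaSource).
runOwned :: (FakeConsumer -> IO a) -> IO a
runOwned = bracket newFakeConsumer closeFakeConsumer

-- Caller supplies the consumer; it is closed when the work ends,
-- even on exceptions (cf. kafkaSourceAutoClose).
runAutoClose :: FakeConsumer -> (FakeConsumer -> IO a) -> IO a
runAutoClose c body = body c `finally` closeFakeConsumer c

-- Caller supplies the consumer and keeps ownership (cf. kafkaSourceNoClose).
runNoClose :: FakeConsumer -> (FakeConsumer -> IO a) -> IO a
runNoClose c body = body c

main :: IO ()
main = do
  runOwned (\c -> isOpen c >>= print)    -- True: open while the body runs
  shared <- newFakeConsumer
  runNoClose shared (const (pure ()))
  isOpen shared >>= print                -- True: left open for the caller
  runAutoClose shared (const (pure ()))
  isOpen shared >>= print                -- False: closed when the work ended
```

The practical rule is the same as for any bracketed resource: pick the variant whose cleanup scope matches who created the consumer and how long it must live.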

Producing

Producer sinks are Streamly Folds:

{-# LANGUAGE ImportQualifiedPost, OverloadedStrings #-}
import Kafka.Producer
import Kafka.Streamly.Sink (kafkaSink, withKafkaProducer)
import Streamly.Data.Fold qualified as Fold
import Streamly.Data.Stream qualified as Stream

main :: IO ()
main = do
  let props = brokersList ["localhost:9092"]
  result <- withKafkaProducer props $ \producer ->
    Stream.fold (kafkaSink producer)
      . fmap mkRecord
      $ Stream.fromList ["a", "b", "c"]
  print result
  where
    mkRecord v =
      ProducerRecord
        { prTopic     = TopicName "events"
        , prPartition = UnassignedPartition
        , prKey       = Nothing
        , prValue     = Just v
        , prHeaders   = mempty
        }
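
To show what "a sink is a Fold" means in practice, the sketch below builds a stand-in sink by hand and runs it the same way as kafkaSink above. It assumes only streamly-core's public Streamly.Data.Fold and Streamly.Data.Stream modules. loggingSink is hypothetical: it appends each value to an IORef instead of calling a producer, and returns the number of values it handled.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream

-- Hypothetical sink: "sends" each value by prepending it to a log and
-- returns the number of values sent. A real sink would call the producer.
loggingSink :: IORef [String] -> Fold.Fold IO String Int
loggingSink ref = Fold.foldlM' step (pure 0)
  where
    step n v = do
      modifyIORef' ref (v :)
      pure (n + 1)

main :: IO ()
main = do
  ref <- newIORef []
  sent <- Stream.fold (loggingSink ref)
            . fmap (\v -> "record:" ++ v)
            $ Stream.fromList ["a", "b", "c"]
  print sent
  -- The log was built by prepending, so reverse it to get send order.
  readIORef ref >>= print . reverse
```

Because the sink is an ordinary Fold, it composes with the usual stream transformations (fmap, filtering, batching) applied upstream of Stream.fold.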

Cookbook

For end-to-end runnable examples — concurrent consumers, batching producers, error handling, transform pipelines — see the companion hw-kafka-streamly-jitsurei package in this repository. It is not published to Hackage; clone the repo and run the executables locally.

Design notes

The original design plan for the bindings lives at docs/plans/1-streamly-bindings-for-hw-kafka-client.md.

License

MIT — see LICENSE.

Metadata

Version

0.1.0.0

Platforms (80)

    Darwin
    FreeBSD
    Genode
    GHCJS
    Linux
    MMIXware
    NetBSD
    none
    OpenBSD
    Redox
    Solaris
    uefi
    WASI
    Windows
  • aarch64-darwin
  • aarch64-freebsd
  • aarch64-genode
  • aarch64-linux
  • aarch64-netbsd
  • aarch64-none
  • aarch64-uefi
  • aarch64-windows
  • aarch64_be-none
  • arc-linux
  • arm-none
  • armv5tel-linux
  • armv6l-linux
  • armv6l-netbsd
  • armv6l-none
  • armv7a-linux
  • armv7a-netbsd
  • armv7l-linux
  • armv7l-netbsd
  • avr-none
  • i686-cygwin
  • i686-freebsd
  • i686-genode
  • i686-linux
  • i686-netbsd
  • i686-none
  • i686-openbsd
  • i686-windows
  • javascript-ghcjs
  • loongarch64-linux
  • m68k-linux
  • m68k-netbsd
  • m68k-none
  • microblaze-linux
  • microblaze-none
  • microblazeel-linux
  • microblazeel-none
  • mips-linux
  • mips-none
  • mips64-linux
  • mips64-none
  • mips64el-linux
  • mipsel-linux
  • mipsel-netbsd
  • mmix-mmixware
  • msp430-none
  • or1k-none
  • powerpc-linux
  • powerpc-netbsd
  • powerpc-none
  • powerpc64-linux
  • powerpc64le-linux
  • powerpcle-none
  • riscv32-linux
  • riscv32-netbsd
  • riscv32-none
  • riscv64-linux
  • riscv64-netbsd
  • riscv64-none
  • rx-none
  • s390-linux
  • s390-none
  • s390x-linux
  • s390x-none
  • sh4-linux
  • vc4-none
  • wasm32-wasi
  • wasm64-wasi
  • x86_64-cygwin
  • x86_64-darwin
  • x86_64-freebsd
  • x86_64-genode
  • x86_64-linux
  • x86_64-netbsd
  • x86_64-none
  • x86_64-openbsd
  • x86_64-redox
  • x86_64-solaris
  • x86_64-uefi
  • x86_64-windows