MyNixOS website logo
Description

Type-safe AMQP workers.

Please see the README on GitHub at https://github.com/seanhess/amqp-worker#readme

AMQP Worker

Type-safe AMQP workers. Compatible with RabbitMQ.

{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

module Main where

import Control.Concurrent (forkIO)
import Control.Monad.Catch (SomeException)
import Data.Aeson (FromJSON, ToJSON)
import Data.Function ((&))
import Data.Text (Text)
import GHC.Generics (Generic)
import Network.AMQP.Worker
import qualified Network.AMQP.Worker as Worker
import System.IO (BufferMode (..), hSetBuffering, stderr, stdout)

-- | Example message payload wrapping a single 'Text' field.
--
-- The 'ToJSON' and 'FromJSON' instances have empty bodies, so they rely on
-- the class defaults, which is why 'Generic' is derived.
newtype Greeting = Greeting {message :: Text}
    deriving (Eq, Show, Generic)

instance ToJSON Greeting

instance FromJSON Greeting

-- | Key made of the words @greetings@ and @new@. In this file it is used both
-- to bind queues and as the key messages are published to.
--
-- NOTE(review): this has the same type as 'anyGreetings', yet comments in this
-- file say publishing is only allowed on routing keys -- confirm the intended
-- key kind against the library's key types.
newGreetings :: Key Bind Greeting
newGreetings = greetings & word "new"
  where
    greetings = key "greetings"

-- | Wildcard binding key: the word @greetings@ followed by 'any1'. Comments
-- elsewhere in this file describe it as matching @greetings.*@.
anyGreetings :: Key Bind Greeting
anyGreetings = greetings & any1
  where
    greetings = key "greetings"

-- | Connect to the broker at @localhost:5672@ with the @guest@ credentials
-- and hand the connection to 'simple'.
example :: IO ()
example = Worker.connect (fromURI "amqp://guest:guest@localhost:5672") >>= simple

-- | Publish a single @Greeting "Hello"@ under 'newGreetings'.
publishing :: Connection -> IO ()
publishing conn = Worker.publish conn newGreetings (Greeting "Hello")

-- | Create a queue, publish one greeting to it, then print everything received
simple :: Connection -> IO ()
simple conn = do
    -- Queue bound to newGreetings; this is where the message below ends up
    greetingsQueue <- Worker.queue conn def newGreetings

    -- Publish a message (delivered to the queue)
    Worker.publish conn newGreetings (Greeting "Hello")

    -- Publishing to anyGreetings is rejected because it is a binding key (with wildcards in it)
    -- Worker.publish conn anyGreetings $ Greeting "Compiler Error"

    -- Run the worker, printing the value of every message received
    Worker.worker conn def greetingsQueue onError $ \received ->
        print (value received)

-- | Multiple queues with distinct names will each get copies of published messages
--
-- Two workers are forked, one per queue, and the main thread then blocks on
-- standard input until a line is entered.
multiple :: Connection -> IO ()
multiple conn = do
    -- create two separate queues, both bound to the same key
    one <- Worker.queue conn "one" newGreetings
    two <- Worker.queue conn "two" newGreetings

    -- publish a message (delivered to both)
    Worker.publish conn newGreetings $ Greeting "Hello"

    -- Each of these workers will receive the same message
    _ <- forkIO $ Worker.worker conn def one onError $ \m -> putStrLn "one" >> print (value m)
    _ <- forkIO $ Worker.worker conn def two onError $ \m -> putStrLn "two" >> print (value m)

    -- getLine only returns after a complete line, so the prompt asks for
    -- Enter rather than "any key"
    putStrLn "Press Enter to exit"
    () <$ getLine

-- | Create multiple workers on the same queue to load balance between them
--
-- Two workers are forked on one shared queue, and the main thread then blocks
-- on standard input until a line is entered.
balance :: Connection -> IO ()
balance conn = do
    -- create a single queue
    q <- Worker.queue conn def newGreetings

    -- publish two messages
    Worker.publish conn newGreetings $ Greeting "Hello1"
    Worker.publish conn newGreetings $ Greeting "Hello2"

    -- Both workers consume from the same queue, so the messages are split
    -- between them rather than copied to each
    _ <- forkIO $ Worker.worker conn def q onError $ \m -> putStrLn "one" >> print (value m)
    _ <- forkIO $ Worker.worker conn def q onError $ \m -> putStrLn "two" >> print (value m)

    -- getLine only returns after a complete line, so the prompt asks for
    -- Enter rather than "any key"
    putStrLn "Press Enter to exit"
    () <$ getLine

-- | Bind a queue with a wildcard binding key and print whatever arrives on it
dynamic :: Connection -> IO ()
dynamic conn = do
    -- anyGreetings matches `greetings.*`
    wildcardQueue <- Worker.queue conn def anyGreetings

    -- Publish under newGreetings; publishing to anyGreetings would give a compile error
    Worker.publish conn newGreetings (Greeting "Hello")

    -- This queue listens for anything under `greetings.`
    Worker.worker conn def wildcardQueue onError $ \received -> do
        putStrLn "Got: "
        print (value received)

-- | Error callback passed to every worker in this file: prints a placeholder
-- line followed by the exception itself.
onError :: WorkerException SomeException -> IO ()
onError e = putStrLn "Do something with errors" >> print e

-- | Run one of the example actions against the broker at @localhost:5672@.
--
-- Switches 'stdout' and then 'stderr' to line buffering before connecting,
-- then passes the new connection to @action@.
test :: (Connection -> IO ()) -> IO ()
test action = do
    mapM_ (\h -> hSetBuffering h LineBuffering) [stdout, stderr]
    Worker.connect (fromURI "amqp://guest:guest@localhost:5672") >>= action

-- | Program entry point: runs 'example'.
main :: IO ()
main = example

Metadata

Version

2.0.1

Platforms (76)

    Darwin
    FreeBSD
    Genode
    GHCJS
    Linux
    MMIXware
    NetBSD
    none
    OpenBSD
    Redox
    Solaris
    WASI
    Windows
Show all
  • aarch64-darwin
  • aarch64-freebsd
  • aarch64-genode
  • aarch64-linux
  • aarch64-netbsd
  • aarch64-none
  • aarch64-windows
  • aarch64_be-none
  • arm-none
  • armv5tel-linux
  • armv6l-linux
  • armv6l-netbsd
  • armv6l-none
  • armv7a-linux
  • armv7a-netbsd
  • armv7l-linux
  • armv7l-netbsd
  • avr-none
  • i686-cygwin
  • i686-freebsd
  • i686-genode
  • i686-linux
  • i686-netbsd
  • i686-none
  • i686-openbsd
  • i686-windows
  • javascript-ghcjs
  • loongarch64-linux
  • m68k-linux
  • m68k-netbsd
  • m68k-none
  • microblaze-linux
  • microblaze-none
  • microblazeel-linux
  • microblazeel-none
  • mips-linux
  • mips-none
  • mips64-linux
  • mips64-none
  • mips64el-linux
  • mipsel-linux
  • mipsel-netbsd
  • mmix-mmixware
  • msp430-none
  • or1k-none
  • powerpc-linux
  • powerpc-netbsd
  • powerpc-none
  • powerpc64-linux
  • powerpc64le-linux
  • powerpcle-none
  • riscv32-linux
  • riscv32-netbsd
  • riscv32-none
  • riscv64-linux
  • riscv64-netbsd
  • riscv64-none
  • rx-none
  • s390-linux
  • s390-none
  • s390x-linux
  • s390x-none
  • vc4-none
  • wasm32-wasi
  • wasm64-wasi
  • x86_64-cygwin
  • x86_64-darwin
  • x86_64-freebsd
  • x86_64-genode
  • x86_64-linux
  • x86_64-netbsd
  • x86_64-none
  • x86_64-openbsd
  • x86_64-redox
  • x86_64-solaris
  • x86_64-windows