Description

Streaming Postgres bindings.

Elsen Accelerated Computing Engine

pgstream

Streams Postgres query results into Haskell over the binary protocol, using attoparsec and some hand-written kernels for array extraction. Results are streamed into vectors or batched into serial or parallel Conduit pipelines for stream composition.


Installation

$ cabal install pg_stream.cabal

Usage

Connections

Connections to Postgres are established with the connect function, which yields a connection object.

connect :: ConnSettings -> IO PQ.Connection

Connection parameters are specified by a ConnSettings record.

creds :: ConnSettings
creds = ConnSettings { _host = "localhost", _dbname = "invest", _user = "dbadmin" }
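The PQ prefix on PQ.Connection suggests the postgresql-libpq bindings, and libpq accepts connection parameters as a keyword/value string. The sketch below shows how settings like creds could be rendered into such a string. It is hypothetical: ConnSettings is redeclared here with only the three fields shown above (the library's record may differ), and toConnString is an illustrative helper, not part of pgstream.

```haskell
-- Hypothetical mirror of the README's ConnSettings, with only the three
-- fields shown there; the library's actual record may differ.
data ConnSettings = ConnSettings
  { _host   :: String
  , _dbname :: String
  , _user   :: String
  }

creds :: ConnSettings
creds = ConnSettings { _host = "localhost", _dbname = "invest", _user = "dbadmin" }

-- Hypothetical helper: render settings as a libpq keyword/value connection
-- string. Each value is single-quoted, and embedded single quotes and
-- backslashes are escaped with a backslash, as libpq requires.
toConnString :: ConnSettings -> String
toConnString c = unwords
  [ "host="   ++ quote (_host c)
  , "dbname=" ++ quote (_dbname c)
  , "user="   ++ quote (_user c)
  ]
  where
    quote v = "'" ++ concatMap escape v ++ "'"
    escape '\'' = "\\'"
    escape '\\' = "\\\\"
    escape ch   = [ch]
```

For creds this produces host='localhost' dbname='invest' user='dbadmin', which, once packed into a ByteString, has the form postgresql-libpq's connectdb accepts.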

Connections are pooled per process. Connection pooling is configured by three parameters:

  • Stripes: Stripe count. The number of distinct sub-pools to maintain. The smallest acceptable value is 1.
  • Keep Alive: Amount of time for which an unused resource is kept open. The smallest acceptable value is 0.5 seconds.
  • Affinity: Maximum number of resources to keep open per stripe. The smallest acceptable value is 1.

The default settings are:

defaultPoolSettings :: PoolSettings
defaultPoolSettings = PoolSettings { _stripes = 1, _keepalive = 10, _affinity = 10 }
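The minimums listed above can be checked mechanically, and because affinity caps open connections per stripe, a pool holds at most stripes * affinity connections. A minimal sketch, assuming Int counts and a Double keep-alive in seconds (the library's field types may differ); validatePool and maxConnections are hypothetical helpers.

```haskell
-- Hypothetical mirror of the README's PoolSettings; field types are assumptions.
data PoolSettings = PoolSettings
  { _stripes   :: Int     -- number of distinct sub-pools (>= 1)
  , _keepalive :: Double  -- seconds an unused connection stays open (>= 0.5)
  , _affinity  :: Int     -- max open connections per stripe (>= 1)
  } deriving (Show, Eq)

defaultPoolSettings :: PoolSettings
defaultPoolSettings = PoolSettings { _stripes = 1, _keepalive = 10, _affinity = 10 }

-- Reject settings below the smallest acceptable values from the list above.
validatePool :: PoolSettings -> Either String PoolSettings
validatePool s
  | _stripes s < 1     = Left "stripes must be at least 1"
  | _keepalive s < 0.5 = Left "keep-alive must be at least 0.5 seconds"
  | _affinity s < 1    = Left "affinity must be at least 1"
  | otherwise          = Right s

-- Upper bound on simultaneously open connections across all stripes.
maxConnections :: PoolSettings -> Int
maxConnections s = _stripes s * _affinity s
```

With the defaults this is a single stripe holding up to 10 connections.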

Queries

Queries are executed with query for statements that yield result sets, or with execute for statements that only return a status code.

query :: (FromRow r, ToSQL a) => PQ.Connection -> Query -> a -> IO [r]
execute :: (ToSQL a) => PQ.Connection -> Query -> a -> IO ()

For example:

run :: IO [Row]
run = do
  conn <- connect creds
  query conn sample args

SQL queries are constructed with the sql quasiquoter ([sql| ... |]), which generates a Query (a newtype around a ByteString). Values and SQL fragments are spliced into the template as arguments, at numbered placeholders such as {1} and {2}.

{-# LANGUAGE QuasiQuotes #-}

sample :: Query
sample = [sql|
    SELECT
      deltas.sid AS sid,
      EXTRACT(EPOCH FROM deltas.day) AS day,
      (ohlcs :: float4[])
    FROM deltas
    INNER JOIN security_groupings ON deltas.sid = security_groupings.sid
    INNER JOIN currentprice ON (
      deltas.sid = currentprice.sid
      AND deltas.DAY = currentprice.DAY
      AND currentprice.val BETWEEN 0 AND 500
    )
    WHERE security_groupings.name = 'SP900'
    AND deltas.day BETWEEN TO_TIMESTAMP({1}) AND TO_TIMESTAMP({2})
    ORDER BY deltas.sid,
             deltas.DAY ASC
    {3}
    ;
|]
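The placeholders {1}, {2} and {3} in the template above are filled from the argument tuple by position. The sketch below assumes plain textual substitution of already-rendered arguments, which is only an approximation: fillTemplate is a hypothetical helper, and the library may instead send values as bound parameters over the binary protocol and splice only SQL fragments into the text.

```haskell
import Data.Char (isDigit)

-- Hypothetical helper: replace each {n} with the n-th (1-based) rendered
-- argument. Text that is not a well-formed placeholder is copied unchanged.
fillTemplate :: String -> [String] -> Either String String
fillTemplate tmpl args = go tmpl
  where
    go [] = Right []
    go ('{' : rest)
      | (digits@(_ : _), '}' : rest') <- span isDigit rest =
          let i = read digits
          in if i >= 1 && i <= length args
               then (args !! (i - 1) ++) <$> go rest'
               else Left ("no argument for placeholder {" ++ digits ++ "}")
    go (c : rest) = (c :) <$> go rest
```

Returning Either makes a placeholder with no matching argument an explicit error rather than a silently malformed query.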

Arguments

If the argument types are already fixed by inference, no annotations are necessary. Otherwise, annotations are needed to resolve the overloaded Num/String literals to concrete types so they can be serialized and sent to Postgres.

args :: (Int, Int, SQL)
args = ( 1335855600 , 1336374000 , "LIMIT 100000")
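To see why the annotation matters, consider a cut-down, hypothetical ToSQL class in which every tuple component needs a ToArg instance. SQL, ToArg and ToSQL here are stand-ins, not the library's definitions, and values are rendered as text purely for illustration.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.String (IsString (..))

-- Hypothetical stand-in for a raw SQL fragment spliced into a template.
newtype SQL = SQL String deriving (Show, Eq)

instance IsString SQL where
  fromString = SQL

-- Hypothetical per-value rendering class (text form, for illustration only).
class ToArg a where
  renderArg :: a -> String

instance ToArg Int where
  renderArg = show

instance ToArg SQL where
  renderArg (SQL s) = s

-- Hypothetical argument-tuple class: one rendered string per placeholder.
class ToSQL a where
  renderArgs :: a -> [String]

instance (ToArg a, ToArg b, ToArg c) => ToSQL (a, b, c) where
  renderArgs (a, b, c) = [renderArg a, renderArg b, renderArg c]

-- The signature fixes the literals to Int, Int and SQL.
args :: (Int, Int, SQL)
args = (1335855600, 1336374000, "LIMIT 100000")
```

Writing the tuple inline instead, as in renderArgs (1335855600, 1336374000, "LIMIT 100000"), is rejected as ambiguous: the literals only carry Num/IsString plus ToArg constraints, and Haskell's standard defaulting rules do not apply when a class outside the standard libraries, such as ToArg, is involved.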

Conversion between Postgres and Haskell types is defined by the FromField/ToField typeclasses, with the following mapping:

Postgres    Haskell
--------    ------------
int2        Int8
int4        Int32
int8        Int64
float4      Float
float8      Double
numeric     Scientific
uuid        UUID
char        Char
text        Text
date        Day
bytea       ByteString
bool        Bool
int4[]      Vector Int32
float4[]    Vector Float
money       Fixed E3
null a      Maybe a
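Behind the int4 and int4[] rows of the table is the Postgres binary format: an int4 arrives as four bytes in network (big-endian) order, and an array arrives as a header (dimension count, has-nulls flag, element type OID, then a length and lower bound per dimension) followed by a 4-byte length and the bytes of each element. The sketch below decodes a one-dimensional int4[] without NULLs into a list. It illustrates the wire format only and is not the library's attoparsec kernel; be32 and decodeInt4Array are hypothetical names.

```haskell
import qualified Data.ByteString as B
import Data.Bits (shiftL, (.|.))
import Data.Int (Int32)
import Data.Word (Word32)

-- Read the first four bytes as a big-endian signed 32-bit integer.
-- Callers must ensure at least four bytes are present.
be32 :: B.ByteString -> Int32
be32 bs = fromIntegral w
  where
    w :: Word32
    w = foldl (\acc i -> (acc `shiftL` 8) .|. fromIntegral (B.index bs i)) 0 [0 .. 3]

-- Hypothetical decoder for a 1-D, NULL-free int4[] in Postgres binary format.
decodeInt4Array :: B.ByteString -> Either String [Int32]
decodeInt4Array bs
  | B.length bs < 12 = Left "truncated array header"
  | ndim == 0        = Right []          -- empty array: header only
  | ndim /= 1        = Left "only one-dimensional arrays are handled"
  | hasNull /= 0     = Left "NULL elements are not handled"
  | elemOid /= 23    = Left "element type is not int4 (OID 23)"
  | B.length bs < 20 = Left "truncated dimension header"
  | count < 0        = Left "negative dimension length"
  | otherwise        = go count (B.drop 20 bs)
  where
    field k = be32 (B.drop (4 * k) bs)   -- k-th 4-byte header field
    ndim    = field 0
    hasNull = field 1
    elemOid = field 2
    count   = field 3                    -- field 4 (lower bound) is unused
    -- Each element: a 4-byte length (always 4 for int4), then the value.
    go :: Int32 -> B.ByteString -> Either String [Int32]
    go 0 _ = Right []
    go k rest
      | B.length rest < 8 = Left "truncated element"
      | be32 rest /= 4    = Left "unexpected element length"
      | otherwise         = (be32 (B.drop 4 rest) :) <$> go (k - 1) (B.drop 8 rest)
```

A list keeps the sketch short; for the Vector Int32 mapping in the table, the elements would be written into a vector instead.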

If a result field's type is given as Maybe a, NULL values are returned as Nothing and all non-null values are wrapped in Just. This makes errors from using null values in unchecked logic unrepresentable: any function that consumes a potentially nullable field is forced by the type system to handle both cases.
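A small illustration of the Maybe a mapping, using a hypothetical row type whose price column is nullable: a Maybe Double cannot be used where a Double is expected, so each consumer has to decide explicitly what a NULL means (skipped in meanPrice, a placeholder in describe). Row, meanPrice and describe are illustrative only.

```haskell
import Data.Maybe (catMaybes)

-- Hypothetical result row: (security id, price), where price may be NULL.
type Row = (Int, Maybe Double)

-- Average over the non-NULL prices; Nothing if every price was NULL.
meanPrice :: [Row] -> Maybe Double
meanPrice rows = case catMaybes (map snd rows) of
  []     -> Nothing
  prices -> Just (sum prices / fromIntegral (length prices))

-- Both cases are written out to turn a row into text.
describe :: Row -> String
describe (sid, Just p)  = show sid ++ ": " ++ show p
describe (sid, Nothing) = show sid ++ ": no price"
```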

Streaming

stream :: (FromRow r, ToSQL a, MonadBaseControl IO m, MonadIO m) =>
     PQ.Connection       -- ^ Connection
  -> Query               -- ^ Query
  -> a                   -- ^ Query arguments
  -> Int                 -- ^ Batch size
  -> C.Source m [r]      -- ^ Source conduit
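The batch size controls how many rows arrive in each list the source yields. The sketch below shows the pull-based batching pattern in plain IO rather than Conduit, assuming rows come from a fetch action that returns at most n rows per call and an empty list once the result set is exhausted (a server-side cursor read with FETCH n would behave this way; the README does not say which mechanism stream uses). streamBatches and listCursor are hypothetical.

```haskell
import Data.IORef

-- Hypothetical: pull batches of up to n rows until the fetch action returns
-- an empty batch, handing each non-empty batch to the consumer.
streamBatches :: Int -> (Int -> IO [r]) -> ([r] -> IO ()) -> IO ()
streamBatches n fetch consume = loop
  where
    loop = do
      batch <- fetch n
      if null batch then pure () else consume batch >> loop

-- Hypothetical stand-in for a cursor: serves up to n rows per call
-- from an in-memory list.
listCursor :: [r] -> IO (Int -> IO [r])
listCursor rows = do
  ref <- newIORef rows
  pure $ \n -> atomicModifyIORef' ref (\rs -> (drop n rs, take n rs))
```

In the Conduit version, consume corresponds to yielding each batch downstream, so only one batch needs to be held in memory at a time.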

Parallel streams can be composed together, using Software Transactional Memory (STM) to synchronize polling across threads.

import Database.PostgreSQL.Stream.Parallel

parallelStream ::
  PQ.Connection
  -> (PQ.Connection -> Source (ResourceT IO) a)  -- Source
  -> Sink a (ResourceT IO) ()                    -- Sink
  -> IO ()
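One way STM can coordinate several producers feeding a single sink is sketched below, using the stm package's bounded TBQueue for items and a TVar counting the producers still running. This illustrates the pattern and is not parallelStream's implementation: parallelDrain is hypothetical and uses plain IO callbacks in place of Conduit sources and sinks.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Exception (finally)
import Control.Monad (forM_)

-- Hypothetical sketch: run each producer on its own thread and funnel
-- everything it emits through a bounded STM queue to one consumer.
-- A TVar tracks how many producers are still running.
parallelDrain :: [(a -> IO ()) -> IO ()] -> (a -> IO ()) -> IO ()
parallelDrain producers consume = do
  queue   <- newTBQueueIO 64
  running <- newTVarIO (length producers)
  forM_ producers $ \produce ->
    forkIO $
      produce (atomically . writeTBQueue queue)
        `finally` atomically (modifyTVar' running (subtract 1))
  -- Take the next item; block until one arrives, or return Nothing once
  -- the queue is empty and every producer has finished.
  let next = atomically $
        (Just <$> readTBQueue queue) `orElse` do
          n <- readTVar running
          if n == 0 then pure Nothing else retry
      loop = next >>= maybe (pure ()) (\x -> consume x >> loop)
  loop
```

Checking the queue before the counter inside one transaction means no item written before the last producer exits can be missed, and finally keeps the counter correct even if a producer throws.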

Development

$ cabal sandbox init
$ cabal install --only-dependencies
$ cabal build

To attach to the Elsen compute engine:

$ cabal sandbox add-source path_to_tree

Documentation

$ cabal haddock

Legal

Copyright (c) 2015 Elsen Inc. All rights reserved.

Metadata

Version

0.1.0.3

Platforms (75)

    Darwin
    FreeBSD
    Genode
    GHCJS
    Linux
    MMIXware
    NetBSD
    none
    OpenBSD
    Redox
    Solaris
    WASI
    Windows
  • aarch64-darwin
  • aarch64-genode
  • aarch64-linux
  • aarch64-netbsd
  • aarch64-none
  • aarch64_be-none
  • arm-none
  • armv5tel-linux
  • armv6l-linux
  • armv6l-netbsd
  • armv6l-none
  • armv7a-darwin
  • armv7a-linux
  • armv7a-netbsd
  • armv7l-linux
  • armv7l-netbsd
  • avr-none
  • i686-cygwin
  • i686-darwin
  • i686-freebsd
  • i686-genode
  • i686-linux
  • i686-netbsd
  • i686-none
  • i686-openbsd
  • i686-windows
  • javascript-ghcjs
  • loongarch64-linux
  • m68k-linux
  • m68k-netbsd
  • m68k-none
  • microblaze-linux
  • microblaze-none
  • microblazeel-linux
  • microblazeel-none
  • mips-linux
  • mips-none
  • mips64-linux
  • mips64-none
  • mips64el-linux
  • mipsel-linux
  • mipsel-netbsd
  • mmix-mmixware
  • msp430-none
  • or1k-none
  • powerpc-netbsd
  • powerpc-none
  • powerpc64-linux
  • powerpc64le-linux
  • powerpcle-none
  • riscv32-linux
  • riscv32-netbsd
  • riscv32-none
  • riscv64-linux
  • riscv64-netbsd
  • riscv64-none
  • rx-none
  • s390-linux
  • s390-none
  • s390x-linux
  • s390x-none
  • vc4-none
  • wasm32-wasi
  • wasm64-wasi
  • x86_64-cygwin
  • x86_64-darwin
  • x86_64-freebsd
  • x86_64-genode
  • x86_64-linux
  • x86_64-netbsd
  • x86_64-none
  • x86_64-openbsd
  • x86_64-redox
  • x86_64-solaris
  • x86_64-windows