Description

Query-Driven Pipeline Execution and Query Templates.

Runs a function iteratively over each row of either a dataframe or the results of a query. Use the 'BigQuery' and 'DBI' wrappers to iteratively pass each row of query results to a function. If a field contains a 'JSON' string, it will be converted to an object. This is helpful for queries that return 'JSON' strings that represent objects. These fields can then be treated as objects by the pipeline.

condusco

Overview

condusco lets you run a function iteratively, passing it the rows of a dataframe or the results of a query.

We call the functions that condusco runs pipelines. A pipeline is a function that accepts a list of parameters and runs a series of customized commands based on their values.

The most common use case for condusco is data pipelines. For data pipelines that primarily run SQL queries, we can template queries with a library (e.g. whisker), so that parameterized values are separated from the query logic. We can then render the query with the appropriate values:

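library(whisker)

# source() returns a list; the value of the last expression in params.R is in $value
parameters <- source("params.R")$value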

#define a pipeline
pipeline <- function(parameters){
 query <- "SELECT * FROM {{dataset}}.{{table_prefix}}_results LIMIT {{limit_size}}"
 query_with_params <- whisker.render(query, parameters)
 run_query(query_with_params)
}

# run the pipeline with the parameters in 'params.R'
pipeline(parameters)
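
To make the rendering step concrete, here is a minimal sketch of what whisker.render produces for the query above. The parameter values are hypothetical stand-ins for whatever params.R defines, and no query is executed.

```r
library(whisker)

# Hypothetical parameter values; the real ones would come from params.R
parameters <- list(
  dataset = "analytics",
  table_prefix = "daily",
  limit_size = 10
)

query <- "SELECT * FROM {{dataset}}.{{table_prefix}}_results LIMIT {{limit_size}}"

# Substitute each {{name}} tag with the list element of the same name
rendered <- whisker.render(query, parameters)
print(rendered)
```

Printing the rendered string before handing it to a database is a cheap way to check that every tag was filled in.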

condusco extends this design pattern in three ways:

  • the user can provide a data-frame that contains multiple rows of parameters to be iteratively passed to the pipeline
  • the user can provide a query and each row of results is iteratively passed to the pipeline
  • any JSON-string parameter will be converted to an object before being passed to the pipeline
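
The three behaviours listed above can be sketched in a few lines of R. This is a conceptual illustration only, not condusco's actual implementation: run_rows is a hypothetical helper, and the use of jsonlite for the JSON conversion is an assumption.

```r
library(jsonlite)

# Hypothetical helper (not part of condusco): call `pipeline` once per row of
# `parameters`, parsing any character field that holds valid JSON.
run_rows <- function(pipeline, parameters) {
  lapply(seq_len(nrow(parameters)), function(i) {
    # One row becomes a named list of single values
    row <- as.list(parameters[i, , drop = FALSE])
    row <- lapply(row, function(field) {
      if (is.character(field) && validate(field)) {
        fromJSON(field, simplifyVector = FALSE)  # JSON string -> R list
      } else {
        field                                    # plain value, left as-is
      }
    })
    pipeline(row)
  })
}

params <- data.frame(
  table_prefix = c("batman", "robin"),
  list = c('[{"val": 1}, {"val": 2}]', '[{"val": 3}]'),
  stringsAsFactors = FALSE
)

# A toy pipeline that reports how many items the parsed JSON field contains
pipeline <- function(p) sprintf("%s: %d items", p$table_prefix, length(p$list))

results <- run_rows(pipeline, params)
print(results)
```

In this sketch the plain string "batman" is not valid JSON, so it is passed through unchanged, while the list field arrives in the pipeline as a nested R list.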

Functions

function | description
---------|------------
run_pipeline(pipeline, parameters) | iteratively pass each row of parameters to a pipeline, converting any JSON parameters to objects
run_pipeline_gbq(pipeline, query, project) | calls run_pipeline with the results of query executed via bigrquery
run_pipeline_dbi(pipeline, query, con) | calls run_pipeline with the results of query executed via DBI

Installation

install.packages("condusco")

Features

  • Name-based substitution of local parameters into pipelines, iterating through rows of parameters:

    run_pipeline(
      #the pipeline
      function(parameters){
        query <- "SELECT * FROM {{table_prefix}}_results;"
        print(whisker.render(query,parameters))
      },
      #the parameters
      data.frame(
        table_prefix = c('batman', 'robin')
      )
    )
    
  • Name-based substitution of query results into pipelines, iterating through the rows returned by the query:

    con <- dbConnect(RSQLite::SQLite(), ":memory:")
    
    pipeline <- function(parameters){
    
      query <-"
        SELECT count(*) as n_hits 
        FROM user_hits 
        WHERE date(date_time) BETWEEN date('{{{date_low}}}') AND date('{{{date_high}}}')
      ;"
    
      whisker.render(query,parameters)
    
    }
    
    run_pipeline_dbi(pipeline,
      "SELECT date('now', '-5 days') as date_low, date('now') as date_high",
      con
    )
    
    dbDisconnect(con)
    
  • Dynamic query generation based on JSON strings:

    con <- dbConnect(RSQLite::SQLite(), ":memory:")
    mtcars
    dbWriteTable(con, "mtcars", mtcars)
    
    # for each cylinder count, count the cars at each of the 5 most common hp values
    pipeline <- function(swap){
    
      query <- "SELECT
        {{#list}}
          SUM(CASE WHEN hp='{{val}}' THEN 1 ELSE 0 END )as n_hp_{{val}},
        {{/list}}
        cyl
        FROM mtcars
        GROUP BY cyl
      ;"
    
      print(whisker.render(query,swap))
    
      print(
        dbGetQuery(
          con,
          whisker.render(query,swap)
        )
      )
    }
    
    
    #pass the top 5 most common hps as val parameters
    run_pipeline_dbi(
      pipeline,
      '
      SELECT "[" || GROUP_CONCAT("{ ""val"": """ || hp ||  """ }") || "]" AS list
      FROM (
        SELECT 
          CAST(hp as INTEGER) as HP,
          count(hp) as cnt
        FROM mtcars 
        GROUP BY hp
        ORDER BY cnt DESC
        LIMIT 5
      )
      ',
      con
    )
    
    
    dbDisconnect(con)
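
The examples above mix double braces ({{val}}) and triple braces ({{{date_low}}}). In Mustache templates, double braces HTML-escape the substituted value and triple braces insert it verbatim, which matters for SQL fragments containing characters such as <. A minimal sketch, using a hypothetical filter parameter:

```r
library(whisker)

# Hypothetical parameter holding an SQL fragment with a "<" character
params <- list(filter = "n_hits < 5")

# Double braces: the value is HTML-escaped before substitution
escaped <- whisker.render("WHERE {{filter}}", params)

# Triple braces: the value is inserted unchanged
raw <- whisker.render("WHERE {{{filter}}}", params)

print(escaped)
print(raw)
```

Triple braces are therefore the safer choice for values that must reach the database exactly as written, provided those values come from a trusted source.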
    

Google BigQuery Examples

These examples are not available as a vignette because they require user authentication.

library(whisker)
library(bigrquery)
library(condusco)

#Set GBQ project
project <- ''

#Set the following options for GBQ authentication on a cloud instance
options("httr_oauth_cache" = "~/.httr-oauth")
options(httr_oob_default=TRUE)

#Run the below query to authenticate and write credentials to .httr-oauth file
query_exec("SELECT 'foo' as bar",project=project);

Dynamically generated queries via JSON

If the query returns a field named list that contains a JSON string, condusco converts it to an object, and the {{#list}} section of the template iterates over its name1, name2 pairs. This illustrates how to generate a query dynamically from JSON constructed by another query. In this example the JSON is a trivial, hand-written string; the next example generates it dynamically.
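
The section expansion can be previewed locally, without BigQuery or authentication. The sketch below parses the same JSON string by hand with jsonlite (an assumed stand-in for the conversion condusco performs) and renders a shortened, single-line version of the template.

```r
library(whisker)
library(jsonlite)

# The JSON string from the example, parsed into a list of name1/name2 pairs
json <- '[{"name1":"foo", "name2":"bar"},{"name1":"foo2", "name2":"bar2"}]'
params <- list(list = fromJSON(json, simplifyVector = FALSE))

# Shortened template: the {{#list}} section repeats once per element
template <- "SELECT {{#list}}'{{name1}}' as {{name2}}, {{/list}}word as field"

rendered <- whisker.render(template, params)
print(rendered)
```

Each element of the parsed list contributes one aliased column to the SELECT clause. The full BigQuery version of this pattern follows.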

pipeline <- function(params){

  query <- "SELECT {{{value}}} as dollars_won,
    {{#list}}
    '{{name1}}' as {{name2}},
    {{/list}}
    {{{field}}} as field
  FROM {{table_name}}
  LIMIT {{limit_size}}
  ;"

  res <- query_exec(whisker.render(query,params),
                    project=project,
                    use_legacy_sql = FALSE
  );
  
  print(res)
}

project

run_pipeline_gbq(pipeline, "
    SELECT 1000 as value,
    'word' as field,
    '[{\"name1\":\"foo\", \"name2\":\"bar\"},{\"name1\":\"foo2\", \"name2\":\"bar2\"}]' as list,
    'publicdata:samples.shakespeare' AS table_name,
    5 AS limit_size
", project)

Feature Generation Query

Create one feature per top-10 committer for each repo, counting how many commits that committer made to the repo.

pipeline <- function(params){

  query <- "
    SELECT
      {{#list}}
        SUM(CASE WHEN author.name ='{{name}}' THEN 1 ELSE 0 END) as n_{{name_clean}},
      {{/list}}
      repo_name
    FROM `bigquery-public-data.github_repos.sample_commits`
    GROUP BY repo_name
  ;"

  res <- query_exec(
    whisker.render(query,params),
    project=project,
    use_legacy_sql = FALSE
  );
  
  print(res)
}

run_pipeline_gbq(pipeline, "
  SELECT CONCAT('[',
  STRING_AGG(
    CONCAT('{\"name\":\"',name,'\",'
      ,'\"name_clean\":\"', REGEXP_REPLACE(name, r'[^[:alpha:]]', ''),'\"}'
    )
  ),
  ']') as list
  FROM (
    SELECT author.name,
      COUNT(commit) n_commits
    FROM `bigquery-public-data.github_repos.sample_commits`
    GROUP BY 1
    ORDER BY 2 DESC
    LIMIT 10
  )
",
project,
use_legacy_sql = FALSE
)
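
The parameter query above builds its JSON string in SQL. For comparison, here is a hedged sketch of the same construction done locally in R: the author names are hypothetical, and jsonlite is an assumed choice for the serialization.

```r
library(jsonlite)

# Hypothetical author names standing in for the top committers
authors <- data.frame(
  name = c("Ada Lovelace", "Grace M. Hopper"),
  stringsAsFactors = FALSE
)

# Mirror REGEXP_REPLACE(name, r'[^[:alpha:]]', ''): keep letters only, so the
# result can be used inside a column alias such as n_{{name_clean}}
authors$name_clean <- gsub("[^[:alpha:]]", "", authors$name)

# Serialize the rows as one JSON array of {name, name_clean} objects
json_list <- as.character(toJSON(authors))
print(json_list)
```

Keeping both the raw name (for the comparison in the CASE expression) and the cleaned name (for the alias) is what lets a single list drive both parts of the generated query.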

Metadata

Version

0.1.0

License

Unknown

Platforms (75)

    Darwin
    FreeBSD
    Genode
    GHCJS
    Linux
    MMIXware
    NetBSD
    none
    OpenBSD
    Redox
    Solaris
    WASI
    Windows
  • aarch64-darwin
  • aarch64-genode
  • aarch64-linux
  • aarch64-netbsd
  • aarch64-none
  • aarch64_be-none
  • arm-none
  • armv5tel-linux
  • armv6l-linux
  • armv6l-netbsd
  • armv6l-none
  • armv7a-darwin
  • armv7a-linux
  • armv7a-netbsd
  • armv7l-linux
  • armv7l-netbsd
  • avr-none
  • i686-cygwin
  • i686-darwin
  • i686-freebsd
  • i686-genode
  • i686-linux
  • i686-netbsd
  • i686-none
  • i686-openbsd
  • i686-windows
  • javascript-ghcjs
  • loongarch64-linux
  • m68k-linux
  • m68k-netbsd
  • m68k-none
  • microblaze-linux
  • microblaze-none
  • microblazeel-linux
  • microblazeel-none
  • mips-linux
  • mips-none
  • mips64-linux
  • mips64-none
  • mips64el-linux
  • mipsel-linux
  • mipsel-netbsd
  • mmix-mmixware
  • msp430-none
  • or1k-none
  • powerpc-netbsd
  • powerpc-none
  • powerpc64-linux
  • powerpc64le-linux
  • powerpcle-none
  • riscv32-linux
  • riscv32-netbsd
  • riscv32-none
  • riscv64-linux
  • riscv64-netbsd
  • riscv64-none
  • rx-none
  • s390-linux
  • s390-none
  • s390x-linux
  • s390x-none
  • vc4-none
  • wasm32-wasi
  • wasm64-wasi
  • x86_64-cygwin
  • x86_64-darwin
  • x86_64-freebsd
  • x86_64-genode
  • x86_64-linux
  • x86_64-netbsd
  • x86_64-none
  • x86_64-openbsd
  • x86_64-redox
  • x86_64-solaris
  • x86_64-windows