MyNixOS website logo
Description

R Interface for Google 'Cloud Pub/Sub' REST API.

Provides an easy to use interface to the 'Google Pub/Sub' REST API <https://cloud.google.com/pubsub/docs/reference/rest>.

googlePubsubR

R-CMD-check-ascran testthat codecov

This library offers an easy to use interface for the Google Pub/Sub REST API (docs here).

Not an official Google product.

Setup

You can install the package from CRAN or get the dev version from Github:

install.packages("googlePubsubR")

# Or get the dev version from Github
devtools::install_github("andodet/googlePubsubR@dev")

In order to use the library, you will need:

  • An active GCP project
  • The Pub/Sub API correctly activated
  • JSON credentials for a service account or another method of authentication (e.g token). You can pass the path of the file as an argument to pubsub_auth or setting an GCP_AUTH_FILE env variable.
  • A GCP_PROJECT env variable set with a valid GCP project id. Since 0.0.3, GCP project id can also be set using ps_project_set.

Usage

On a very basic level, the library can be used to publish messages, pull and acknowledge them.
The following example shows how to:

  1. Create topics and subscriptions
  2. Encode a dataframe as a Pub/Sub message
  3. Publish a message
  4. Pull and decode messages from a Pub/Sub subscription
  5. Delete resources
library(googlePubsubR)
library(base64enc)
library(jsonlite)

# Authenticate 
pubsub_auth()

# Create resources
topic_readme <- topics_create("readme-topic")
sub_readme <- subscriptions_create("readme-sub", topic_readme)

# Prepare the message
msg <- mtcars %>%
  toJSON(auto_unbox = TRUE) %>%
  # Pub/Sub expects a base64 encoded string
  msg_encode() %>% 
  PubsubMessage() 

# Publish the message!
topics_publish(msg, topic_readme)

# Pull the message from server
msgs_pull <- subscriptions_pull(sub_readme)

msg_decoded <- msgs_pull$receivedMessages$message$data %>%
  msg_decode() %>% 
  fromJSON()

head(msg_decoded)

# Prints
# mpg cyl disp  hp drat    wt  qsec vs am gear carb
# Mazda RX4         21.0   6  160 110 3.90 2.620 16.46  0  1    4    4
# Mazda RX4 Wag     21.0   6  160 110 3.90 2.875 17.02  0  1    4    4
# Datsun 710        22.8   4  108  93 3.85 2.320 18.61  1  1    4    1
# Hornet 4 Drive    21.4   6  258 110 3.08 3.215 19.44  1  0    3    1
# Hornet Sportabout 18.7   8  360 175 3.15 3.440 17.02  0  0    3    2
# Valiant           18.1   6  225 105 2.76 3.460 20.22  1  0    3    1

# We can acknowledge that the message has been consumed
subscriptions_ack(msgs_pull$receivedMessages$ackId, sub_readme)
# [1] TRUE

# A subsequent pull will return no messages from the server
subscriptions_pull(sub_readme)
# named list()

# Cleanup resources
topics_delete(topic_readme)
subscriptions_delete(sub_readme)

Use cases

The main use-cases for Pub/Sub messaging queue:

  • Stream data into Dataflow pipelines
  • Trigger workflows hosted in Cloud Run or Cloud Functions
  • Expand interactivity in Shiny dashboards (more on this here).
  • Add event driven actions in {plumbr}

Contributing

In order to contribute to googlePubsubR you'll need to go through the following steps:

  1. Set up a GCP project and create a service account with Pub/Sub admin rights.

  2. Download a JSON key for the newly created account. Naming the file .gcp_creds.json and placing it in the package root folder will make it automatically gitignored.

  3. Set up the following env vars (either through a tool like direnv or a .Renviron file).

    GCP_AUTH_FILE=<paht_to_json_auth_file>
    GCP_PROJECT=<gcp_project_id_string>
    
  4. Check everything is set up correctly by running a test run via devtools::test().

Metadata

Version

0.0.4

License

Unknown

Platforms (77)

    Darwin
    FreeBSD
    Genode
    GHCJS
    Linux
    MMIXware
    NetBSD
    none
    OpenBSD
    Redox
    Solaris
    WASI
    Windows
Show all
  • aarch64-darwin
  • aarch64-freebsd
  • aarch64-genode
  • aarch64-linux
  • aarch64-netbsd
  • aarch64-none
  • aarch64-windows
  • aarch64_be-none
  • arm-none
  • armv5tel-linux
  • armv6l-linux
  • armv6l-netbsd
  • armv6l-none
  • armv7a-darwin
  • armv7a-linux
  • armv7a-netbsd
  • armv7l-linux
  • armv7l-netbsd
  • avr-none
  • i686-cygwin
  • i686-darwin
  • i686-freebsd
  • i686-genode
  • i686-linux
  • i686-netbsd
  • i686-none
  • i686-openbsd
  • i686-windows
  • javascript-ghcjs
  • loongarch64-linux
  • m68k-linux
  • m68k-netbsd
  • m68k-none
  • microblaze-linux
  • microblaze-none
  • microblazeel-linux
  • microblazeel-none
  • mips-linux
  • mips-none
  • mips64-linux
  • mips64-none
  • mips64el-linux
  • mipsel-linux
  • mipsel-netbsd
  • mmix-mmixware
  • msp430-none
  • or1k-none
  • powerpc-netbsd
  • powerpc-none
  • powerpc64-linux
  • powerpc64le-linux
  • powerpcle-none
  • riscv32-linux
  • riscv32-netbsd
  • riscv32-none
  • riscv64-linux
  • riscv64-netbsd
  • riscv64-none
  • rx-none
  • s390-linux
  • s390-none
  • s390x-linux
  • s390x-none
  • vc4-none
  • wasm32-wasi
  • wasm64-wasi
  • x86_64-cygwin
  • x86_64-darwin
  • x86_64-freebsd
  • x86_64-genode
  • x86_64-linux
  • x86_64-netbsd
  • x86_64-none
  • x86_64-openbsd
  • x86_64-redox
  • x86_64-solaris
  • x86_64-windows