Description

Run Interruptible Code Asynchronously.

Takes an R expression and returns a job object with a $stop() method which can be called to terminate the background job. Also provides timeouts and other mechanisms for automatically terminating a background job. The result of the expression is available synchronously via $result or asynchronously with callbacks or through the 'promises' package framework.

jobqueue

The goals of jobqueue are to:

  • Run jobs in parallel on background processes.
  • Allow jobs to be stopped at any point.
  • Process job results with asynchronous callbacks.

Installation

# Install the latest stable version from CRAN:
install.packages("jobqueue")

# Or the development version from GitHub:
install.packages("pak")
pak::pak("cmmr/jobqueue")

Example

library(jobqueue)

jq <- jobqueue()

job <- jq$run({ paste('Hello', 'world!') })
job$result
#> [1] "Hello world!"

Asynchronous Callbacks

Main articles: vignette('results') and vignette('hooks')

job <- jq$run(
  expr  = { 42 }, 
  hooks = list(
    'created' = ~{ message("We're uid '", .$uid, "'.") },
    '*'       = ~{ message('  - ', .$state) }))
#> We're uid 'J2'.
#>   - created
#>   - submitted
#>   - queued
#>   - starting
#>   - running
#>   - done

job$on('done', ~{ message('result = ', .$result) })
#> result = 42

Converting to Promises

job      <- jq$run({ 3.14 })
callback <- function (result) message('resolved with: ', result)

job %...>% callback
#> resolved with: 3.14

job %>% then(callback)
#> resolved with: 3.14

as.promise(job)$then(callback)
#> resolved with: 3.14

See also https://rstudio.github.io/promises/

Shiny Integration

function(input, output, session) {
  output$plot <- renderPlot({
    jq$run({ read.table(url) }, vars = list(url = input$url)) %...>%
      head(input$n) %...>%
      plot()
  })
}

See also https://rstudio.github.io/promises/articles/promises_06_shiny.html

Stopping Jobs

When a running job is stopped, its background process is terminated. Terminated background processes are automatically replaced by new ones.

Stopped jobs will return a condition object of class 'interrupt' as their result.

Main article: vignette('stops')

Manually

job <- jq$run({ Sys.sleep(2); 'Zzzzz' })
job$stop()
job$result
#> <interrupt: job stopped by user>

A custom message can also be given, e.g. job$stop('my reason'), which will be returned in the condition object.
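
Because a stopped job's result is a condition object rather than a value, calling code usually needs to tell the two cases apart. The sketch below does this with base R only. describe_result() is a hypothetical helper, not part of jobqueue, and the stopped object is a hand-built stand-in for job$result so that the example runs without starting a background process.

```r
# Hypothetical helper (not part of jobqueue): summarise a job's result,
# distinguishing a stopped job from one that ran to completion.
describe_result <- function(result) {
  if (inherits(result, 'interrupt')) {
    # Stopped jobs carry their reason in the condition message.
    paste('stopped:', conditionMessage(result))
  } else {
    paste('finished:', format(result))
  }
}

# Stand-in for the result of job$stop('my reason'), built with base R.
stopped <- structure(
  class = c('interrupt', 'condition'),
  list(message = 'my reason', call = NULL))

describe_result(stopped)
#> [1] "stopped: my reason"
describe_result('Zzzzz')
#> [1] "finished: Zzzzz"
```

With a real job, the same helper would presumably be called as describe_result(job$result).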

Runtime Limits

job <- jq$run({ Sys.sleep(2); 'Zzzzz' }, timeout = 0.2)
job$result
#> <interrupt: total runtime exceeded 0.2 seconds>

Limits (in seconds) can be set on:

  • the total 'submitted' to 'done' time: timeout = 2
  • individual states: timeout = list(queued = 1, running = 2)
  • or both: timeout = list(total = 3, queued = 2, running = 2)
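
As a minimal sketch of a per-state limit, the call below combines the list form of timeout with the sleeping job used above. The specific limit values are illustrative assumptions, and the exact wording of the resulting interrupt message is not shown because it is not documented here.

```r
library(jobqueue)

jq <- jobqueue()

# The 'running' limit (0.2 seconds) is shorter than the 2 second sleep, so
# the job is expected to be stopped before it can return 'Zzzzz'.
job <- jq$run(
  expr    = { Sys.sleep(2); 'Zzzzz' },
  timeout = list(queued = 5, running = 0.2))

# Waits for the job to finish, then reports whether it was interrupted.
was_interrupted <- inherits(job$result, 'interrupt')

jq$stop()
```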

Stop ID

New jobs will replace existing jobs with the same stop_id.

job1 <- jq$run({ Sys.sleep(1); 'A' }, stop_id = 123)
job2 <- jq$run({ 'B' },               stop_id = 123)
job1$result
#> <superseded: duplicated stop_id>
job2$result
#> [1] "B"

Copy ID

New jobs will mirror the output of existing jobs with the same copy_id.

job1 <- jq$run({ Sys.sleep(1); 'A' }, copy_id = 456)
job2 <- jq$run({ 'B' },               copy_id = 456)
job1$result
#> [1] "A"
job2$result
#> [1] "A"

Variables

Main article: vignette('eval')

jq2  <- jobqueue(globals = list(G = 8))
expr <- quote(c(x = x, y = y, G = G))
job  <- jq2$run(expr, vars = list(x = 10, y = 2))

dput(job$result)
#> c(x = 10, y = 2, G = 8)

jq2$stop()

Metadata

Version

1.7.0

License

Unknown

Platforms (75)

    Darwin
    FreeBSD
    Genode
    GHCJS
    Linux
    MMIXware
    NetBSD
    none
    OpenBSD
    Redox
    Solaris
    WASI
    Windows
  • aarch64-darwin
  • aarch64-freebsd
  • aarch64-genode
  • aarch64-linux
  • aarch64-netbsd
  • aarch64-none
  • aarch64-windows
  • aarch64_be-none
  • arm-none
  • armv5tel-linux
  • armv6l-linux
  • armv6l-netbsd
  • armv6l-none
  • armv7a-linux
  • armv7a-netbsd
  • armv7l-linux
  • armv7l-netbsd
  • avr-none
  • i686-cygwin
  • i686-freebsd
  • i686-genode
  • i686-linux
  • i686-netbsd
  • i686-none
  • i686-openbsd
  • i686-windows
  • javascript-ghcjs
  • loongarch64-linux
  • m68k-linux
  • m68k-netbsd
  • m68k-none
  • microblaze-linux
  • microblaze-none
  • microblazeel-linux
  • microblazeel-none
  • mips-linux
  • mips-none
  • mips64-linux
  • mips64-none
  • mips64el-linux
  • mipsel-linux
  • mipsel-netbsd
  • mmix-mmixware
  • msp430-none
  • or1k-none
  • powerpc-netbsd
  • powerpc-none
  • powerpc64-linux
  • powerpc64le-linux
  • powerpcle-none
  • riscv32-linux
  • riscv32-netbsd
  • riscv32-none
  • riscv64-linux
  • riscv64-netbsd
  • riscv64-none
  • rx-none
  • s390-linux
  • s390-none
  • s390x-linux
  • s390x-none
  • vc4-none
  • wasm32-wasi
  • wasm64-wasi
  • x86_64-cygwin
  • x86_64-darwin
  • x86_64-freebsd
  • x86_64-genode
  • x86_64-linux
  • x86_64-netbsd
  • x86_64-none
  • x86_64-openbsd
  • x86_64-redox
  • x86_64-solaris
  • x86_64-windows