webqueue

The goal of webqueue is to process HTTP requests on interruptible background processes.

Installation

# Install the latest stable version from CRAN:
install.packages("webqueue")

# Or the development version from GitHub:
install.packages("pak")
pak::pak("cmmr/webqueue")

Example

library(webqueue)

wq <- WebQueue$new(~{ 'Hello world!\n' })

readLines('http://localhost:8080')
#> [1] "Hello world!"

wq$stop()

Query Parameters

wq <- WebQueue$new(~{ jsonlite::toJSON(.$ARGS) })

cat(RCurl::getURL('http://localhost:8080?myvar=123'))
#> {"myvar":["123"]}

wq$stop()

The ARGS field contains both GET (query string) and POST (request body) parameters.
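
As a hedged sketch (not taken from the package documentation), the same ARGS handler can be queried with a form-encoded POST; the output shown assumes webqueue parses request bodies the same way it parses query strings:

wq <- WebQueue$new(~{ jsonlite::toJSON(.$ARGS) })

# Send myvar=123 in the request body rather than the query string.
cat(RCurl::postForm('http://localhost:8080', myvar = '123', style = 'POST'))
#> {"myvar":["123"]}

wq$stop()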

Simple API

wq <- WebQueue$new(
  handler = function (req) {
    switch(
      EXPR = req$PATH_INFO,
      '/date' = date(),
      '/req'  = ls.str(as.list(req)),
      '/args' = req$ARGS,
      '/cpus' = as.numeric(parallelly::availableCores()),
      404L )   # any other path: 404 Not Found
})

cat(RCurl::getURL('http://localhost:8080/cpus'))
#> 6

cat(RCurl::getURL('http://localhost:8080/req?x=123&id=ABC'))
#> ARGS : List of 2
#>  $ x : chr "123"
#>  $ id: chr "ABC"
#> COOKIES :  Named list()
#> HEADERS : List of 1
#>  $ host: chr "localhost:8080"
#> PATH_INFO :  chr "/req"
#> REMOTE_ADDR :  chr "127.0.0.1"
#> REQUEST_METHOD :  chr "GET"
#> SERVER_NAME :  chr "127.0.0.1"
#> SERVER_PORT :  chr "8080"

wq$stop()

Interrupting Requests

The strength of webqueue is its ability to cleanly abort a request at any point.

See vignette('interrupts') for more detailed examples.

Set a time limit

wq <- WebQueue$new(
  handler = ~{ Sys.sleep(as.numeric(.$ARGS$s)); 'Hello world!' },
  timeout = 1 )

RCurl::getURL('http://localhost:8080?s=2')
#> [1] "timeout: total runtime exceeded 1 second\n"

RCurl::getURL('http://localhost:8080?s=0')
#> [1] "Hello world!"

wq$stop()

Merge duplicate requests

wq <- WebQueue$new(
  handler = function (req) { Sys.sleep(1); req$ARGS$x }, 
  copy_id = function (job) job$req$PATH_INFO )
# ^^^^^^^   `copy_id` will be '/a' or '/b'

# Fetch three URLs at the same time. The later '/b' request is merged with the first.
jq <- jobqueue::Queue$new(workers = 3L)$wait()
a1 <- jq$run({ RCurl::getURL('http://localhost:8080/a?x=first') })
b1 <- jq$run({ RCurl::getURL('http://localhost:8080/b?x=second') })
b2 <- jq$run({ RCurl::getURL('http://localhost:8080/b?x=third') })

dput(c(a1 = a1$result, b1 = b1$result, b2 = b2$result))
#> c(a1 = "first", b1 = "second", b2 = "second")

jq$stop()
wq$stop()

Stop duplicate requests

wq <- WebQueue$new(
  handler = function (req) { Sys.sleep(1); req$ARGS$x }, 
  stop_id = function (job) job$req$PATH_INFO )
# ^^^^^^^   `stop_id` will be '/a' or '/b'

# Fetch three URLs at the same time. The earlier '/b' request is stopped by the later one.
jq <- jobqueue::Queue$new(workers = 3L)$wait()
a1 <- jq$run({ RCurl::getURL('http://localhost:8080/a?x=first') })
b1 <- jq$run({ RCurl::getURL('http://localhost:8080/b?x=second') })
b2 <- jq$run({ RCurl::getURL('http://localhost:8080/b?x=third') })

dput(c(a1 = a1$result, b1 = b1$result, b2 = b2$result))
#> c(a1 = "first", b1 = "superseded: duplicated stop_id\n", b2 = "third")

jq$stop()
wq$stop()