This is a reference vignette of the package’s core functionality. Other package vignettes cover additional features.
mirai (Japanese for ‘future’) implements the concept of futures in R.
Futures are an abstraction that represent the result of code evaluation that will be available at some point in the future. The actual code evaluation is sent to and performed in a separate R process (daemon), and the result is sent back to the main (host) process when it completes.
The package has one main function: mirai()
to create a mirai object.
This function returns almost immediately, and is never blocking. This is the essence of async: whilst the mirai evaluation is ongoing on the daemon, the host R process is free to continue.
As the mirai expression is sent to another process, it must be self-contained. This means that any functions and variables used in it must be available in that process as well. This requires that:
- package functions be namespaced using ::, or library() calls be made within the expression
- variables used be passed through the ... argument, to be sent along to the daemon
A mirai is either unresolved if the result has yet to be received, or resolved if it has. unresolved()
is a helper function to check the state of a mirai.
For a mirai m
, the result is available at m$data
once it has resolved.
If successful, this will be the return value of the evaluated expression.
If the expression errored or caused the process to crash then this will be an ‘errorValue’ instead.
See the section Errors in a mirai below.
Rather than repeatedly checking unresolved(m)
, it is more efficient to wait for and collect its value by using m[]
.
When a developer or code author writes a mirai()
call, they should not be concerned about where or how execution of that code actually happens.
It is simply meant to be executed on the resources that are available to it at the time it is run.
Instead, it is for the end-user running the code to declare the resources available for evaluating mirai calls. This is done using the package’s other main function: daemons()
.
If daemons have not been set, each mirai()
call will by default create a new local background process (ephemeral daemon) on which to perform its evaluation.
Instead, daemons()
sets up persistent daemons on which to evaluate mirai expressions.
How to set up and launch daemons is covered in sections below, starting with local daemons.
Multiple long computes (model fits etc.) can be performed in parallel on available computing cores.
The following mimics an expensive calculation that eventually returns a random value.
library(mirai)
args <- list(time = 2L, mean = 4)
m <- mirai(
{
Sys.sleep(time)
rnorm(5L, mean)
},
time = args$time,
mean = args$mean
)
m
#> < mirai [] >
m$data
#> 'unresolved' logi NA
unresolved(m)
#> [1] TRUE
# Do other stuff
collect_mirai(m)
#> [1] 4.393826 3.289580 3.218941 2.785651 4.276051
m[]
#> [1] 4.393826 3.289580 3.218941 2.785651 4.276051
For easy programmatic use of mirai()
, ‘.expr’ accepts a pre-constructed language object, and also a list of named arguments passed via ‘.args’.
So, the following would be equivalent to the above:
expr <- quote({Sys.sleep(time); rnorm(5L, mean)})
args <- list(time = 2L, mean = 4)
m1 <- mirai(.expr = expr, .args = args)
m2 <- mirai(.expr = expr, .args = args)
m1[]
#> [1] 4.573154 4.507001 2.195403 2.917958 3.160021
m2[]
#> [1] 2.384668 4.595920 2.856312 4.273929 4.119159
By running the above two calculations in parallel, they take roughly half the time as running sequentially (minus a relatively inconsequential parallelization overhead).
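The same pattern extends to any number of tasks. As an illustrative sketch (not from the vignette itself), a list of mirai may be created in a loop and all values collected afterwards:

```r
library(mirai)

expr <- quote({Sys.sleep(time); rnorm(5L, mean)})
args <- list(time = 1L, mean = 4)

# launch several mirai at once; all evaluate concurrently
ms <- lapply(1:4, function(i) mirai(.expr = expr, .args = args))

# collect all values; total wall time is roughly that of a single task
results <- lapply(ms, function(m) m[])
```

As each `mirai()` call returns immediately, the loop itself completes almost instantly, and only the final collection waits.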
Problem: high-frequency real-time data cannot be written to file/database synchronously without disrupting the execution flow of ingesting the data.
Solution: cache data in memory and use mirai()
to perform periodic write operations asynchronously from a separate process.
Below, ‘.args’ is used to pass environment()
, which is the calling environment.
This provides a convenient method of passing in existing objects, such as the x and file arguments to the write.csv.async() function.
library(mirai)
write.csv.async <- function(x, file) {
  mirai(write.csv(x, file), .args = environment())
}
m <- write.csv.async(x = rnorm(1e6), file = tempfile())
while (unresolved(m)) {
cat("Writing file...\n")
Sys.sleep(0.5)
}
#> Writing file...
#> Writing file...
cat("Write complete:", is.null(m$data))
#> Write complete: TRUE
Code that can potentially fail is isolated in a separate process to ensure continued uptime.
As part of a data science / machine learning pipeline, iterations of model training may periodically fail for stochastic reasons (e.g. problematic graphics cards memory management).
Running each iteration in a mirai isolates this potentially problematic code such that it doesn’t bring down the entire pipeline, even if it fails.
library(mirai)
run_iteration <- function(i) {
# simulates a stochastic error rate
if (runif(1) < 0.1) stop("random error\n", call. = FALSE)
sprintf("iteration %d successful\n", i)
}
for (i in 1:10) {
m <- mirai(run_iteration(i), environment())
while (is_error_value(m[])) {
cat(m$data)
m <- mirai(run_iteration(i), environment())
}
cat(m$data)
}
#> Error: random error
#> iteration 1 successful
#> iteration 2 successful
#> iteration 3 successful
#> iteration 4 successful
#> iteration 5 successful
#> iteration 6 successful
#> iteration 7 successful
#> Error: random error
#> iteration 8 successful
#> iteration 9 successful
#> iteration 10 successful
By testing the return value of each mirai for errors, error-handling code is able to automate recovery and re-attempts, as above. The result is a resilient and fault-tolerant pipeline that minimizes downtime by eliminating interruptions of long computes.
If execution in a mirai fails, the error message is returned as a character string of class ‘miraiError’ and ‘errorValue’ to facilitate debugging.
is_mirai_error()
may be used to test for mirai execution errors.
m1 <- mirai(stop("occurred with a custom message", call. = FALSE))
m1[]
#> 'miraiError' chr Error: occurred with a custom message
m2 <- mirai(mirai::mirai())
m2[]
#> 'miraiError' chr Error in mirai::mirai(): missing expression, perhaps wrap in {}?
is_mirai_error(m2$data)
#> [1] TRUE
is_error_value(m2$data)
#> [1] TRUE
A full stack trace of evaluation within the mirai is recorded and accessible at $stack.trace
on the error object.
f <- function(x) if (x > 0) stop("positive")
m3 <- mirai({f(-1); f(1)}, f = f)
m3[]
#> 'miraiError' chr Error in f(1): positive
m3$data$stack.trace
#> [[1]]
#> stop("positive")
#>
#> [[2]]
#> f(1)
Elements of the original error condition are also accessible via $
on the error object.
For example, additional metadata recorded by rlang::abort()
is preserved:
f <- function(x) if (x > 0) stop("positive")
m4 <- mirai(rlang::abort("aborted", meta_uid = "UID001"))
m4[]
#> 'miraiError' chr Error: aborted
m4$data$meta_uid
#> [1] "UID001"
If a daemon instance is sent a user interrupt, the mirai will resolve to an object of class ‘miraiInterrupt’ and ‘errorValue’.
is_mirai_interrupt()
may be used to test for such interrupts.
m4 <- mirai(rlang::interrupt()) # simulates a user interrupt
is_mirai_interrupt(m4[])
#> [1] TRUE
If execution of a mirai surpasses the timeout set via the ‘.timeout’ argument, the mirai will resolve to an ‘errorValue’ of 5L (timed out). This can, amongst other things, guard against mirai processes that have the potential to hang and never return.
m5 <- mirai(nanonext::msleep(1000), .timeout = 500)
m5[]
#> 'errorValue' int 5 | Timed out
is_mirai_error(m5$data)
#> [1] FALSE
is_mirai_interrupt(m5$data)
#> [1] FALSE
is_error_value(m5$data)
#> [1] TRUE
is_error_value()
tests for all mirai execution errors, user interrupts and timeouts.
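These predicates compose naturally. As a sketch (check_outcome() is a hypothetical helper, not part of the package), a resolved value may be classified using the three tests above:

```r
library(mirai)

# classify the resolved value of a mirai (hypothetical helper)
check_outcome <- function(value) {
  if (!is_error_value(value)) return("success")
  if (is_mirai_interrupt(value)) return("interrupt")
  if (is_mirai_error(value)) return("execution error")
  "timeout or other errorValue"
}

m <- mirai(nanonext::msleep(1000), .timeout = 500)
check_outcome(m[])
```

Note the order of the tests: interrupts and execution errors are themselves 'errorValue' objects, so the success case is checked first.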
Daemons, or persistent background processes, may be set to receive ‘mirai’ requests.
Daemons inherit the default system configuration and read in the relevant ‘.Renviron’ and ‘.Rprofile’ etc. on startup. They also load the default packages. To instead only load the base package (which cuts out more than half of R’s startup time), the environment variable R_SCRIPT_DEFAULT_PACKAGES=NULL may be set prior to launching daemons.
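As a minimal sketch (assuming locally-launched daemons inherit the host session's environment variables), this may be set from within R before the daemons are launched:

```r
library(mirai)

# set before launching daemons so that they load only the base package
Sys.setenv(R_SCRIPT_DEFAULT_PACKAGES = "NULL")
daemons(6)
```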
Call daemons()
specifying the number of daemons to launch.
daemons(6)
#> [1] 6
The default dispatcher = TRUE
creates a dispatcher()
background process that connects to individual daemon processes on the local machine.
This ensures that tasks are dispatched efficiently on a first-in first-out (FIFO) basis to daemons for processing.
Tasks are queued at dispatcher and sent to a daemon as soon as it can accept the task for immediate execution.
Dispatcher employs an event-driven approach that consumes no resources while waiting, yet remains fully synchronized with events.
To view the current status, status()
provides:
- awaiting: number of tasks queued for execution at dispatcher
- executing: number of tasks sent to a daemon for execution
- completed: number of tasks for which the result has been received (either completed or cancelled)
status()
#> $connections
#> [1] 6
#>
#> $daemons
#> [1] "abstract://5fbc0d90e04a443f07687f1e"
#>
#> $mirai
#> awaiting executing completed
#> 0 0 0
daemons(0)
#> [1] 0
Set the number of daemons to zero to reset. This reverts to the default of creating a new background process for each ‘mirai’ request.
Alternatively, specifying dispatcher = FALSE
, the background daemons connect directly to the host process.
daemons(6, dispatcher = FALSE)
#> [1] 6
This means that tasks are sent immediately in a round-robin fashion, which ensures that they are evenly-distributed amongst daemons. This does not however guarantee optimal scheduling, as the duration of tasks cannot be known a priori. As an example, tasks could be queued at a daemon behind a long-running task, whilst other daemons are idle having already completed their tasks.
The advantage of this approach is that it is resource-light and does not require an additional dispatcher process. It is suited to working with similar-length tasks, or where concurrent tasks typically do not exceed available daemons.
Requesting the status now shows 6 connections, along with the host URL:
status()
#> $connections
#> [1] 6
#>
#> $daemons
#> [1] "abstract://8e8fccde572178a673639aac"
everywhere()
may be used to evaluate an expression on all connected daemons and persist the resultant state, regardless of a daemon’s ‘cleanup’ setting.
everywhere(library(DBI))
The above keeps the DBI
package loaded for all evaluations.
Other types of setup task may also be performed, including making a common resource available, such as a database connection:
everywhere(con <<- dbConnect(RSQLite::SQLite(), file), file = tempfile())
By super-assignment, the connection ‘con’ will be available in the global environment of all daemon instances. Subsequent mirai calls may then make use of ‘con’.
mirai(exists("con"))[]
#> [1] TRUE
Disconnect from the database everywhere:
everywhere(dbDisconnect(con))
Sometimes it may be necessary to evaluate an expression in the global environment of each daemon. As mirai evaluation does not occur in the global environment itself, but one inheriting from it, an explicit call to evalq(envir = .GlobalEnv) achieves this. An example use case is box::use() to import a module or package:
everywhere(
evalq(
box::use(dplyr[select], mirai[...]),
envir = .GlobalEnv
)
)
daemons(0)
#> [1] 0
daemons()
has a with()
method, which evaluates an expression with daemons created for the duration of the expression and automatically torn down upon completion.
It was originally designed for running a Shiny app with the desired number of daemons, as in the example below:
with(daemons(4), shiny::runApp(app))
Note: it is assumed the app is already created. Wrapping a call to shiny::shinyApp() would not work, as runApp() is implicitly called when the app is printed; however printing occurs only after with() has returned, hence the app would run outside of the scope of the with() statement.
In the case of a Shiny app, all mirai calls will be executed before the app returns as the app itself is blocking. In the case of other expressions, be sure to call the results (or collect the values) of all mirai within the expression to ensure that they all complete before the daemons are torn down.
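For a non-Shiny expression, this might look like the following sketch, where both values are collected inside the with() scope so the work completes before teardown:

```r
library(mirai)

result <- with(
  daemons(2),
  {
    m1 <- mirai(Sys.getpid())
    m2 <- mirai(Sys.getpid())
    c(m1[], m2[])  # collect both values before daemons are torn down
  }
)
```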
If specifying a compute profile for the daemons()
call, all calls with .compute = NULL
within the with()
clause will default to this compute profile.
The daemons interface may also be used to send tasks for computation to remote daemon processes on the network.
Call daemons()
specifying ‘url’ as a character string such as: ‘tcp://10.75.32.70:5555’ at which daemon processes should connect.
Alternatively, use host_url()
to automatically construct a valid URL.
The host (or dispatcher) listens at this address, utilising a single port.
IPv6 addresses are also supported and must be enclosed in square brackets [] to avoid confusion with the final colon separating the port. For example, port 5555 on the IPv6 address ::ffff:a6f:50d would be specified as tcp://[::ffff:a6f:50d]:5555.
For options on actually launching the daemons, please see the next section.
Below, calling host_url()
without a port value uses the default of ‘0’.
This is a wildcard value that automatically assigns a free ephemeral port:
daemons(url = host_url())
#> [1] 0
The actual assigned port may be queried at any time via status()
:
status()
#> $connections
#> [1] 0
#>
#> $daemons
#> [1] "tcp://192.168.1.71:36673"
#>
#> $mirai
#> awaiting executing completed
#> 0 0 0
The number of daemons connected at any time may be dynamically scaled up or down, according to requirements.
To reset all connections and revert to default behaviour:
daemons(0)
#> [1] 0
Closing the connection causes all connected daemons to exit automatically. If using dispatcher, it will cause dispatcher to exit, and in turn all connected daemons when their respective connections with the dispatcher are terminated.
To launch remote daemons, supply a remote launch configuration to the ‘remote’ argument of daemons()
, or launch_remote()
at any time thereafter.
There are currently 3 options for generating remote launch configurations:
- ssh_config() where there is SSH access to the remote machine
- cluster_config() to use HPC cluster resource managers such as Slurm, SGE, Torque/PBS and LSF
- remote_config() for a generic, flexible method that caters for other custom launchers
The return value of all of these functions is a simple list. This means that they may be pre-constructed, saved and re-used whenever the same configuration is required.
This method is appropriate for internal networks and in trusted, properly-configured environments where it is safe for your machine to accept incoming connections on certain ports. In the examples below, the remote daemons connect back directly to port 5555 on the local machine.
In these cases, using TLS is often desirable to provide additional security to the connections.
The first example below launches 4 daemons on the machine 10.75.32.90 (using the default SSH port of 22 as this was not specified), connecting back to the host URL:
daemons(
n = 4,
url = host_url(tls = TRUE, port = 5555),
remote = ssh_config("ssh://10.75.32.90")
)
The second example below launches one daemon on each of 10.75.32.90 and 10.75.32.91 using the custom SSH port of 222:
daemons(
n = 1,
url = host_url(tls = TRUE, port = 5555),
remote = ssh_config(c("ssh://10.75.32.90:222", "ssh://10.75.32.91:222"))
)
Use SSH tunnelling to launch daemons on any machine you are able to access via SSH, whether on the local network or the cloud. SSH key-based authentication must already be in place, but no other configuration is required.
This provides a convenient way to launch remote daemons without them needing to directly access the host. Firewall configurations or security policies often prevent opening a port to accept outside connections. In these cases, SSH tunnelling creates a tunnel once the initial SSH connection is made. For simplicity, the implementation in mirai uses the same tunnel port on both the host and daemon.
To use tunnelling, supply a URL with hostname of ‘127.0.0.1’ to ‘url’ for the daemons()
call.
local_url(tcp = TRUE) does this for you. For example, if local_url(tcp = TRUE, port = 5555) is specified, the tunnel is created using port 5555 on each machine.
The host listens to 127.0.0.1:5555
on its side, and the daemons each dial into 127.0.0.1:5555
on their own respective machines.
The below example launches 2 daemons on the remote machine 10.75.32.90 using SSH tunnelling:
daemons(
n = 2,
url = local_url(tcp = TRUE),
remote = ssh_config("ssh://10.75.32.90", tunnel = TRUE)
)
cluster_config()
may be used to deploy daemons using a cluster resource manager / scheduler.
The command argument should be:
- "sbatch" if using Slurm
- "qsub" if using SGE / Torque / PBS
- "bsub" if using LSF
The options argument takes any options that you would normally supply in a shell script to pass to the scheduler. These are script lines typically preceded by a #, for example:
- Slurm: "#SBATCH --job-name=mirai\n#SBATCH --mem=10G\n#SBATCH --output=job.out"
- SGE: "#$ -N mirai\n#$ -l mem_free=10G\n#$ -o job.out"
- Torque/PBS: "#PBS -N mirai\n#PBS -l mem=10gb\n#PBS -o job.out"
- LSF: "#BSUB -J mirai\n#BSUB -M 10000\n#BSUB -o job.out"
Use \n to denote a newline, as above. The shebang line #!/bin/bash is not required. Other shell commands may also be included, such as module load R, or for a specific R version: module load R/4.5.0.
rscript defaults to "Rscript", which assumes that R is on the file search path.
This may be substituted for the full path to a specific R executable, such as that returned by file.path(R.home("bin"), "Rscript").
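Putting these pieces together, a Slurm launch might look like the following sketch (the resource options and R module version are illustrative):

```r
library(mirai)

daemons(
  n = 10,
  url = host_url(),
  remote = cluster_config(
    command = "sbatch",
    options = "#SBATCH --job-name=mirai
#SBATCH --mem=10G
#SBATCH --output=job.out
module load R/4.5.0",
    rscript = file.path(R.home("bin"), "Rscript")
  )
)
```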
.remote_config()
provides a generic, flexible framework for running any shell command that may be used to deploy daemons.
Conceptually, this function takes an args
argument, which must contain “.”. The correctly-configured call to daemon()
is substituted in for this “.”, so that command
is run with this as one of its arguments.
This can provide an alternative for cluster resource managers in certain cases, although cluster_config()
provides an easier and more complete interface. Using Slurm as an example, the following uses sbatch
to launch a daemon on the cluster, with some additional Slurm options passed via command line arguments to sbatch
:
daemons(
n = 2,
url = host_url(),
remote = remote_config(
command = "sbatch",
args = c("--mem 512", "-n 1", "--wrap", "."),
rscript = file.path(R.home("bin"), "Rscript"),
quote = TRUE
)
)
As an alternative to automated launches, calling launch_remote()
without specifying ‘remote’ may be used to return the shell commands for deploying daemons manually.
The printed return values may then be copy / pasted directly to a remote machine e.g. via a terminal session.
daemons(url = host_url())
#> [1] 0
launch_remote()
#> [1]
#> Rscript -e 'mirai::daemon("tcp://192.168.1.71:36635")'
daemons(0)
#> [1] 0
TLS provides a robust solution for securing communications from the local machine to remote daemons.
Simply specify a secure URL using the scheme tls+tcp://
when setting daemons, or use host_url(tls = TRUE)
, for example:
daemons(url = host_url(tls = TRUE))
#> [1] 0
Single-use keys and certificates are automatically generated and configured, without requiring any further intervention. The private key is always retained on the host machine and never transmitted.
The generated self-signed certificate is available via launch_remote()
, where it is included as part of the shell command for manually launching a daemon on a remote machine.
launch_remote(1)
#> [1]
#> Rscript -e 'mirai::daemon("tls+tcp://192.168.1.71:33169",tls=c("-----BEGIN CERTIFICATE-----
#> MIIFPzCCAyegAwIBAgIBATANBgkqhkiG9w0BAQsFADA3MRUwEwYDVQQDDAwxOTIu
#> MTY4LjEuNzExETAPBgNVBAoMCE5hbm9uZXh0MQswCQYDVQQGEwJKUDAeFw0wMTAx
#> MDEwMDAwMDBaFw0zMDEyMzEyMzU5NTlaMDcxFTATBgNVBAMMDDE5Mi4xNjguMS43
#> MTERMA8GA1UECgwITmFub25leHQxCzAJBgNVBAYTAkpQMIICIjANBgkqhkiG9w0B
#> AQEFAAOCAg8AMIICCgKCAgEA2+nAp9A8/TT/cMlt+iB0/if1x7alMqlWl3jJ5XW7
#> cq0Qu12MkPVHx5uTY3e7l1ln0ll9sBfSz4OycKrdnxCpJ7/B+4CjBmhM7cex0xpU
#> KvYOeHX4lVqxVA2zfTi5P+/2Iwl315h8dvGuUGnEkmoT28lbYGYN4xZ6X2ZxZDhZ
#> loVo88T9QpRMGLb9gLAq7w2/jBGievgwAJGrFRFjt1M5DCVkXMIcXhIihCs3YC2M
#> 0duXIdRAeAp6coqEIfHWoDuu19CpeWRi9CY9jqS2UbywTvf4MqLPMZ3T3j5WjCQ0
#> c3fkxx0zsU9k8ouejz6O3rHM++X4wFLU9OuDF7eldJixrTjNrrKNJmANu1zvSdq6
#> VvSVimmh6mZfqHTTi3ncv/fQLl80YBt6VMtxB3nxmYFS6KW+xaaxFBugjqOCTn5S
#> GFtM4v40rGQeFnAdRMl8G0fPjS3tNYrWKCOnpQJuR/9tsUsMnlcn1S1QIcZRImYY
#> 004fJw4+rMnq7dULJFwjiuhZqm5OYqjHrU6BGwLgr0Iy+1CcHC1JegbL3OGuictl
#> u1BQZuv6w/cSEXgDqu3NclVoVaVmRQaEP5qzMJNO4sqGqtkklESaz58cwvXAb76g
#> mhaoJJfscuPVz1pE+jROdDxx1Qj9PNfoHV2TzmliV+XqVfgnuunUXiGLMlFJooCZ
#> oh0CAwEAAaNWMFQwEgYDVR0TAQH/BAgwBgEB/wIBADAdBgNVHQ4EFgQU/Zegy99x
#> ZnJ0iNaLHAURTqA2cN8wHwYDVR0jBBgwFoAU/Zegy99xZnJ0iNaLHAURTqA2cN8w
#> DQYJKoZIhvcNAQELBQADggIBAM7cgcvAdxa6liNn09XmH4x1VPmUKJY6qEYCQX5F
#> Hav5YGG3WmDhgNV2tB85e4ilPTx1IpFmsEVNzGPynMzRUXdZSa1t+TZfH+FN+gHS
#> OZb7RaTgX/K9E8zo7pKBQcMTIPD+5d/nrLRhUcWpH/vFCdk6l4fAC1BgqILv2fR6
#> z4Wp5TMKt52r9WH0xiHZifLIS+8CGZ2JwTrFHhJpmvJvaZwGzB6I7+/qVf8iwXXx
#> VZHG2kABT9odDTp51OUTMITSTglTprKExQmRjn/LIMAsa4Lm/Ja4GXkelOCcUwh5
#> vD4bzgCKR7im088UFzqV6MLAgwqHsWTw9EhyunLHq94knz0Cnya7zeaepSR/JcRT
#> uVeVtdRfQo023xFzMWNBRHulaKbipzjHDu6dcK4zvIdVYUc7AZg3r9naptHNPt/0
#> 17yCbW6unJuHwAxErxy/4kvHq0fnyGL+9cfEl+ng/iPfA4wNuY1m6H9reXQ++f3t
#> +LKO1YUXImA+IQZjOX0gGfuG2d1aGXRXFmgdrbZPjHyylEn7pL6lGtvdd6Wqo5z4
#> mS4BfPhC8QmT3MIJxg3a99aJs7F7munN2ragFIJlJuePWmwvy93XSFuZS3KlPLOU
#> aC/yT0GQWsMQ/mcieLcsq4jv/Yq3aTFTPDUBhII2vcc8hdQESDi0uK92CAgyQo92
#> 0OEj
#> -----END CERTIFICATE-----
#> ",""))'
daemons(0)
#> [1] 0
As an alternative to the zero-configuration default, a certificate may also be generated via a Certificate Signing Request (CSR) to a Certificate Authority (CA). The CA may be a public CA or internal to an organisation.
- The received certificate should comprise a block of cipher text between the markers -----BEGIN CERTIFICATE----- and -----END CERTIFICATE-----. Make sure to request the certificate in the PEM format. If only available in other formats, the TLS library used should usually provide conversion utilities.
- The private key should comprise a block of cipher text between the markers -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY-----.
- When setting daemons, the certificate and private key should be supplied to the ‘tls’ argument of daemons(). If the certificate and private key have been imported as character strings cert and key respectively, then the ‘tls’ argument may be specified as the character vector c(cert, key).
- When launching daemons, the certificate chain to the CA should be supplied to the ‘tls’ argument of daemon() or launch_remote(). The chain should comprise certificates, each between -----BEGIN CERTIFICATE----- and -----END CERTIFICATE----- markers. The first one should be the newly-generated TLS certificate, the same supplied to daemons(), and the final one should be a CA root certificate. If the chain has been imported as a character string certchain, then the character vector comprising this and an empty character string c(certchain, "") may be supplied to the relevant ‘tls’ argument.
has a .compute
argument to specify separate sets of daemons (compute profiles) that operate totally independently. This is useful for managing tasks with heterogeneous compute requirements:
Simply pass a character string to .compute to use as the profile name (a NULL value corresponds to the ‘default’ profile).
The daemons settings are saved under the named profile.
To create a ‘mirai’ task using a specific compute profile, specify the .compute
argument to mirai()
, which uses the ‘default’ compute profile if this is NULL
.
Similarly, functions such as status(), launch_local() or launch_remote() should be called with the desired .compute argument.
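As an illustrative sketch, independent profiles might be managed as follows (the profile name "gpu" is arbitrary):

```r
library(mirai)

daemons(4)                    # sets up the 'default' profile
daemons(1, .compute = "gpu")  # a separate, independent profile

m <- mirai(Sys.getpid(), .compute = "gpu")  # evaluated on the 'gpu' daemon
m[]

status(.compute = "gpu")      # status for the named profile only
daemons(0, .compute = "gpu")  # each profile is torn down separately
daemons(0)
```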