--- title: "Promises - Shiny and Plumber" vignette: > %\VignetteIndexEntry{Promises - Shiny and Plumber} %\VignetteEngine{litedown::vignette} %\VignetteEncoding{UTF-8} --- ### 1. Event-driven promises `mirai` supplies its own `as.promise()` method, allowing it to be readily converted to a promise from the [`promises`](https://rstudio.github.io/promises/) package. See the vignettes for the promises package for a full treatment. A mirai may be piped directly using the promise pipe `%...>%`, which implicitly calls `as.promise()` on it. Similarly, all promise-aware functions such as `promises::then()` or `shiny::ExtendedTask$new()` accept a mirai directly. Alternatively, a mirai may be explicitly converted to a promise by `as.promise()`, which then allows using the methods `$then()`, `$finally()` etc. directly from the promise object. Whilst normal usage of a mirai involves collecting its value, a promise instead registers an action to be taken as soon as a mirai resolves. This will happen automatically if the R session is idle at the top prompt, or if within a loop or function where `later::run_now()` is being called periodically to service promise actions (e.g. in Shiny). For a mirai promise, the mirai's value is passed to the `onFulfilled` argument of `promises::then()` if no error has occurred, and the mirai's `errorValue` is passed to the `onRejected` argument if an error has occured. Promises converted from mirai are next-generation, event-driven promises, developed in collaboration with Joe Cheng (creator of Shiny). - Promise actions are triggered as soon as each mirai resolves, without time-polling for completion as a `future_promise()` does. - Allows for much higher responsiveness (zero latency) and massive scalability (thousands or even millions of concurrent promises). The following example outputs "hello" to the console after one second when the 'mirai' resolves. ``` r library(mirai) library(promises) p <- mirai({Sys.sleep(1); "hello"}) %...>% cat() p #> ``` It is possible to both access a 'mirai' value at `$data` and to use a promise for enacting a side effect (assigning the value to an environment in the example below). ``` r env <- new.env() m <- mirai({ Sys.sleep(1) "hello" }) promises::then(m, function(x) env$res <- x) m[] #> [1] "hello" ``` After returning to the top level prompt: ```r env$res #> [1] "hello" ``` A `mirai_map` also has an `as.promise()` method. It resolves when the entire map operation completes or at least one mirai in the map is rejected. ### 2. The One Million Promises Challenge The code below is taken from the challenge to launch and collect one million promises. For illustration, the example is scaled down to ten thousand. ``` r library(mirai) daemons(8, dispatcher = FALSE) #> [1] 8 r <- 0 start <- Sys.time() m <- mirai_map(1:10000, \(x) x, .promise = \(x) r <<- r + x) Sys.time() - start #> Time difference of 3.19889 secs later::run_now() r #> [1] 50005000 daemons(0) #> [1] 0 ``` The one million promises challenge took 6 mins 25 secs to complete using an Intel i7 11th gen mobile processor with 16GB RAM. ### 3. Shiny ExtendedTask: Introduction mirai is an asynchronous backend to scale [Shiny](https://shiny.posit.co/) applications. Depending on the options supplied to `daemons()`, mirai tasks may be distributed across parallel processes locally or across the network. Shiny ExtendedTask allows the creation of scalable Shiny apps, which remain responsive intra-session for each user, as well as inter-session for multiple concurrent users. In the example below, the app remains responsive, with the clock continuing to tick whilst the simulated expensive computation is running asynchronously in a parallel process. Also the button is disabled and the plot greyed out until the computation is complete. > The call to `daemons()` is made at the top level, and `onStop()` may be used to automatically shut them down when the app exits. ``` r library(shiny) library(bslib) library(mirai) ui <- page_fluid( p("The time is ", textOutput("current_time", inline = TRUE)), hr(), numericInput("n", "Sample size (n)", 100), numericInput("delay", "Seconds to take for plot", 5), input_task_button("btn", "Plot uniform distribution"), plotOutput("plot") ) server <- function(input, output, session) { output$current_time <- renderText({ invalidateLater(1000) format(Sys.time(), "%H:%M:%S %p") }) task <- ExtendedTask$new( function(...) mirai({Sys.sleep(y); runif(x)}, ...) ) |> bind_task_button("btn") observeEvent(input$btn, task$invoke(x = input$n, y = input$delay)) output$plot <- renderPlot(hist(task$result())) } # run app using 1 local daemon daemons(1) # automatically shutdown daemons when app exits onStop(function() daemons(0)) shinyApp(ui = ui, server = server) ``` *Thanks to Joe Cheng for providing examples on which the above is based.* The key components to using ExtendedTask are: 1. In the UI, use `bslib::input_task_button()`. This is a button which is disabled during computation to prevent additional clicks. ``` r input_task_button("btn", "Plot uniform distribution") ``` 2. In the server, create an ExtendedTask object by calling `ExtendedTask$new()` on an anonymous function passing `...` arguments to `mirai()`, and bind it to the button created in (1). ``` r task <- ExtendedTask$new( function(...) mirai({Sys.sleep(y); runif(x)}, ...) ) |> bind_task_button("btn") ``` 3. In the server, create an observer on the input button, which invokes the ExtendedTask, passing in named arguments to the anonymous function (and hence the mirai) above. ``` r observeEvent(input$btn, task$invoke(x = input$n, y = input$delay)) ``` 4. In the server, create a render function for the output, which consumes the result of the ExtendedTask. ``` r output$plot <- renderPlot(hist(task$result())) ``` ### 4. Shiny ExtendedTask: Cancellation The app below is a demonstration of mirai's cancellation capability. Cancellation is performed in the same way irrespective of where the mirai task may be executing, locally or remotely. It builds on the introductory app by adding a button that sends an infinite sleep extendedTask. This will block execution as we are using a single daemon - any new extendedTasks will be queued behind this never-ending task. There is also a button to cancel that blocking task and allow any queued plots to continue processing. It works by assigning a reference to the mirai created in the `extendedTask$new()` method, which can then be passed to `stop_mirai()`. ``` r library(shiny) library(bslib) library(mirai) ui <- page_fluid( p("The time is ", textOutput("current_time", inline = TRUE)), hr(), numericInput("n", "Sample size (n)", 100), numericInput("delay", "Seconds to take for plot", 5), input_task_button("btn", "Plot uniform distribution"), hr(), p("Click 'block' to suspend execution, and 'cancel' to resume"), input_task_button("block", "Block"), actionButton("cancel", "Cancel block"), hr(), plotOutput("plot") ) server <- function(input, output, session) { output$current_time <- renderText({ invalidateLater(1000) format(Sys.time(), "%H:%M:%S %p") }) task <- ExtendedTask$new( function(...) mirai({Sys.sleep(y); runif(x)}, ...) ) |> bind_task_button("btn") m <- NULL block <- ExtendedTask$new( function() m <<- mirai(Sys.sleep(Inf)) ) |> bind_task_button("block") observeEvent(input$btn, task$invoke(x = input$n, y = input$delay)) observeEvent(input$block, block$invoke()) observeEvent(input$cancel, stop_mirai(m)) observe({ updateActionButton(session, "cancel", disabled = block$status() != "running") }) output$plot <- renderPlot(hist(task$result())) } # run app using 1 local daemon daemons(1) # automatically shutdown daemons when app exits onStop(function() daemons(0)) shinyApp(ui = ui, server = server) ``` *Thanks to Joe Cheng for providing examples on which the above is based.* ### 5. Shiny ExtendedTask: Generative Art The following app produces pretty spiral patterns. The user can add multiple plots, making use of Shiny modules, each having a different calculation time. The plots are generated asynchronously, and it is easy to see the practical limitations of the number of daemons set. For example, if updating 4 plots, and there are only 3 daemons, the 4th plot will not start to be generated until one of the other plots has finished. By wrapping the `runApp()` call in `with(daemons(...), ...)` the daemons are set up for the duration of the app, exiting automatically when the app is stopped. ``` r library(shiny) library(mirai) library(bslib) library(ggplot2) library(aRtsy) # function definitions run_task <- function(calc_time) { Sys.sleep(calc_time) list( colors = aRtsy::colorPalette(name = "random", n = 3), angle = runif(n = 1, min = - 2 * pi, max = 2 * pi), size = 1, p = 1 ) } plot_result <- function(result) { do.call(what = canvas_phyllotaxis, args = result) } # modules for individual plots plotUI <- function(id, calc_time) { ns <- NS(id) card( strong(paste0("Plot (calc time = ", calc_time, " secs)")), input_task_button(ns("resample"), "Resample"), plotOutput(ns("plot"), height="400px", width="400px") ) } plotServer <- function(id, calc_time) { force(id) force(calc_time) moduleServer( id, function(input, output, session) { task <- ExtendedTask$new( function(time, run) mirai(run(time), environment()) ) |> bind_task_button("resample") observeEvent(input$resample, task$invoke(calc_time, run_task)) output$plot <- renderPlot(plot_result(task$result())) } ) } # ui and server ui <- page_sidebar(fillable = FALSE, sidebar = sidebar( numericInput("calc_time", "Calculation time (secs)", 5), actionButton("add", "Add", class="btn-primary"), ), layout_column_wrap(id = "results", width = "400px", fillable = FALSE) ) server <- function(input, output, session) { observeEvent(input$add, { id <- nanonext::random(4) insertUI("#results", where = "beforeEnd", ui = plotUI(id, input$calc_time)) plotServer(id, input$calc_time) }) } app <- shinyApp(ui, server) # run app using 3 local daemons with(daemons(3), runApp(app)) ``` *The above example builds on original code by Joe Cheng, Daniel Woodie and William Landau.* The above uses `environment()` instead of `...` as an alternative and equivalent way of passing variables present in the calling environment to the mirai. The key components to using this ExtendedTask example are: 1. In the UI, use `bslib::input_task_button()`. This is a button which is disabled during computation to prevent additional clicks. ``` r input_task_button(ns("resample"), "Resample") ``` 2. In the server, create an ExtendedTask object by calling `ExtendedTask$new()` on an anonymous function passing _named_ arguments to `mirai()`, and bind it to the button created in (1). These are passed through to the mirai by the use of `environment()`. ``` r task <- ExtendedTask$new( function(time, run) mirai(run(time), environment()) ) |> bind_task_button("resample") ``` 3. In the server, create an observer on the input button, which invokes the ExtendedTask, supplying the arguments to the anonymous function above. ``` r observeEvent(input$resample, task$invoke(calc_time, run_task)) ``` 4. In the server, create a render function for the output, which consumes the result of the ExtendedTask. ``` r output$plot <- renderPlot(plot_result(task$result())) ``` ### 6. Shiny ExtendedTask: mirai map A `mirai_map` also has an `as.promise()` method, which allows it to be used directly in a Shiny ExtendedTask. It will resolve when the entire map operation completes or at least one mirai in the map is rejected. This example, uses `mirai_map()` to perform multiple calculations simultaneously in multiple daemons, returning the results asynchronously. ```r library(shiny) library(bslib) library(mirai) ui <- page_fluid( titlePanel("ExtendedTask Map Demo"), hr(), p("The time is ", textOutput("current_time", inline = TRUE)), p("Perform 4 calculations that each take between 1 and 4 secs to complete:"), input_task_button("calculate", "Calculate"), p(textOutput("result")), tags$style(type="text/css", "#result {white-space: pre-wrap;}") ) server <- function(input, output) { task <- ExtendedTask$new(function() { mirai_map(1:4, function(i) { # simulated long calculation Sys.sleep(i) sprintf( "Calc %d | PID %d | Finished at %s.", i, Sys.getpid(), format(Sys.time()) ) }) }) |> bind_task_button("calculate") observeEvent(input$calculate, { task$invoke() }) output$result <- renderText({ # result of mirai_map() is a list as.character(task$result()) }, sep = "\n") output$current_time <- renderText({ invalidateLater(1000) format(Sys.time(), "%H:%M:%S %p") }) } app <- shinyApp(ui, server) with(daemons(4), runApp(app)) ``` ### 7. Shiny Async: Coin Flips The below example demonstrates how to integrate a `mirai_map()` operation into a Shiny app in an observer, without using ExtendedTask. By specifying the '.promise' argument, this registers a promise action against each mapped operation. These can then be used to update reactive values or otherwise interact with the Shiny app. ``` r library(shiny) library(mirai) flip_coin <- function(...) { Sys.sleep(0.1) rbinom(n = 1, size = 1, prob = 0.501) } ui <- fluidPage( div("Is the coin fair?"), actionButton("task", "Flip 1000 coins"), textOutput("status"), textOutput("outcomes") ) server <- function(input, output, session) { # Keep running totals of heads, tails, and task errors flips <- reactiveValues(heads = 0, tails = 0, flips = 0) # Button to submit a batch of coin flips observeEvent(input$task, { mirai_map( 1:1000, flip_coin, .promise = \(x) { if (x) flips$heads <- flips$heads + 1 else flips$tails <- flips$tails + 1 } ) # Ensure there is something after mirai_map() in the observer, as it is # convertible to a promise, and will otherwise be waited for before returning flips$flips <- flips$flips + 1000 }) # Print time and task status output$status <- renderText({ invalidateLater(millis = 1000) time <- format(Sys.time(), "%H:%M:%S") sprintf("%s | %s flips submitted", time, flips$flips) }) # Print number of heads and tails output$outcomes <- renderText( sprintf("%s heads %s tails", flips$heads, flips$tails) ) } app <- shinyApp(ui = ui, server = server) # run app using 8 local non-dispatcher daemons (tasks are the same length) with(daemons(8, dispatcher = FALSE), { # pre-load flip_coin function on all daemons for efficiency everywhere({}, flip_coin = flip_coin) runApp(app) }) ``` *This is an adaptation of an original example provided by Will Landau for use of `crew` with Shiny. Please see .* ### 8. Shiny Async: Progress Bar The below example uses a `mirai_map()` operation in an observer to update a Shiny progress bar with custom messages, and also to update a reactive value once the entire map operation has completed (asynchronously). ```r library(shiny) library(mirai) library(promises) slow_squared <- function(x) { Sys.sleep(runif(1)) x^2 } ui <- fluidPage( titlePanel("Asynchronous Squares Calculator"), p("The time is ", textOutput("current_time", inline = TRUE)), hr(), actionButton("start", "Start Calculation"), br(), br(), uiOutput("progress_ui"), verbatimTextOutput("result") ) server <- function(input, output, session) { x <- 1:100 y <- reactiveVal() observeEvent(input$start, { progress <- Progress$new(session, min = 0, max = length(x)) progress$set(message = "Parallel calculation in progress", detail = "Starting...") completed <- reactiveVal(0) mirai_map( x, slow_squared, slow_squared = slow_squared, .promise = function(result) { new_val <- completed() + 1 completed(new_val) # Increment completed counter progress$inc(1, detail = paste("Completed", new_val)) # Update progress } ) %...>% { y(unlist(.)) progress$close() } # Ensure there is something after mirai_map() in the observer, as otherwise # the created promise will be waited for before returning y(0) }) output$current_time <- renderText({ invalidateLater(1000) format(Sys.time(), "%H:%M:%S %p") }) output$result <- renderPrint({ cat("Sum of squares calculated: ", sum(y()), "\n") }) } app <- shinyApp(ui, server) with(daemons(8), runApp(app)) ``` *This example adapts a contribution from Davide Magno.* ### 9. Plumber GET Endpoint mirai may be used as an asynchronous backend for [`plumber`](https://www.rplumber.io/) pipelines. In this example, the plumber router code is run in a daemon process itself so that it does not block the current process - this is useful in interactive sessions, but otherwise just taking the code within the outer `mirai()` call will suffice. The /echo endpoint takes a GET request, sleeps for 1 second (simulating an expensive computation) and simply returns the 'msg' request header together with a timestamp and the process ID of the process it is run on. ``` r library(mirai) daemons(1L, dispatcher = FALSE) #> [1] 1 m <- mirai({ library(plumber) library(promises) # to provide the promise pipe library(mirai) # more efficient not to use dispatcher if all requests are similar length daemons(4L, dispatcher = FALSE) # handles 4 requests simultaneously pr() |> pr_get( "/echo", function(req, res) { mirai( { Sys.sleep(1L) list( status = 200L, body = list( time = format(Sys.time()), msg = msg, pid = Sys.getpid() ) ) }, msg = req[["HEADERS"]][["msg"]] ) %...>% (function(x) { res$status <- x$status res$body <- x$body }) } ) |> pr_run(host = "127.0.0.1", port = 8985) }) ``` The API can be queried using an async HTTP client such as `nanonext::ncurl_aio()`. Here, all 8 requests are submitted at once, but we note that that responses have differing timestamps as only 4 can be processed at any one time (limited by the number of daemons set). ``` r library(nanonext) res <- lapply( 1:8, function(i) ncurl_aio( "http://127.0.0.1:8985/echo", headers = c(msg = as.character(i)) ) ) collect_aio(res) #> [[1]] #> [1] "{\"time\":[\"2025-06-24 21:29:39\"],\"msg\":[\"1\"],\"pid\":[45605]}" #> #> [[2]] #> [1] "{\"time\":[\"2025-06-24 21:29:40\"],\"msg\":[\"2\"],\"pid\":[45607]}" #> #> [[3]] #> [1] "{\"time\":[\"2025-06-24 21:29:40\"],\"msg\":[\"3\"],\"pid\":[45603]}" #> #> [[4]] #> [1] "{\"time\":[\"2025-06-24 21:29:39\"],\"msg\":[\"4\"],\"pid\":[45610]}" #> #> [[5]] #> [1] "{\"time\":[\"2025-06-24 21:29:39\"],\"msg\":[\"5\"],\"pid\":[45607]}" #> #> [[6]] #> [1] "{\"time\":[\"2025-06-24 21:29:40\"],\"msg\":[\"6\"],\"pid\":[45610]}" #> #> [[7]] #> [1] "{\"time\":[\"2025-06-24 21:29:39\"],\"msg\":[\"7\"],\"pid\":[45603]}" #> #> [[8]] #> [1] "{\"time\":[\"2025-06-24 21:29:40\"],\"msg\":[\"8\"],\"pid\":[45605]}" daemons(0) #> [1] 0 ``` ### 10. Plumber POST Endpoint This is the equivalent using a POST endpoint, accepting a JSON instruction sent as request data. Note that `req$postBody` should always be accessed in the router process and passed in as an argument to the 'mirai', as this is retrieved using a connection that is not serializable. ``` r library(mirai) daemons(1L, dispatcher = FALSE) #> [1] 1 m <- mirai({ library(plumber) library(promises) # to provide the promise pipe library(mirai) # uses dispatcher - suitable when requests take differing times to complete daemons(4L) # handles 4 requests simultaneously pr() |> pr_post( "/echo", function(req, res) { mirai( { Sys.sleep(1L) # simulate expensive computation list( status = 200L, body = list( time = format(Sys.time()), msg = jsonlite::fromJSON(data)[["msg"]], pid = Sys.getpid() ) ) }, data = req$postBody ) %...>% (function(x) { res$status <- x$status res$body <- x$body }) } ) |> pr_run(host = "127.0.0.1", port = 8986) }) ``` Querying the endpoint produces the same set of outputs as the previous example. ``` r library(nanonext) res <- lapply( 1:8, function(i) ncurl_aio( "http://127.0.0.1:8986/echo", method = "POST", data = sprintf('{"msg":"%d"}', i) ) ) collect_aio(res) #> [[1]] #> [1] "{\"time\":[\"2025-06-24 21:29:43\"],\"msg\":[\"1\"],\"pid\":[45889]}" #> #> [[2]] #> [1] "{\"time\":[\"2025-06-24 21:29:43\"],\"msg\":[\"2\"],\"pid\":[45882]}" #> #> [[3]] #> [1] "{\"time\":[\"2025-06-24 21:29:43\"],\"msg\":[\"3\"],\"pid\":[45886]}" #> #> [[4]] #> [1] "{\"time\":[\"2025-06-24 21:29:44\"],\"msg\":[\"4\"],\"pid\":[45884]}" #> #> [[5]] #> [1] "{\"time\":[\"2025-06-24 21:29:43\"],\"msg\":[\"5\"],\"pid\":[45884]}" #> #> [[6]] #> [1] "{\"time\":[\"2025-06-24 21:29:44\"],\"msg\":[\"6\"],\"pid\":[45882]}" #> #> [[7]] #> [1] "{\"time\":[\"2025-06-24 21:29:44\"],\"msg\":[\"7\"],\"pid\":[45889]}" #> #> [[8]] #> [1] "{\"time\":[\"2025-06-24 21:29:44\"],\"msg\":[\"8\"],\"pid\":[45886]}" daemons(0) #> [1] 0 ```