mirai = future in Japanese. Async evaluation framework for R built on NNG/nanonext.
Architecture: daemons dial into the host, a topology that facilitates dynamic scaling.
This is a cheatsheet. Refer to the mirai reference manual for a detailed introduction.
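A minimal sketch of the dial-in architecture described above (the IP address and port are placeholders; launch_remote() shows the exact daemon command for a given setup):
# On the host: listen at a URL the daemons can reach
daemons(url = host_url())                      # e.g. tcp://10.75.32.70:5555
# On each machine providing compute: dial back into the host
# Rscript -e "mirai::daemon('tcp://10.75.32.70:5555')"
# Daemons can dial in (or exit) at any time, which is what allows dynamic scaling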
- mirai() returns immediately; access the result via m[] or m$data
- daemons() sets persistent background processes
- local_url(tcp = TRUE) + tunnel = TRUE when ports are blocked
- cluster_config() with the appropriate scheduler command
- Compute profiles via the .compute parameter
- mirai_map(): parallel map with progress bars, early stopping, flatmap

library(mirai)
# Create a mirai (returns immediately)
m <- mirai(
{
Sys.sleep(1)
rnorm(5, mean)
},
mean = 10
)
# Direct access (non-blocking)
unresolved(m) # TRUE while still running, FALSE once resolved
m$data # Returns value (NA if unresolved)
# Access result (blocks until ready)
m[] # Wait and return value
collect_mirai(m) # Wait and return value
call_mirai(m) # Wait and return mirai object
# Via ... (assigned to daemon global env)
m <- mirai(func(x), func = my_func, x = data)
# Via .args (local to evaluation env)
m <- mirai(func(x), .args = list(func = my_func, x = data))
# Pass entire environment
write_async <- function(x, file) {
mirai(write.csv(x, file), .args = environment())
}
# Set 4 local daemons (with dispatcher - default)
daemons(4)
# Without dispatcher (round-robin distribution)
daemons(4, dispatcher = FALSE)
# Reset daemons
daemons(0)
# Check connection / statistics
info()
daemons(
n = 4,
dispatcher = TRUE, # Use dispatcher for optimal FIFO scheduling
cleanup = TRUE, # Clean env between tasks
output = FALSE, # Relay daemon stdout/stderr to the host (FALSE discards it)
maxtasks = Inf, # Task limit per daemon
idletime = Inf, # Max idle time (ms) before exit
walltime = Inf # Time limit (ms) before exit
)
daemons(sync = TRUE) # Run in current process
m <- mirai(Sys.getpid())
daemons(0)
# Listen at host URL with TLS
daemons(
url = host_url(tls = TRUE),
remote = ssh_config(c("ssh://10.75.32.90", "ssh://node2:22"))
)
# Or without automatic launching
daemons(url = host_url(tls = TRUE))
launch_remote(2, remote = ssh_config("ssh://10.75.32.90"))
host_url() # Auto-detect IP, tcp://x.x.x.x:0
host_url(tls = TRUE) # TLS connection
host_url(tls = TRUE, port = 5555) # Specific port
local_url() # IPC (Unix sockets/named pipes)
local_url(tcp = TRUE) # tcp://127.0.0.1:0
local_url(tcp = TRUE, port = 5555) # tcp://127.0.0.1:5555
ssh_config(
remotes = c("ssh://node1:22", "ssh://node2:22"),
tunnel = FALSE, # Direct connection
timeout = 10, # Connection timeout (seconds)
command = "ssh", # SSH executable
rscript = "Rscript" # R executable on remote
)
Requirements for SSH Direct: key-based SSH authentication set up beforehand, and the host URL's port open to inbound connections from the remote machines.
# SSH tunnelling: host listens at a localhost URL
daemons(
n = 4,
url = local_url(tcp = TRUE), # tcp://127.0.0.1:0
remote = ssh_config("ssh://10.75.32.90", tunnel = TRUE)
)
# Or with specific port
daemons(
n = 2,
url = local_url(tcp = TRUE, port = 5555), # tcp://127.0.0.1:5555
remote = ssh_config("ssh://remote-server", tunnel = TRUE)
)
How Tunnelling Works: with tunnel = TRUE, the SSH connection that launches the daemon also sets up reverse port forwarding, so the daemon dials 127.0.0.1:port on the remote machine and SSH relays that connection back to 127.0.0.1:port on the host. Both ends therefore use the same localhost URL and no port needs to be opened to inbound traffic.
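As a rough illustration only (the actual command mirai constructs is shown by launch_remote(); the ssh invocation below is conceptual and the port is a placeholder):
# Host listens at tcp://127.0.0.1:5555
# The launch adds SSH reverse port forwarding, conceptually:
#   ssh -R 5555:127.0.0.1:5555 user@remote
# so the daemon dials its own localhost, which is relayed back to the host:
# Rscript -e "mirai::daemon('tcp://127.0.0.1:5555')"
Launching daemons on an HPC cluster via cluster_config():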
daemons(
n = 4,
url = host_url(),
remote = cluster_config(
command = "sbatch", # Scheduler command: "sbatch", "qsub", "bsub", etc.
options = "#SBATCH --job-name=mirai
#SBATCH --mem=16G
#SBATCH --cpus-per-task=1
#SBATCH --output=mirai_%j.out
#SBATCH --error=mirai_%j.err
module load R/4.5.0",
rscript = file.path(R.home("bin"), "Rscript")
)
)
| Scheduler | Command | Job Name | Memory | CPUs |
|---|---|---|---|---|
| Slurm | sbatch | #SBATCH --job-name=NAME | --mem=16G | --cpus-per-task=1 |
| SGE | qsub | #$ -N NAME | -l mem_free=16G | -pe smp 1 |
| Torque/PBS | qsub | #PBS -N NAME | -l mem=16gb | -l nodes=1:ppn=1 |
| LSF | bsub | #BSUB -J NAME | -M 16000 | -n 1 |
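For instance, the SGE row translates into a cluster_config() call along these lines (a sketch: the directives come from the table above, while the memory value and R module name are assumptions to adapt for your site):
daemons(
  n = 4,
  url = host_url(),
  remote = cluster_config(
    command = "qsub",   # SGE submission command
    options = "#$ -N mirai
#$ -l mem_free=16G
#$ -pe smp 1
module load R",         # module name is site-specific
    rscript = file.path(R.home("bin"), "Rscript")
  )
)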
# Set daemons to listen
daemons(url = host_url(tls = TRUE))
# Get launch commands (doesn't execute)
cmds <- launch_remote(
n = 2,
remote = remote_config() # Empty config returns commands
)
# Copy/paste commands to run on remote machines
# E.g. Rscript -e "mirai::daemon('tcp://10.75.32.70:5555')"
print(cmds)
# Create CPU profile
daemons(4, .compute = "cpu")
# Create GPU profile
daemons(2, .compute = "gpu")
# Direct tasks to specific profile
m_cpu <- mirai(heavy_compute(), .compute = "cpu")
m_gpu <- mirai(gpu_task(), .compute = "gpu")
# Reset specific profile
daemons(0, .compute = "cpu")
# Temporarily use profile
with_daemons("gpu", {
model <- mirai(train_model())
})
# Set profile for scope
local_daemons("cpu")
m <- mirai(task()) # Uses "cpu" profile
with(daemons(4), {
m1 <- mirai(task1())
m2 <- mirai(task2())
c(m1[], m2[])
})
# Daemons auto-reset on exit
daemons(url = host_url())
launch_local(2) # 2 local daemons
launch_remote(4, ssh_config("ssh://remote")) # 4 remote
daemons(url = host_url()) # Start listening
launch_local(2) # Add 2 daemons
# Later...
# Add 2 more (automatically exit after idle for 60 secs)
launch_local(2, idletime = 60000)
daemons(4)
# Simple map
results <- mirai_map(1:10, sqrt)[]
# With additional arguments
results <- mirai_map(
1:10,
rnorm,
.args = list(mean = 5, sd = 2)
)[]
# With helper functions
results <- mirai_map(
1:100,
function(x) transform(x, helper),
helper = my_helper_func
)[]
# Flatten to vector
results <- mirai_map(1:10, rnorm, .args = list(n = 1))[.flat]
# Progress bar
results <- mirai_map(1:100, slow_func)[.progress]
# Early stopping on error
results <- mirai_map(data_list, process)[.stop]
# Combine options
results <- mirai_map(1:100, task)[.stop, .progress]
# Map over dataframe rows
df <- data.frame(x = 1:10, y = 11:20)
mirai_map(df, function(x, y) x + y)[.flat]
# Map over matrix rows
mat <- matrix(1:6, nrow = 3, dimnames = list(c("a","b","c"), c("x","y")))
mirai_map(mat, function(x, y) x * y)[]
m <- mirai(stop("error"))
m[]
# Test error types
is_mirai_error(m$data) # Execution error
is_mirai_interrupt(m$data) # User interrupt
is_error_value(m$data) # Any error (catch-all)
# Access error details
m$data$stack.trace # Full stack trace
m$data$condition.class # Original error classes
m$data$message # Error message
status() # Detailed status
info() # Concise statistics
daemons_set() # Check if daemons exist
require_daemons() # Error if not set
# Per-mirai timeout (requires dispatcher for auto-cancellation)
m <- mirai(Sys.sleep(10), .timeout = 1000) # 1 second
m[] # Returns errorValue 5 (timed out)
# Cancel mirai (requires dispatcher)
m <- mirai(Sys.sleep(100))
stop_mirai(m) # Attempts cancellation
m$data # errorValue 20 (canceled)
# Load package on all daemons
everywhere(library(data.table))
# Export variables to all daemons
everywhere(config <<- list(threads = 4))
# Export variables via ... (assigned to daemon global env)
everywhere({}, db_conn = my_conn, api_key = key)
# Statistically-sound but non-reproducible (default)
daemons(4, seed = NULL)
# Reproducible RNG (seed per mirai call)
daemons(4, seed = 123)
# For torch tensors, Arrow tables, Polars objects
daemons(
4,
serial = serial_config(
"torch_tensor",
sfunc = torch::torch_serialize,
ufunc = torch::torch_load
)
)
# Global registration
register_serial("torch_tensor", torch::torch_serialize, torch::torch_load)
daemons(4) # Auto-applies registered configs
# Auto TLS (zero-config certificates)
daemons(url = host_url(tls = TRUE))
# Custom certificate
daemons(
url = host_url(tls = TRUE),
tls = "/path/to/cert.pem",
pass = function() askpass::askpass()
)
| Feature | With Dispatcher (default) | Direct (dispatcher=FALSE) |
|---|---|---|
| Scheduling | Optimal FIFO | Round-robin |
| Timeouts | ✓ | No auto-cancellation |
| Cancellation | ✓ | ✗ |
| Serialization | ✓ | ✗ |
| Overhead | Slightly higher | Minimal |
| Use case | Variable task times | Similar task times |
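A minimal timing sketch of the difference (assumes 4 local daemons; the sleep durations are illustrative):
daemons(4) # dispatcher (default): tasks handed out FIFO as daemons free up
system.time(mirai_map(c(4, 1, 1, 1, 4, 1, 1, 1), Sys.sleep)[]) # roughly 5s
daemons(0)
daemons(4, dispatcher = FALSE) # round-robin: both 4s tasks land on the same daemon
system.time(mirai_map(c(4, 1, 1, 1, 4, 1, 1, 1), Sys.sleep)[]) # roughly 8s
daemons(0)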
┌─ Need async in R?
│
├─ Single task → mirai()
│ └─ No daemons set? → ephemeral (auto-creates process)
│
├─ Map operation → mirai_map()
│ └─ Requires daemons() to be set first
│
└─ Multiple tasks → Set up daemons
│
├─ Local only
│ └─ daemons(n)
│
├─ Remote with open ports
│ └─ daemons(url = host_url(), remote = ssh_config(..., tunnel = FALSE))
│
├─ Remote with firewall/blocked ports
│ └─ daemons(url = local_url(tcp = TRUE), remote = ssh_config(..., tunnel = TRUE))
│
└─ HPC cluster (Slurm/SGE/PBS/LSF)
└─ daemons(url = host_url(), remote = cluster_config(...))
# Expression Evaluation
mirai(pkg::func(x), x = data)
# Namespace functions OR library() inside expression
mirai(func(x), func = my_func, x = data)
# Pass dependencies explicitly via ... or .args
# Dispatcher Required For
stop_mirai(m) # Cancellation
mirai(task(), .timeout = 1000) # Timeout cancellation
daemons(4, serial = serial_config(...)) # Custom serialization
# SSH Tunnelling
daemons(url = local_url(tcp = TRUE), remote = ssh_config(..., tunnel = TRUE))
# Must use 127.0.0.1 (not external IP) + tunnel = TRUE
# TLS
host_url(tls = TRUE) # Auto TLS (zero-config, just works)
# Custom certs: provide cert path + optional passphrase function
# Remote Prerequisites
# - SSH key-based auth configured beforehand
# - SSH direct: host port open to inbound connections
# - HPC: correct module load commands and scheduler directives