The goal of interprocess is to synchronize concurrent R
processes.
Allows R to access low-level operating system mechanisms for performing atomic operations on shared data structures. Mutexes provide shared and exclusive locks. Semaphores act as counters. Message queues move text strings from one process to another. All these interprocess communication (IPC) tools can optionally block with or without a timeout.
Works cross-platform, including Windows, MacOS, and Linux, and can be used to synchronize a mixture of R sessions and other types of processes.
Implemented using the Boost C++ library.
# Install the latest stable version from CRAN:
install.packages("interprocess")
# Or the development version from GitHub:
install.packages("pak")
pak::pak("cmmr/interprocess")Mutexes, semaphores, and message queues are managed by the operating system. Any process that knows the resource’s name can access it. Therefore, sharing the resource name (a short text string) amongst cooperating processes enables communication and synchronization.
These names can be any alphanumeric string that starts with a letter
and is no longer than 250 characters. The mutex(),
semaphore(), and msg_queue() functions will
default to generating a unique identifier, or you can provide a custom
or pre-existing one with the name parameter. If the
resource is associated with a specific file or directory, the
file parameter can be used to derive a name from
normalizing and hashing that path.
Setting cleanup = TRUE will automatically remove the
resource when the R session exits. Otherwise, the resource will persist
until $remove() is called or the operating system is
restarted.
An exclusive advisory lock is acquired by default. For a shared lock,
use shared = TRUE. Set file = <path> to
use a filepath hash for the name.
tmp <- tempfile()
mut <- interprocess::mutex(file = tmp)
mut$name
#> [1] "oThd9KRpHb0"
with(mut, writeLines('Important Data', tmp))
with(mut, shared = TRUE, readLines(tmp))
#> [1] "Important Data"
mut$remove()
unlink(tmp)The semaphore’s value can be incremented with $post()
and decremented with $wait(). The initial value can also be
set in the constructor.
sem <- interprocess::semaphore('mySemaphore', value = 1)
sem$name
#> [1] "mySemaphore"
sem$post()
sem$wait(timeout_ms = 0)
#> [1] TRUE
sem$wait(timeout_ms = 0)
#> [1] TRUE
sem$wait(timeout_ms = 0)
#> [1] FALSE
sem$remove()The constructor’s max_count and max_nchar
parameters determine how much memory is allocated for the message
queue.
mq <- interprocess::msg_queue(max_count = 2, max_nchar = 5)
mq$name
#> [1] "Ae2udeRLWcb"
mq$send('Hello')
mq$send('Hi', priority = 1)
mq$receive(timeout_ms = 0)
#> [1] "Hi"
mq$receive(timeout_ms = 0)
#> [1] "Hello"
mq$receive(timeout_ms = 0)
#> [1] NULL
mq$remove()