Use dispatcher = c('process', 'thread', 'none') (#158)
* use dispatcher = c('process', 'thread', 'none')

* update vignettes

* update NEWS
shikokuchuo authored Sep 17, 2024
1 parent cedd0ca commit 1aa73c8
Showing 25 changed files with 268 additions and 250 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 1.2.0.9018
Version: 1.2.0.9019
Description: Designed for simplicity, a 'mirai' evaluates an R expression
asynchronously in a parallel process, locally or distributed over the
network, with the result automatically available upon completion. Modern
5 changes: 3 additions & 2 deletions NEWS.md
@@ -1,7 +1,8 @@
# mirai 1.2.0.9018 (development)
# mirai 1.2.0.9019 (development)

* `daemons(dispatcher = NA)` now provides access to threaded dispatcher (experimental). This implements dispatcher using a thread rather than an external process and is faster and more efficient.
* `daemons(dispatcher = "thread")` implements threaded dispatcher (experimental), a faster and more efficient way of running dispatcher logic than in a background process.
* `daemons()` behavioural changes:
- Argument 'dispatcher' now takes the character options 'process', 'thread' and 'none'. Previous values of TRUE/FALSE continue to be accepted (thanks @hadley #157).
- Return value is now always an integer value - either the number of daemons set if using dispatcher, or the number of daemons launched locally (zero if using a remote launcher).
- Gains argument 'force' to control whether calls to `daemons()` reset previous settings for the same compute profile.
- Invalid type of '...' arguments are now dropped instead of throwing an error. This allows '...' containing unused arguments to be more easily passed from other functions.
153 changes: 84 additions & 69 deletions R/daemons.R
@@ -38,10 +38,12 @@
#' @param remote [default NULL] required only for launching remote daemons, a
#' configuration generated by \code{\link{remote_config}} or
#' \code{\link{ssh_config}}.
#' @param dispatcher [default TRUE] logical value whether to use dispatcher.
#' Dispatcher is a local background process that connects to daemons on
#' behalf of the host and ensures FIFO scheduling. Specify NA for threaded
#' dispatcher (see Dispatcher section below).
#' @param dispatcher [default 'process'] character value, one of
#' \sQuote{process}, \sQuote{thread} or \sQuote{none}. Whether to deploy
#' dispatcher in another process, on a thread or not at all. Dispatcher is
#' an extension that ensures optimal scheduling, although this is not always
#' required (for details see Dispatcher section below). Note that the option
#' \sQuote{thread} is new and currently considered experimental.
#' @param ... (optional) additional arguments passed through to
#' \code{\link{dispatcher}} if using dispatcher and/or \code{\link{daemon}}
#' if launching daemons. These include \sQuote{retry} and \sQuote{token} at
@@ -55,9 +57,9 @@
#' @param seed [default NULL] (optional) supply a random seed (single value,
#' interpreted as an integer). This is used to initialise the L'Ecuyer-CMRG
#' RNG streams sent to each daemon. Note that reproducible results can be
#' expected only for \code{dispatcher = FALSE}, as the unpredictable timing
#' expected only for \code{dispatcher = 'none'}, as the unpredictable timing
#' of task completions would otherwise influence the tasks sent to each
#' daemon. Even for \code{dispatcher = FALSE}, reproducibility is not
#' daemon. Even for \code{dispatcher = 'none'}, reproducibility is not
#' guaranteed if the order in which tasks are sent is not deterministic.
#' @param tls [default NULL] (optional for secure TLS connections) if not
#' supplied, zero-configuration single-use keys and certificates are
@@ -109,25 +111,22 @@
#'
#' @section Dispatcher:
#'
#' By default \code{dispatcher = TRUE}. This launches a background process
#' By default \code{dispatcher = 'process'} launches a background process
#' running \code{\link{dispatcher}}. Dispatcher connects to daemons on
#' behalf of the host and ensures FIFO scheduling of tasks. Dispatcher uses
#' synchronisation primitives from \pkg{nanonext}, waiting rather than
#' polling for tasks, which is both efficient (no resource usage) and fully
#' event-driven (having no latency).
#'
#' Specifying \code{dispatcher = NA} uses threaded dispatcher, a faster and
#' more efficient alternative to the separate background process. This is a
#' new feature that should still be considered experimental.
#'
#' By specifying \code{dispatcher = FALSE}, daemons connect to the host
#' directly rather than through dispatcher. The host sends tasks to
#' connected daemons immediately in a round-robin fashion. Optimal
#' behalf of the host and ensures optimal FIFO scheduling of tasks.
#'
#' Specifying \code{dispatcher = 'thread'} runs dispatcher logic on a new
#' thread, a faster and more efficient alternative to using a separate
#' process. This is a new feature and should be considered experimental.
#'
#' Specifying \code{dispatcher = 'none'} uses the default behaviour without
#' additional dispatcher logic. In this case daemons connect directly to the
#' host and tasks are distributed in a round-robin fashion. Optimal
#' scheduling is not guaranteed as the duration of tasks cannot be known
#' \emph{a priori}, hence tasks can be queued at one daemon while other
#' daemons remain idle. However, this provides a resource-light approach
#' suited to working with similar-length tasks, or where concurrent tasks
#' typically do not exceed available daemons.
#' daemons remain idle. However, this provides the most resource-light
#' approach, suited to working with similar-length tasks, or where
#' concurrent tasks typically do not exceed available daemons.
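
The trade-off described in this section can be illustrated with a toy simulation. This is a minimal sketch, not mirai code: the task durations are invented for illustration, the helpers `round_robin_makespan()` and `fifo_makespan()` are hypothetical, and the FIFO model simply assumes that each task is handed to whichever daemon becomes free first.

```r
# Hypothetical task durations in seconds (illustrative values, not measurements).
durations <- c(8, 1, 1, 1, 1, 1)
n_daemons <- 2L

# Round-robin model (dispatcher = "none"): task i is assigned up front to
# daemon ((i - 1) %% n) + 1, whether or not that daemon is busy.
round_robin_makespan <- function(durations, n) {
  daemon <- (seq_along(durations) - 1L) %% n + 1L
  max(tapply(durations, daemon, sum))
}

# FIFO model (with dispatcher): tasks are taken in order, and each one goes
# to the daemon that becomes free first.
fifo_makespan <- function(durations, n) {
  free_at <- numeric(n)
  for (d in durations) {
    i <- which.min(free_at)
    free_at[i] <- free_at[i] + d
  }
  max(free_at)
}

round_robin_makespan(durations, n_daemons)  # 10: short tasks queue behind the long one
fifo_makespan(durations, n_daemons)         # 8: short tasks all run on the idle daemon
```

With similar-length tasks the two models give the same total time, which is consistent with the documentation's note that the round-robin approach suits similar-length tasks.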
#'
#' @section Distributed Computing:
#'
@@ -236,7 +235,7 @@
#' daemons(0)
#'
#' # Create 2 local daemons (not using dispatcher)
#' daemons(2, dispatcher = FALSE)
#' daemons(2, dispatcher = "none")
#' status()
#' # Reset to zero
#' daemons(0)
@@ -248,7 +247,7 @@
#' daemons(0)
#'
#' # Set host URL for remote daemons to dial into
#' daemons(url = host_url(), dispatcher = FALSE)
#' daemons(url = host_url(), dispatcher = "none")
#' status()
#' # Reset to zero
#' daemons(0)
@@ -271,7 +270,7 @@
#'
#' daemons(url = host_url(tls = TRUE),
#' remote = ssh_config(c('ssh://nodeone', 'ssh://nodetwo')),
#' dispatcher = FALSE)
#' dispatcher = "none")
#'
#' # Launch 4 daemons on the remote machine 10.75.32.90 using SSH tunnelling
#' # over port 5555 ('url' hostname must be 'localhost' or '127.0.0.1'):
@@ -284,7 +283,7 @@
#'
#' @export
#'
daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ..., force = TRUE,
daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "thread", "none"), ..., force = TRUE,
seed = NULL, tls = NULL, pass = NULL, .compute = "default") {

missing(n) && missing(url) && return(status(.compute))
@@ -297,29 +296,35 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ..., force
envir <- new.env(hash = FALSE, parent = ..)
tls <- check_create_tls(url = url, tls = tls, envir = envir)
create_stream(n = n, seed = seed, envir = envir)
if (is.na(dispatcher)) {
n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(._[["n_one"]])
urls <- resolve_dispatcher_urls(n = n, url = url)
sock <- .dispatcher(host = inproc_url(), url = urls, tls = if (length(tls)) tls_config(server = tls, pass = pass))
`[[<-`(`[[<-`(envir, "urls", urls), "dispatcher", TRUE)
} else if (dispatcher) {
n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(._[["n_one"]])
if (length(tls)) tls_config(server = tls, pass = pass)
cv <- cv()
dots <- parse_dots(...)
output <- attr(dots, "output")
urld <- local_url()
urlc <- sprintf("%s%s", urld, "c")
sock <- req_socket(urld)
sockc <- req_socket(urlc)
res <- launch_sync_dispatcher(sock, sockc, wa5(urld, dots, n, urlc, url), output, tls, pass)
is.object(res) && stop(._[["sync_dispatcher"]])
store_dispatcher(sockc = sockc, res = res, cv = cv, envir = envir)
} else {
sock <- req_socket(url, tls = if (length(tls)) tls_config(server = tls, pass = pass))
store_urls(sock = sock, envir = envir)
n <- 0L
}
switch(
parse_dispatcher(dispatcher),
{
n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(._[["n_one"]])
if (length(tls)) tls_config(server = tls, pass = pass)
cv <- cv()
dots <- parse_dots(...)
output <- attr(dots, "output")
urld <- local_url()
urlc <- sprintf("%s%s", urld, "c")
sock <- req_socket(urld)
sockc <- req_socket(urlc)
res <- launch_sync_dispatcher(sock, sockc, wa5(urld, dots, n, urlc, url), output, tls, pass)
is.object(res) && stop(._[["sync_dispatcher"]])
store_dispatcher(sockc = sockc, res = res, cv = cv, envir = envir)
},
{
n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(._[["n_one"]])
urls <- resolve_dispatcher_urls(n = n, url = url)
sock <- .dispatcher(host = inproc_url(), url = urls, tls = if (length(tls)) tls_config(server = tls, pass = pass))
`[[<-`(`[[<-`(envir, "urls", urls), "dispatcher", TRUE)
},
{
sock <- req_socket(url, tls = if (length(tls)) tls_config(server = tls, pass = pass))
store_urls(sock = sock, envir = envir)
n <- 0L
},
stop(._[["dispatcher_args"]])
)
`[[<-`(.., .compute, `[[<-`(`[[<-`(envir, "sock", sock), "n", n))
if (length(remote))
launch_remote(url = envir[["urls"]], remote = remote, tls = envir[["tls"]], ..., .compute = .compute)
@@ -352,26 +357,32 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ..., force
create_stream(n = n, seed = seed, envir = envir)
dots <- parse_dots(...)
output <- attr(dots, "output")
if (is.na(dispatcher)) {
urls <- auto_dispatcher_urls(n = n, url = urld)
sock <- .dispatcher(host = inproc_url(), url = urls)
for (i in seq_len(n))
launch_daemon(wa3(urls[i], dots, next_stream(envir)), output)
`[[<-`(`[[<-`(envir, "urls", urls), "dispatcher", TRUE)
} else if (dispatcher) {
cv <- cv()
sock <- req_socket(urld)
urlc <- sprintf("%s%s", urld, "c")
sockc <- req_socket(urlc)
res <- launch_sync_dispatcher(sock, sockc, wa4(urld, dots, envir[["stream"]], n, urlc), output)
is.object(res) && stop(._[["sync_dispatcher"]])
store_dispatcher(sockc = sockc, res = res, cv = cv, envir = envir)
for (i in seq_len(n)) next_stream(envir)
} else {
sock <- req_socket(urld)
launch_sync_daemons(seq_len(n), sock, urld, dots, envir, output) || stop(._[["sync_daemons"]])
`[[<-`(envir, "urls", urld)
}
switch(
parse_dispatcher(dispatcher),
{
cv <- cv()
sock <- req_socket(urld)
urlc <- sprintf("%s%s", urld, "c")
sockc <- req_socket(urlc)
res <- launch_sync_dispatcher(sock, sockc, wa4(urld, dots, envir[["stream"]], n, urlc), output)
is.object(res) && stop(._[["sync_dispatcher"]])
store_dispatcher(sockc = sockc, res = res, cv = cv, envir = envir)
for (i in seq_len(n)) next_stream(envir)
},
{
urls <- auto_dispatcher_urls(n = n, url = urld)
sock <- .dispatcher(host = inproc_url(), url = urls)
for (i in seq_len(n))
launch_daemon(wa3(urls[i], dots, next_stream(envir)), output)
`[[<-`(`[[<-`(envir, "urls", urls), "dispatcher", TRUE)
},
{
sock <- req_socket(urld)
launch_sync_daemons(seq_len(n), sock, urld, dots, envir, output) || stop(._[["sync_daemons"]])
`[[<-`(envir, "urls", urld)
},
stop(._[["dispatcher_args"]])
)
`[[<-`(.., .compute, `[[<-`(`[[<-`(envir, "sock", sock), "n", n))
} else if (force) {
daemons(n = 0L, .compute = .compute)
@@ -626,6 +637,10 @@ tokenized_url <- function(url) sprintf("%s/%s", url, random(12L))
req_socket <- function(url, tls = NULL, resend = 0L)
`opt<-`(socket(protocol = "req", listen = url, tls = tls), "req:resend-time", resend)

parse_dispatcher <- function(dispatcher)
if (is.logical(dispatcher)) 1L + (!dispatcher) * 2L else
pmatch(dispatcher, c("process", "thread", "none"), nomatch = 4L)[1L]

parse_dots <- function(...) {
...length() || return("")
dots <- list(...)
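
The `parse_dispatcher()` helper added above turns the 'dispatcher' argument into a branch index for `switch()`. The sketch below copies that helper from the diff and pairs it with `branch()`, a hypothetical function (not part of mirai) that only names the branch that would be taken, so that the mapping of the new character options and the legacy TRUE/FALSE values can be checked in isolation.

```r
# Copied from the diff: maps 'dispatcher' to an index between 1 and 4.
parse_dispatcher <- function(dispatcher)
  if (is.logical(dispatcher)) 1L + (!dispatcher) * 2L else
    pmatch(dispatcher, c("process", "thread", "none"), nomatch = 4L)[1L]

# Hypothetical helper for illustration only: names the switch() branch.
# In daemons() the fourth alternative is stop(._[["dispatcher_args"]]).
branch <- function(dispatcher)
  switch(
    parse_dispatcher(dispatcher),
    "process",
    "thread",
    "none",
    "error"
  )

branch(c("process", "thread", "none"))  # unmodified default: first element wins, "process"
branch("thread")                        # "thread"
branch("no")                            # unique partial match via pmatch(), "none"
branch(TRUE)                            # legacy value, "process"
branch(FALSE)                           # legacy value, "none"
branch("fifo")                          # no match, index 4, "error"
```

Because the default is the full vector `c("process", "thread", "none")` and only the first `pmatch()` result is kept, calling `daemons()` without the argument selects the process branch, much as `match.arg()` would.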
2 changes: 1 addition & 1 deletion R/launchers.R
@@ -58,7 +58,7 @@
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' daemons(url = host_url(ws = TRUE), dispatcher = FALSE)
#' daemons(url = host_url(ws = TRUE), dispatcher = "none")
#' status()
#' launch_local(status()$daemons, maxtasks = 10L)
#' launch_remote(1L, maxtasks = 10L)
6 changes: 3 additions & 3 deletions R/map.R
@@ -84,7 +84,7 @@
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' daemons(4, dispatcher = FALSE)
#' daemons(4, dispatcher = "none")
#'
#' # map with constant args specified via '.args'
#' mirai_map(1:3, rnorm, .args = list(mean = 20, sd = 2))[]
@@ -138,7 +138,7 @@
#'
#' # promises example that outputs the results, including errors, to the console
#' if (requireNamespace("promises", quietly = TRUE)) {
#' daemons(1, dispatcher = FALSE)
#' daemons(1, dispatcher = "none")
#' ml <- mirai_map(
#' 1:30,
#' function(x) {Sys.sleep(0.1); if (x == 30) stop(x) else x},
@@ -160,7 +160,7 @@ mirai_map <- function(.x, .f, ..., .args = list(), .promise = NULL, .compute = "
.x
.f
warning(._[["requires_daemons"]], call. = FALSE, immediate. = TRUE)
daemons(n = 1L, dispatcher = FALSE, .compute = .compute)
daemons(n = 1L, dispatcher = "none", .compute = .compute)
return(mirai_map(.x = .x, .f = .f, ..., .args = .args, .promise = .promise, .compute = .compute))
}
xilen <- dim(.x)[1L]
1 change: 1 addition & 0 deletions R/mirai-package.R
@@ -89,6 +89,7 @@
cluster_inactive = "cluster is no longer active",
correct_context = "'host' must be specified if not using directly in a function argument",
daemons_unset = "launch_%s(): a numeric value for 'url' requires daemons to be set",
dispatcher_args = "'dispatcher' must be one of 'process', 'thread' or 'none'",
dot_required = "remote_config(): '.' must be an element of the character vector(s) supplied to 'args'",
missing_expression = "missing expression, perhaps wrap in {}?",
missing_url = "at least one URL must be supplied for 'url' or 'n' must be at least 1",
4 changes: 2 additions & 2 deletions R/mirai.R
@@ -230,7 +230,7 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = "defau
#' # loading a package on all daemons and also
#' # registering custom serialization functions:
#' cfg <- serial_config("cls_name", function(x) serialize(x, NULL), unserialize)
#' daemons(1, dispatcher = FALSE)
#' daemons(1, dispatcher = "none")
#' everywhere(library(parallel), .serial = cfg)
#' m <- mirai("package:parallel" %in% search())
#' call_mirai(m)$data
@@ -464,7 +464,7 @@ unresolved <- unresolved
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' daemons(1, dispatcher = FALSE)
#' daemons(1, dispatcher = "none")
#' df <- data.frame()
#' m <- mirai(as.matrix(df), df = df)
#' is_mirai(m)
4 changes: 2 additions & 2 deletions R/parallel.R
@@ -104,7 +104,7 @@ make_cluster <- function(n, url = NULL, remote = NULL, ...) {
if (is.character(url)) {

length(url) == 1L || stop(._[["single_url"]])
daemons(url = url, remote = remote, dispatcher = FALSE, cleanup = FALSE, ..., .compute = id)
daemons(url = url, remote = remote, dispatcher = "none", cleanup = FALSE, ..., .compute = id)

if (is.null(remote)) {
if (missing(n)) n <- 1L
@@ -119,7 +119,7 @@ make_cluster <- function(n, url = NULL, remote = NULL, ...) {
} else {
is.numeric(n) || stop(._[["numeric_n"]])
n >= 1L || stop(._[["n_one"]])
daemons(n = n, dispatcher = FALSE, cleanup = FALSE, ..., .compute = id)
daemons(n = n, dispatcher = "none", cleanup = FALSE, ..., .compute = id)
}

`[[<-`(..[[id]], "cvs", cvs)