From 23c4cf853935232cf2b94320b7a67290beabc3de Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Tue, 22 Aug 2023 21:06:28 +0100 Subject: [PATCH] reimplement #69 fix with NA argument --- DESCRIPTION | 2 +- NEWS.md | 4 ++-- R/mirai-package.R | 4 +++- R/mirai.R | 17 ++++++++--------- man/daemons.Rd | 7 +++---- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 0375bb576..330634028 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: mirai Type: Package Title: Minimalist Async Evaluation Framework for R -Version: 0.9.1.9011 +Version: 0.9.1.9012 Description: Lightweight parallel code execution and distributed computing. Designed for simplicity, a 'mirai' evaluates an R expression asynchronously, on local or network resources, resolving automatically upon completion. diff --git a/NEWS.md b/NEWS.md index c67c7640d..6fde01727 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,7 +1,7 @@ -# mirai 0.9.1.9011 (development) +# mirai 0.9.1.9012 (development) * Previously deprecated function `server()` is removed in favour of `daemon()`. -* `daemons()` now allows a numeric value for 'dispatcher' to set an initial sync time other than the default 5s, allowing for cases where the R startup configuration takes longer (thanks @alexpiper). +* `daemons()` argument 'dispatcher' has a new option `NA`, which allows an initial sync time of up to 20s (vs. the default 5s), accommodating configurations where R startup takes longer (thanks @alexpiper). * Dispatcher enhancements and fixes: + Straight pass through without serialization/unserialization allows higher performance and lower memory utilisation. + Fixes edge cases of `status()` occasionally failing to communicate with dispatcher. diff --git a/R/mirai-package.R b/R/mirai-package.R index 8844107cc..2be773fec 100644 --- a/R/mirai-package.R +++ b/R/mirai-package.R @@ -93,10 +93,12 @@ NULL n_zero = "the number of daemons must be zero or greater", numeric_n = "'n' must be numeric, did you mean to provide 'url'?", requires_list = "'.args' must be specified as a list", - sync_dispatch = "sync with dispatcher timed out after %s ms, perhaps specify a larger value?", + sync_dispatch = "initial sync with dispatcher timed out after 5s, specify 'dispatcher = NA' to allow longer", + sync_na = "initial sync with dispatcher timed out after 20s", sync_timeout = "sync with local process timed out after 5s", url_spec = "numeric value for 'url' is out of bounds" ), hash = TRUE ) .timelimit <- 5000L +.nalimit <- 20000L diff --git a/R/mirai.R b/R/mirai.R index d904f9c1c..ec39665f4 100644 --- a/R/mirai.R +++ b/R/mirai.R @@ -523,10 +523,9 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = "defau #' @param dispatcher [default TRUE] logical value whether to use dispatcher. #' Dispatcher is a local background process that connects to daemons on #' behalf of the host and ensures FIFO scheduling, queueing tasks if -#' necessary (see Dispatcher section below). Alternatively, supply a numeric -#' value to enable dispatcher and set the maximum time in milliseconds for -#' initial synchronisation (default 5000L) - may be set wider to allow for -#' configurations where the R startup process takes longer. +#' necessary (see Dispatcher section below). Specify NA to set a longer +#' 20s timeout for initial synchronisation (default 5s), designed to +#' accommodate configurations with a longer R startup process. #' @param tls [default NULL] (optional for secure TLS connections) if not #' supplied, zero-configuration single-use keys and certificates are #' automatically generated. If supplied, \strong{either} the character path @@ -748,13 +747,13 @@ daemons <- function(n, url = NULL, dispatcher = TRUE, tls = NULL, ..., .compute envir[["tls"]] <- weakref(envir, tls[["client"]]) tls <- tls[["server"]] } - if (dispatcher) { + if (is.na(dispatcher) || dispatcher) { n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(.messages[["n_one"]]) urld <- auto_tokenized_url() urlc <- strcat(urld, "c") sock <- req_socket(urld) sockc <- req_socket(urlc, resend = 0L) - launch_and_sync_daemon(sock = sock, synctime = if (is.numeric(dispatcher)) dispatcher else .timelimit, urld, parse_dots(...), url, n, urlc, tls = tls) + launch_and_sync_daemon(sock = sock, synctime = if (is.na(dispatcher)) .nalimit else .timelimit, urld, parse_dots(...), url, n, urlc, tls = tls) init_monitor(sockc = sockc, envir = envir) } else { sock <- req_socket(url, tls = if (length(tls)) tls_config(server = tls)) @@ -786,10 +785,10 @@ daemons <- function(n, url = NULL, dispatcher = TRUE, tls = NULL, ..., .compute urld <- auto_tokenized_url() sock <- req_socket(urld) dots <- parse_dots(...) - if (dispatcher) { + if (is.na(dispatcher) || dispatcher) { urlc <- strcat(urld, "c") sockc <- req_socket(urlc, resend = 0L) - launch_and_sync_daemon(sock = sock, synctime = if (is.numeric(dispatcher)) dispatcher else .timelimit, urld, dots, n, urlc) + launch_and_sync_daemon(sock = sock, synctime = if (is.na(dispatcher)) .nalimit else .timelimit, urld, dots, n, urlc) init_monitor(sockc = sockc, envir = envir) } else { for (i in seq_len(n)) @@ -1286,7 +1285,7 @@ launch_and_sync_daemon <- function(sock, synctime, ..., tls = NULL) { cv <- cv() pipe_notify(sock, cv = cv, add = TRUE, remove = FALSE, flag = TRUE) launch_daemon(..., tls = tls) - until(cv, synctime) && stop(if (...length() < 3L) .messages[["sync_timeout"]] else sprintf(.messages[["sync_dispatch"]], as.character(synctime))) + until(cv, synctime) && stop(if (...length() < 3L) .messages[["sync_timeout"]] else if (synctime == .timelimit) .messages[["sync_dispatch"]] else .messages[["sync_na"]]) } dial_and_sync_socket <- function(sock, url, asyncdial, tls = NULL) { diff --git a/man/daemons.Rd b/man/daemons.Rd index 0badd9840..35da69b51 100644 --- a/man/daemons.Rd +++ b/man/daemons.Rd @@ -25,10 +25,9 @@ incoming connections (and optionally for websockets, a path), e.g. \item{dispatcher}{[default TRUE] logical value whether to use dispatcher. Dispatcher is a local background process that connects to daemons on behalf of the host and ensures FIFO scheduling, queueing tasks if -necessary (see Dispatcher section below). Alternatively, supply a numeric -value to enable dispatcher and set the maximum time in milliseconds for -initial synchronisation (default 5000L) - may be set wider to allow for -configurations where the R startup process takes longer.} +necessary (see Dispatcher section below). Specify NA to set a longer +20s timeout for initial synchronisation (default 5s), designed to +accommodate configurations with a longer R startup process.} \item{tls}{[default NULL] (optional for secure TLS connections) if not supplied, zero-configuration single-use keys and certificates are