Skip to content

Commit

Permalink
reimplement #69 fix with NA argument
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Aug 22, 2023
1 parent f9bc477 commit 23c4cf8
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 17 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 0.9.1.9011
Version: 0.9.1.9012
Description: Lightweight parallel code execution and distributed computing.
Designed for simplicity, a 'mirai' evaluates an R expression asynchronously,
on local or network resources, resolving automatically upon completion.
Expand Down
4 changes: 2 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# mirai 0.9.1.9011 (development)
# mirai 0.9.1.9012 (development)

* Previously deprecated function `server()` is removed in favour of `daemon()`.
* `daemons()` now allows a numeric value for 'dispatcher' to set an initial sync time other than the default 5s, allowing for cases where the R startup configuration takes longer (thanks @alexpiper).
* `daemons()` argument 'dispatcher' has a new option `NA`, which allows an initial sync time of up to 20s (vs. the default 5s), accommodating configurations where R startup takes longer (thanks @alexpiper).
* Dispatcher enhancements and fixes:
+ Straight pass through without serialization/unserialization allows higher performance and lower memory utilisation.
+ Fixes edge cases of `status()` occasionally failing to communicate with dispatcher.
Expand Down
4 changes: 3 additions & 1 deletion R/mirai-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,12 @@ NULL
n_zero = "the number of daemons must be zero or greater",
numeric_n = "'n' must be numeric, did you mean to provide 'url'?",
requires_list = "'.args' must be specified as a list",
sync_dispatch = "sync with dispatcher timed out after %s ms, perhaps specify a larger value?",
sync_dispatch = "initial sync with dispatcher timed out after 5s, specify 'dispatcher = NA' to allow longer",
sync_na = "initial sync with dispatcher timed out after 20s",
sync_timeout = "sync with local process timed out after 5s",
url_spec = "numeric value for 'url' is out of bounds"
),
hash = TRUE
)
.timelimit <- 5000L
.nalimit <- 20000L
17 changes: 8 additions & 9 deletions R/mirai.R
Original file line number Diff line number Diff line change
Expand Up @@ -523,10 +523,9 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = "defau
#' @param dispatcher [default TRUE] logical value whether to use dispatcher.
#' Dispatcher is a local background process that connects to daemons on
#' behalf of the host and ensures FIFO scheduling, queueing tasks if
#' necessary (see Dispatcher section below). Alternatively, supply a numeric
#' value to enable dispatcher and set the maximum time in milliseconds for
#' initial synchronisation (default 5000L) - may be set wider to allow for
#' configurations where the R startup process takes longer.
#' necessary (see Dispatcher section below). Specify NA to set a longer
#' 20s timeout for initial synchronisation (default 5s), designed to
#' accommodate configurations with a longer R startup process.
#' @param tls [default NULL] (optional for secure TLS connections) if not
#' supplied, zero-configuration single-use keys and certificates are
#' automatically generated. If supplied, \strong{either} the character path
Expand Down Expand Up @@ -748,13 +747,13 @@ daemons <- function(n, url = NULL, dispatcher = TRUE, tls = NULL, ..., .compute
envir[["tls"]] <- weakref(envir, tls[["client"]])
tls <- tls[["server"]]
}
if (dispatcher) {
if (is.na(dispatcher) || dispatcher) {
n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(.messages[["n_one"]])
urld <- auto_tokenized_url()
urlc <- strcat(urld, "c")
sock <- req_socket(urld)
sockc <- req_socket(urlc, resend = 0L)
launch_and_sync_daemon(sock = sock, synctime = if (is.numeric(dispatcher)) dispatcher else .timelimit, urld, parse_dots(...), url, n, urlc, tls = tls)
launch_and_sync_daemon(sock = sock, synctime = if (is.na(dispatcher)) .nalimit else .timelimit, urld, parse_dots(...), url, n, urlc, tls = tls)
init_monitor(sockc = sockc, envir = envir)
} else {
sock <- req_socket(url, tls = if (length(tls)) tls_config(server = tls))
Expand Down Expand Up @@ -786,10 +785,10 @@ daemons <- function(n, url = NULL, dispatcher = TRUE, tls = NULL, ..., .compute
urld <- auto_tokenized_url()
sock <- req_socket(urld)
dots <- parse_dots(...)
if (dispatcher) {
if (is.na(dispatcher) || dispatcher) {
urlc <- strcat(urld, "c")
sockc <- req_socket(urlc, resend = 0L)
launch_and_sync_daemon(sock = sock, synctime = if (is.numeric(dispatcher)) dispatcher else .timelimit, urld, dots, n, urlc)
launch_and_sync_daemon(sock = sock, synctime = if (is.na(dispatcher)) .nalimit else .timelimit, urld, dots, n, urlc)
init_monitor(sockc = sockc, envir = envir)
} else {
for (i in seq_len(n))
Expand Down Expand Up @@ -1286,7 +1285,7 @@ launch_and_sync_daemon <- function(sock, synctime, ..., tls = NULL) {
cv <- cv()
pipe_notify(sock, cv = cv, add = TRUE, remove = FALSE, flag = TRUE)
launch_daemon(..., tls = tls)
until(cv, synctime) && stop(if (...length() < 3L) .messages[["sync_timeout"]] else sprintf(.messages[["sync_dispatch"]], as.character(synctime)))
until(cv, synctime) && stop(if (...length() < 3L) .messages[["sync_timeout"]] else if (synctime == .timelimit) .messages[["sync_dispatch"]] else .messages[["sync_na"]])
}

dial_and_sync_socket <- function(sock, url, asyncdial, tls = NULL) {
Expand Down
7 changes: 3 additions & 4 deletions man/daemons.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 23c4cf8

Please sign in to comment.