Skip to content

Commit

Permalink
improve error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Sep 17, 2024
1 parent 9a3c180 commit cedd0ca
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 16 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 1.2.0.9017
Version: 1.2.0.9018
Description: Designed for simplicity, a 'mirai' evaluates an R expression
asynchronously in a parallel process, locally or distributed over the
network, with the result automatically available upon completion. Modern
Expand Down
2 changes: 1 addition & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# mirai 1.2.0.9017 (development)
# mirai 1.2.0.9018 (development)

* `daemons(dispatcher = NA)` now provides access to threaded dispatcher (experimental). This implements dispatcher using a thread rather than an external process and is faster and more efficient.
* `daemons()` behavioural changes:
Expand Down
8 changes: 4 additions & 4 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ..., force
sock <- req_socket(urld)
sockc <- req_socket(urlc)
res <- launch_sync_dispatcher(sock, sockc, wa5(urld, dots, n, urlc, url), output, tls, pass)
is.object(res) && stop(._[["sync_timeout"]])
is.object(res) && stop(._[["sync_dispatcher"]])
store_dispatcher(sockc = sockc, res = res, cv = cv, envir = envir)
} else {
sock <- req_socket(url, tls = if (length(tls)) tls_config(server = tls, pass = pass))
Expand Down Expand Up @@ -364,12 +364,12 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ..., force
urlc <- sprintf("%s%s", urld, "c")
sockc <- req_socket(urlc)
res <- launch_sync_dispatcher(sock, sockc, wa4(urld, dots, envir[["stream"]], n, urlc), output)
is.object(res) && stop(._[["sync_timeout"]])
is.object(res) && stop(._[["sync_dispatcher"]])
store_dispatcher(sockc = sockc, res = res, cv = cv, envir = envir)
for (i in seq_len(n)) next_stream(envir)
} else {
sock <- req_socket(urld)
launch_sync_daemons(seq_len(n), sock, urld, dots, envir, output) || stop(._[["sync_timeout"]])
launch_sync_daemons(seq_len(n), sock, urld, dots, envir, output) || stop(._[["sync_daemons"]])
`[[<-`(envir, "urls", urld)
}
`[[<-`(.., .compute, `[[<-`(`[[<-`(envir, "sock", sock), "n", n))
Expand Down Expand Up @@ -629,7 +629,7 @@ req_socket <- function(url, tls = NULL, resend = 0L)
parse_dots <- function(...) {
...length() || return("")
dots <- list(...)
dots <- dots[as.logical(lapply(dots, function(x) is.numeric(x) | is.logical(x)))]
dots <- dots[as.logical(lapply(dots, function(x) is.logical(x) || is.numeric(x)))]
dnames <- names(dots)
out <- sprintf(",%s", paste(dnames, dots, sep = "=", collapse = ","))
pos <- dnames == "output"
Expand Down
4 changes: 2 additions & 2 deletions R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., retry = FALSE, token = F
pipe_notify(sockc, cv = cv, remove = TRUE, flag = TRUE)
dial_and_sync_socket(sock = sockc, url = monitor)
cmessage <- recv(sockc, mode = 2L, block = .limit_long)
is.object(cmessage) && stop(._[["sync_timeout"]])
is.object(cmessage) && stop(._[["sync_dispatcher"]])
if (nzchar(cmessage[2L]))
Sys.setenv(R_DEFAULT_PACKAGES = cmessage[2L]) else
Sys.unsetenv("R_DEFAULT_PACKAGES")
Expand Down Expand Up @@ -159,7 +159,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., retry = FALSE, token = F

if (auto)
for (i in seq_n)
until(cv, .limit_long) || stop(._[["sync_timeout"]])
until(cv, .limit_long) || stop(._[["sync_daemons"]])

if (ctrchannel) {
send(sockc, c(Sys.getpid(), servernames), mode = 2L)
Expand Down
10 changes: 6 additions & 4 deletions R/launchers.R
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ launch_remote <- function(url, remote = remote_config(), ..., tls = NULL, .compu
envir <- ..[[.compute]]
dots <- parse_dots(...)
if (is.null(tls)) tls <- envir[["tls"]]
url <- process_url(url, .compute = .compute)
url <- process_url(url, .compute = .compute, local = FALSE)

ulen <- length(url)
command <- remote[["command"]]
Expand Down Expand Up @@ -433,11 +433,13 @@ find_dot <- function(args) {
sel
}

process_url <- function(url, .compute) {
process_url <- function(url, .compute, local = TRUE) {
if (is.numeric(url)) {
vec <- ..[[.compute]][["urls"]]
is.null(vec) && stop(._[["daemons_unset"]], call. = FALSE)
all(url >= 1L, url <= length(vec)) || stop(._[["url_spec"]], call. = FALSE)
is.null(vec) &&
stop(sprintf(._[["daemons_unset"]], if (local) "local" else "remote"), call. = FALSE)
all(url >= 1L, url <= length(vec)) ||
stop(sprintf(._[["url_spec"]], if (local) "local" else "remote"), call. = FALSE)
url <- vec[url]
} else {
lapply(url, parse_url)
Expand Down
7 changes: 4 additions & 3 deletions R/mirai-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
arglen = "'args' and/or 'url' must be of length 1 or the same length",
cluster_inactive = "cluster is no longer active",
correct_context = "'host' must be specified if not using directly in a function argument",
daemons_unset = "launch_*(): a numeric value for 'url' requires daemons to be set",
daemons_unset = "launch_%s(): a numeric value for 'url' requires daemons to be set",
dot_required = "remote_config(): '.' must be an element of the character vector(s) supplied to 'args'",
missing_expression = "missing expression, perhaps wrap in {}?",
missing_url = "at least one URL must be supplied for 'url' or 'n' must be at least 1",
Expand All @@ -102,8 +102,9 @@
requires_daemons = "mirai_map(): launching one local daemon as none previously set",
requires_local = "ssh_config(): SSH tunnelling requires 'url' hostname to be '127.0.0.1' or 'localhost'",
single_url = "only one 'url' should be specified",
sync_timeout = "initial sync with dispatcher/daemon timed out after 10s",
url_spec = "launch_*(): numeric value for 'url' is out of bounds"
sync_daemons = "initial sync with daemon(s) timed out after 10s",
sync_dispatcher = "initial sync with dispatcher timed out after 10s",
url_spec = "launch_%s(): numeric value for 'url' is out of bounds"
),
hash = TRUE
)
Expand Down
2 changes: 1 addition & 1 deletion tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ connection && .Platform[["OS.type"]] != "windows" && Sys.getenv("NOT_CRAN") == "
nanotestz(daemons(NULL))
nanotesto(daemons(url = "ws://:0", correctype = 0L, token = TRUE))
nanotestz(daemons(0L))
nanotestz(with(daemons(url = "tcp://:0", correcttype = 1, token = TRUE), {8L - 9L + 1L}))
nanotestz(with(daemons(url = "tcp://:0", correcttype = c(1, 0), token = TRUE), {8L - 9L + 1L}))
nanotest(daemons(n = 2, "ws://:0") == 2L)
nanotest(is.integer(nextget("pid")))
nanotest(length(nextget("urls")) == 2L)
Expand Down

0 comments on commit cedd0ca

Please sign in to comment.