Skip to content

Commit

Permalink
exports local_url()
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Dec 28, 2023
1 parent 40b45fe commit f9176bb
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 23 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 0.11.3.9001
Version: 0.11.3.9002
Description: Lightweight parallel code execution and distributed computing.
Designed for simplicity, a 'mirai' evaluates an R expression asynchronously,
on local or network resources, resolving automatically upon completion.
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export(is_mirai_error)
export(is_mirai_interrupt)
export(launch_local)
export(launch_remote)
export(local_url)
export(make_cluster)
export(mirai)
export(nextget)
Expand Down
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# mirai 0.11.3.9001 (development)
# mirai 0.11.3.9002 (development)

* Adds `local_url()` helper function to construct a random URL for inter-process communications on the local machine.
* `daemon()` argument 'autoexit' now accepts a signal value such as `tools::SIGINT` in order to raise it upon exit.
* Eliminates potential memory leaks along certain error paths.
* Requires nanonext >= [0.11.0.9001].
Expand Down
8 changes: 3 additions & 5 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ...,
if (dispatcher) {
n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(._[["n_one"]])
if (length(tls)) tls_config(server = tls, pass = pass)
urld <- auto_tokenized_url()
urld <- local_url()
urlc <- strcat(urld, "c")
sock <- req_socket(urld, resend = 0L)
sockc <- req_socket(urlc, resend = 0L)
Expand Down Expand Up @@ -319,7 +319,7 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ...,

n > 0L || stop(._[["n_zero"]])
envir <- new.env(hash = FALSE, parent = ..)
urld <- auto_tokenized_url()
urld <- local_url()
cv <- cv()
create_stream(n = n, seed = seed, envir = envir)
if (dispatcher) {
Expand Down Expand Up @@ -486,9 +486,7 @@ create_stream <- function(n, seed, envir) {
`[[<-`(.GlobalEnv, ".Random.seed", oseed)
}

auto_tokenized_url <- function() strcat(.urlscheme, random(12L))

new_tokenized_url <- function(url) sprintf("%s/%s", url, random(12L))
tokenized_url <- function(url) sprintf("%s/%s", url, random(12L))

req_socket <- function(url, tls = NULL, resend = .intmax)
`opt<-`(socket(protocol = "req", listen = url, tls = tls), "req:resend-time", resend)
Expand Down
6 changes: 3 additions & 3 deletions R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
if (vectorised) url[i] else
if (is.null(ports)) sprintf("%s/%d", url, i) else
sub(ports[1L], ports[i], url, fixed = TRUE)
nurl <- if (auto) auto_tokenized_url() else if (token) new_tokenized_url(burl) else burl
nurl <- if (auto) local_url() else if (token) tokenized_url(burl) else burl
ncv <- cv()
nsock <- req_socket(NULL)
pipe_notify(nsock, cv = ncv, cv2 = cv, add = TRUE, remove = TRUE)
Expand Down Expand Up @@ -167,7 +167,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
if (i > 0L && !activevec[[i]]) {
reap(attr(servers[[i]], "listener")[[1L]])
attr(servers[[i]], "listener") <- NULL
data <- servernames[i] <- if (auto) auto_tokenized_url() else new_tokenized_url(basenames[i])
data <- servernames[i] <- if (auto) local_url() else tokenized_url(basenames[i])
instance[i] <- -abs(instance[i])
listen(servers[[i]], url = data, tls = tls, error = TRUE)

Expand All @@ -177,7 +177,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
servers[[i]] <- nsock <- req_socket(NULL)
pipe_notify(nsock, cv = active[[i]], cv2 = cv, add = TRUE, remove = TRUE)
lock(nsock, cv = active[[i]])
data <- servernames[i] <- if (auto) auto_tokenized_url() else new_tokenized_url(basenames[i])
data <- servernames[i] <- if (auto) local_url() else tokenized_url(basenames[i])
instance[i] <- -abs(instance[i])
listen(nsock, url = data, tls = tls, error = TRUE)

Expand Down
25 changes: 21 additions & 4 deletions R/launchers.R
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,9 @@ ssh_config <- function(remotes, timeout = 5, tunnel = FALSE, command = "ssh", rs

}

#' Host URL Constructor
#' URL Constructors
#'
#' Automatically constructs a valid host URL (at which daemons may connect)
#' \code{host_url} constructs a valid host URL (at which daemons may connect)
#' based on the computer's hostname. This may be supplied directly to the
#' 'url' argument of \code{\link{daemons}}.
#'
Expand All @@ -321,13 +321,18 @@ ssh_config <- function(remotes, timeout = 5, tunnel = FALSE, command = "ssh", rs
#' connections from the network addresses the daemons are connecting from.
#' '0' is a wildcard value that automatically assigns a free ephemeral port.
#'
#' @return A character string comprising a valid host URL.
#' @return A character string comprising a valid URL.
#'
#' @details This implementation relies on using the host name of the computer
#' @details \code{host_url} relies on using the host name of the computer
#' rather than an IP address and typically works on local networks, although
#' this is not always guaranteed. If unsuccessful, substitute an IPv4 or
#' IPv6 address in place of the hostname.
#'
#' \code{local_url} generates a random URL for the platform's default
#' inter-process communications transport: abstract Unix domain sockets on
#' Linux, Unix domain sockets on MacOS, Solaris and other POSIX platforms,
#' and named pipes on Windows.
#'
#' @examples
#' host_url()
#' host_url(ws = TRUE)
Expand All @@ -344,6 +349,18 @@ host_url <- function(ws = FALSE, tls = FALSE, port = 0)
as.character(port)
)

#' URL Constructors
#'
#' \code{local_url} constructs a random URL suitable for local daemons.
#'
#' @examples
#' local_url()
#'
#' @rdname host_url
#' @export
#'
local_url <- function() strcat(.urlscheme, random(12L))

#' @export
#'
print.miraiLaunchCmd <- function(x, ...) {
Expand Down
2 changes: 1 addition & 1 deletion R/mirai.R
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = "defau
aio <- request_signal(.context(envir[["sock"]]), data = data, cv = envir[["cv"]], send_mode = 3L, recv_mode = 1L, timeout = .timeout)

} else {
url <- auto_tokenized_url()
url <- local_url()
sock <- req_socket(url, resend = 0L)
launch_daemon(url)
aio <- request(.context(sock), data = data, send_mode = 1L, recv_mode = 1L, timeout = .timeout)
Expand Down
20 changes: 16 additions & 4 deletions man/host_url.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ if (connection) {
if (connection && .Platform[["OS.type"]] != "windows") {
nanotest(is.character(launch_remote("ws://[::1]:5555", remote = remote_config(command = "echo", args = list(c("Test out:", ".", ">/dev/null")), rscript = "/usr/lib/R/bin/Rscript"))))
nanotest(is.character(launch_remote("tcp://localhost:5555", remote = ssh_config(remotes = c("ssh://remotehost", "ssh://remotenode"), tunnel = TRUE, command = "echo"))))
nanotest(daemons(url = value <- mirai:::auto_tokenized_url(), dispatcher = FALSE) == value)
nanotest(daemons(url = value <- local_url(), dispatcher = FALSE) == value)
nanotest(grepl("://", launch_remote(status()$daemons), fixed = TRUE))
nanotestn(launch_local(nextget("urls")))
if (requireNamespace("promises", quietly = TRUE)) {
Expand All @@ -106,7 +106,7 @@ if (connection && .Platform[["OS.type"]] != "windows") {
Sys.sleep(1L)
nanotestz(daemons(NULL))
Sys.sleep(1L)
nanotestn(launch_local(mirai:::auto_tokenized_url(), .compute = "test"))
nanotestn(launch_local(local_url(), .compute = "test"))
Sys.sleep(1L)
nanotest(daemons(n = 2L, url = value <- "ws://:0", dispatcher = FALSE, remote = remote_config()) != value)
nanotestz(daemons(0L))
Expand Down Expand Up @@ -167,10 +167,10 @@ if (connection) {
}
# additional parallel cluster tests
if (connection && .Platform[["OS.type"]] != "windows") {
nanotestp(cl <- make_cluster(url = mirai:::auto_tokenized_url()))
nanotestp(cl <- make_cluster(url = local_url()))
nanotestn(stopCluster(cl))
Sys.sleep(1L)
nanotestp(cl <- make_cluster(n = 1, url = mirai:::auto_tokenized_url(), remote = remote_config()))
nanotestp(cl <- make_cluster(n = 1, url = local_url(), remote = remote_config()))
nanotestn(stopCluster(cl))
Sys.sleep(1L)
}
Expand Down

0 comments on commit f9176bb

Please sign in to comment.