Skip to content

Commit

Permalink
tidy ups
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Sep 18, 2024
1 parent afb8cf5 commit bf7c0a8
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 20 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 1.2.0.9019
Version: 1.2.0.9020
Description: Designed for simplicity, a 'mirai' evaluates an R expression
asynchronously in a parallel process, locally or distributed over the
network, with the result automatically available upon completion. Modern
Expand Down
2 changes: 1 addition & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# mirai 1.2.0.9019 (development)
# mirai 1.2.0.9020 (development)

* `daemons(dispatcher = "thread")` implements threaded dispatcher (experimental), a faster and more efficient way of running dispatcher logic than in a background process.
* `daemons()` behavioural changes:
Expand Down
29 changes: 14 additions & 15 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@
#'
#' @section Dispatcher:
#'
#' By default \code{dispatcher = 'process'} launches a background process
#' By default \code{dispatcher = "process"} launches a background process
#' running \code{\link{dispatcher}}. Dispatcher connects to daemons on
#' behalf of the host and ensures optimal FIFO scheduling of tasks.
#'
#' Specifying \code{dispatcher = 'thread'} runs dispatcher logic on a new
#' Specifying \code{dispatcher = "thread"} runs dispatcher logic on a new
#' thread, a faster and more efficient alternative to using a separate
#' process. This is a new feature and should be considered experimental.
#'
#' Specifying \code{dispatcher = 'none'}, uses the default behaviour without
#' Specifying \code{dispatcher = "none"}, uses the default behaviour without
#' additional dispatcher logic. In this case daemons connect directly to the
#' host and tasks are distributed in a round-robin fashion. Optimal
#' scheduling is not guaranteed as the duration of tasks cannot be known
Expand Down Expand Up @@ -293,9 +293,8 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "thr
if (is.character(url)) {

if (is.null(envir)) {
envir <- new.env(hash = FALSE, parent = ..)
envir <- init_envir_stream(seed = seed)
tls <- check_create_tls(url = url, tls = tls, envir = envir)
create_stream(n = n, seed = seed, envir = envir)
switch(
parse_dispatcher(dispatcher[1L]),
{
Expand All @@ -320,7 +319,7 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "thr
},
{
sock <- req_socket(url, tls = if (length(tls)) tls_config(server = tls, pass = pass))
store_urls(sock = sock, envir = envir)
check_store_url(sock = sock, envir = envir)
n <- 0L
},
stop(._[["dispatcher_args"]])
Expand Down Expand Up @@ -352,9 +351,8 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "thr
} else if (is.null(envir)) {

n > 0L || stop(._[["n_zero"]])
envir <- new.env(hash = FALSE, parent = ..)
envir <- init_envir_stream(seed = seed)
urld <- local_url()
create_stream(n = n, seed = seed, envir = envir)
dots <- parse_dots(...)
output <- attr(dots, "output")
switch(
Expand Down Expand Up @@ -593,13 +591,14 @@ check_create_tls <- function(url, tls, envir) {
tls
}

create_stream <- function(n, seed, envir) {
init_envir_stream <- function(seed) {
.advance()
oseed <- .GlobalEnv[[".Random.seed"]]
RNGkind("L'Ecuyer-CMRG")
if (length(seed)) set.seed(seed)
`[[<-`(envir, "stream", .GlobalEnv[[".Random.seed"]])
envir <- `[[<-`(new.env(hash = FALSE, parent = ..), "stream", .GlobalEnv[[".Random.seed"]])
`[[<-`(.GlobalEnv, ".Random.seed", oseed)
envir
}

# "%s-%d" format required by IPC under MacOS
Expand Down Expand Up @@ -695,12 +694,12 @@ launch_sync_daemons <- function(seq, sock, urld, dots, envir, output) {
store_dispatcher <- function(sockc, res, cv, envir)
`[[<-`(`[[<-`(`[[<-`(`[[<-`(envir, "sockc", sockc), "urls", res[-1L]), "pid", as.integer(res[1L])), "cv", cv)

store_urls <- function(sock, envir) {
check_store_url <- function(sock, envir) {
listener <- attr(sock, "listener")[[1L]]
urls <- opt(listener, "url")
if (parse_url(urls)[["port"]] == "0")
urls <- sub_real_port(port = opt(listener, "tcp-bound-port"), url = urls)
`[[<-`(envir, "urls", urls)
url <- opt(listener, "url")
if (parse_url(url)[["port"]] == "0")
url <- sub_real_port(port = opt(listener, "tcp-bound-port"), url = url)
`[[<-`(envir, "urls", url)
}

send_signal <- function(envir) {
Expand Down
6 changes: 3 additions & 3 deletions man/daemons.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit bf7c0a8

Please sign in to comment.