Skip to content

Commit

Permalink
reorg internal functions
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Oct 1, 2023
1 parent b859a7b commit 19717c8
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 186 deletions.
18 changes: 18 additions & 0 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,21 @@ daemon <- function(url, asyncdial = FALSE, maxtasks = Inf, idletime = Inf,
wait(cv)

}

# internals --------------------------------------------------------------------

dial_and_sync_socket <- function(sock, url, asyncdial, tls = NULL) {
cv <- cv()
if (length(tls) && !asyncdial) {
pipe_notify(sock, cv = cv, add = TRUE, remove = FALSE, flag = TRUE)
dial(sock, url = url, autostart = TRUE, tls = tls, error = TRUE)
until(cv, .timelimit) && stop(.messages[["sync_timeout"]])
} else {
pipe_notify(sock, cv = cv, add = TRUE, remove = FALSE, flag = FALSE)
dial(sock, url = url, autostart = length(tls) || asyncdial || NA, tls = tls, error = TRUE)
wait(cv)
}
}

parse_cleanup <- function(cleanup)
c(cleanup %% 2L, (clr <- as.raw(cleanup)) & as.raw(2L), clr & as.raw(4L), clr & as.raw(8L))
73 changes: 73 additions & 0 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -397,3 +397,76 @@ status <- function(.compute = "default") {
daemons = if (length(envir[["sockc"]])) query_status(envir) else if (length(envir[["urls"]])) envir[["urls"]] else 0L)

}

# internals --------------------------------------------------------------------

req_socket <- function(url, tls = NULL, resend = .intmax)
`opt<-`(socket(protocol = "req", listen = url, tls = tls), "req:resend-time", resend)

parse_dots <- function(...)
if (missing(...)) "" else {
dots <- list(...)
for (dot in dots)
is.numeric(dot) || is.logical(dot) || stop(.messages[["wrong_dots"]])
dnames <- names(dots)
dots <- strcat(",", paste(dnames, dots, sep = "=", collapse = ","))
"output" %in% dnames && return(`class<-`(dots, "output"))
dots
}

parse_tls <- function(tls)
switch(length(tls) + 1L,
"",
sprintf(",tls='%s'", tls),
sprintf(",tls=c('%s','%s')", tls[1L], tls[2L]))

write_args <- function(dots, rs = NULL, tls = NULL, libpath = NULL)
shQuote(switch(length(dots),
sprintf("mirai::.daemon('%s')", dots[[1L]]),
sprintf("mirai::daemon('%s'%s%s)", dots[[1L]], dots[[2L]], parse_tls(tls)),
sprintf("mirai::daemon('%s'%s%s,rs=c(%s))", dots[[1L]], dots[[2L]], parse_tls(tls), paste0(dots[[3L]], collapse = ",")),
sprintf(".libPaths(c('%s',.libPaths()));mirai::dispatcher('%s',n=%d,rs=c(%s),monitor='%s'%s)", libpath, dots[[1L]], dots[[3L]], paste0(rs, collapse= ","), dots[[4L]], dots[[2L]]),
sprintf(".libPaths(c('%s',.libPaths()));mirai::dispatcher('%s',c('%s'),n=%d,monitor='%s'%s)", libpath, dots[[1L]], paste0(dots[[3L]], collapse = "','"), dots[[4L]], dots[[5L]], dots[[2L]])))

launch_daemon <- function(..., rs = NULL, tls = NULL) {
dots <- list(...)
dlen <- length(dots)
output <- dlen > 1L && is.object(dots[[2L]])
libpath <- if (dlen > 3L) (lp <- .libPaths())[file.exists(file.path(lp, "mirai"))][1L]
system2(command = .command, args = c(if (length(libpath)) "--vanilla", "-e", write_args(dots, rs = rs, tls = tls, libpath = libpath)), stdout = if (output) "", stderr = if (output) "", wait = FALSE)
}

launch_and_sync_daemon <- function(sock, ..., rs = NULL, tls = NULL, pass = NULL) {
cv <- cv()
pipe_notify(sock, cv = cv, add = TRUE, remove = FALSE, flag = TRUE)
if (is.character(tls)) {
switch(
length(tls),
{
on.exit(Sys.unsetenv("MIRAI_TEMP_FIELD1"))
Sys.setenv(MIRAI_TEMP_FIELD1 = tls)
Sys.unsetenv("MIRAI_TEMP_FIELD2")
},
{
on.exit(Sys.unsetenv(c("MIRAI_TEMP_FIELD1", "MIRAI_TEMP_FIELD2")))
Sys.setenv(MIRAI_TEMP_FIELD1 = tls[1L])
Sys.setenv(MIRAI_TEMP_FIELD2 = tls[2L])
}
)
if (is.character(pass)) {
on.exit(Sys.unsetenv("MIRAI_TEMP_VAR"), add = TRUE)
Sys.setenv(MIRAI_TEMP_VAR = pass)
}
}
launch_daemon(..., rs = rs)
until(cv, .timelimit) && stop(if (...length() < 3L) .messages[["sync_timeout"]] else .messages[["sync_dispatch"]])
}

create_stream <- function(n, seed, envir) {
rexp(n = 1L)
oseed <- .GlobalEnv[[".Random.seed"]]
RNGkind("L'Ecuyer-CMRG")
if (length(seed)) set.seed(seed)
`[[<-`(envir, "stream", .GlobalEnv[[".Random.seed"]])
`[[<-`(.GlobalEnv, ".Random.seed", oseed)
}
34 changes: 34 additions & 0 deletions R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,37 @@ saisei <- function(i, force = FALSE, .compute = "default") {
r

}

# internals --------------------------------------------------------------------

auto_tokenized_url <- function() strcat(.urlscheme, random(12L))

new_tokenized_url <- function(url) sprintf("%s/%s", url, random(12L))

sub_real_port <- function(port, url) sub("(?<=:)0(?![^/])", port, url, perl = TRUE)

query_dispatcher <- function(sock, command, mode) {
send(sock, data = command, mode = 2L, block = .timelimit)
recv(sock, mode = mode, block = .timelimit)
}

query_status <- function(envir) {
res <- query_dispatcher(sock = envir[["sockc"]], command = 0L, mode = 5L)
is.object(res) && return(res)
`attributes<-`(res, list(dim = c(envir[["n"]], 5L),
dimnames = list(envir[["urls"]], c("i", "online", "instance", "assigned", "complete"))))
}

init_monitor <- function(sockc, envir) {
res <- query_dispatcher(sockc, command = 0L, mode = 2L)
is.object(res) && stop(.messages[["sync_timeout"]])
`[[<-`(`[[<-`(`[[<-`(envir, "sockc", sockc), "urls", res[-1L]), "pid", as.integer(res[1L]))
}

get_and_reset_env <- function(x) {
candidate <- Sys.getenv(x)
if (nzchar(candidate)) {
Sys.unsetenv(x)
candidate
}
}
186 changes: 0 additions & 186 deletions R/internals.R

This file was deleted.

26 changes: 26 additions & 0 deletions R/launchers.R
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,29 @@ ssh_args <- function(names, port = 22, timeout = 5, tunnel = FALSE) {
args

}

# internals --------------------------------------------------------------------

find_dot <- function(args) {
sel <- args == "."
any(sel) || stop(.messages[["dot_required"]])
sel
}

process_url <- function(url, .compute) {
if (is.numeric(url)) {
vec <- ..[[.compute]][["urls"]]
is.null(vec) && stop(.messages[["daemons_unset"]])
all(url >= 1L, url <= length(vec)) || stop(.messages[["url_spec"]])
url <- vec[url]
} else {
lapply(url, parse_url)
}
url
}

parse_check_local_url <- function(url) {
purl <- parse_url(url)
purl[["hostname"]] %in% c("localhost", "127.0.0.1") || stop(.messages[["requires_local"]])
purl
}
14 changes: 14 additions & 0 deletions R/mirai.R
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,17 @@ print.miraiInterrupt <- function(x, ...) {
invisible(x)

}

# internals --------------------------------------------------------------------

mk_interrupt_error <- function(e) `class<-`("", c("miraiInterrupt", "errorValue"))

mk_mirai_error <- function(e) {
x <- .subset2(e, "call")
call <- if (length(x)) deparse(x, width.cutoff = 500L, backtick = TRUE, control = NULL, nlines = 1L)
msg <- if (is.null(call) || call == "eval(expr = ._mirai_.[[\".expr\"]], envir = ._mirai_., enclos = NULL)")
sprintf("Error: %s", .subset2(e, "message")) else
sprintf("Error in %s: %s", call, .subset2(e, "message"))
cat(strcat(msg, "\n"), file = stderr());
`class<-`(msg, c("miraiError", "errorValue", "try-error"))
}
9 changes: 9 additions & 0 deletions R/next.R
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,12 @@ nextstream <- function(.compute = "default") next_stream(..[[.compute]])
#' @export
#'
nextget <- function(x, .compute = "default") ..[[.compute]][[x]]

# internals --------------------------------------------------------------------

next_stream <- function(envir) {
stream <- envir[["stream"]]
length(stream) || return()
`[[<-`(envir, "stream", nextRNGStream(stream))
stream
}
4 changes: 4 additions & 0 deletions R/parallel.R
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,8 @@ print.miraiNode <- function(x, ...) {

}

# internals --------------------------------------------------------------------

node_unresolved <- function(node) unresolved(.subset2(node, "mirai"))

# nocov end

0 comments on commit 19717c8

Please sign in to comment.