Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow more granular control of daemons #153

Merged
merged 1 commit into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 1.2.0.9014
Version: 1.2.0.9015
Description: Designed for simplicity, a 'mirai' evaluates an R expression
asynchronously in a parallel process, locally or distributed over the
network, with the result automatically available upon completion. Modern
Expand Down
4 changes: 3 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# mirai 1.2.0.9014 (development)
# mirai 1.2.0.9015 (development)

* `daemons(dispatcher = NA)` now provides access to threaded dispatcher (experimental). This implements dispatcher using a thread rather than an external process and is faster and more efficient.
* `mirai_map()` behavioural changes:
- Combining multiple collection options becomes easier, allowing for instance `x[.stop, .progress]`.
- Adds `mirai_map()[.progress_cli]` as an alternative progress indicator, using the 'cli' package to show % complete and ETA.
- Now only performs multiple map over the rows of matrices and dataframes (thanks @andrewGhazi, #147).
* Fixes flatmap with `mirai_map()[.flat]` assigning a variable 'typ' to the calling environment.
* `daemon()` gains argument 'asyncdial' to allow control of connection behaviour independently of what happens when the daemon exits.
* `dispatcher()` drops argument 'asyncdial' as it is not practically very useful to set this here.
* `everywhere()` now errors if the specified compute profile is not yet set up, rather than fail silently.
* Internal performance enhancements.
* Requires `nanonext` >= [1.2.1.9018].
Expand Down
28 changes: 15 additions & 13 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
#' @param url the character host or dispatcher URL to dial into, including the
#' port to connect to (and optionally for websockets, a path), e.g.
#' 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'.
#' @param asyncdial [default FALSE] whether to perform dials asynchronously. The
#' default FALSE will error if a connection is not immediately possible (for
#' instance if \code{\link{daemons}} has yet to be called on the host, or
#' the specified port is not open etc.). Specifying TRUE continues retrying
#' (indefinitely) if not immediately successful, which is more resilient but
#' can mask potential connection issues.
#' @param autoexit [default TRUE] logical value, whether the daemon should
#' exit automatically when its socket connection ends. If a signal from the
#' \pkg{tools} package, e.g. \code{tools::SIGINT}, or an equivalent integer
Expand Down Expand Up @@ -76,21 +82,17 @@
#' socket connection has ended.
#'
#' Instead of TRUE, supplying a signal from the \pkg{tools} package, e.g.
#' \code{tools::SIGINT}, or an equivalent integer value, sets the signal to
#' be raised when the socket connection ends. As an example, supplying
#' SIGINT allows a potentially more immediate exit by interrupting any
#' ongoing evaluation rather than letting it complete.
#' \code{tools::SIGINT}, or an equivalent integer value, sets this to be
#' raised when the socket connection ends. As an example, supplying SIGINT
#' allows a potentially more immediate exit by interrupting any ongoing
#' evaluation rather than letting it complete.
#'
#' Setting to FALSE allows the daemon to persist indefinitely even when
#' there is no longer a socket connection. This allows a host session to end
#' and a new session to connect at the URL where the daemon is dialled in.
#' Daemons must be terminated with \code{daemons(NULL)} in this case, which
#' sends explicit exit instructions to all connected daemons.
#'
#' Persistence also implies that dials are performed asynchronously, which
#' means retries are attempted (indefinitely) if not immediately successful.
#' This is resilient behaviour but can mask potential connection issues.
#'
#' @section Cleanup Options:
#'
#' The \sQuote{cleanup} argument also accepts an integer value, which
Expand All @@ -108,17 +110,17 @@
#'
#' @export
#'
daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE,
maxtasks = Inf, idletime = Inf, walltime = Inf, timerstart = 0L,
..., tls = NULL, rs = NULL) {
daemon <- function(url, asyncdial = FALSE, autoexit = TRUE, cleanup = TRUE,
output = FALSE, maxtasks = Inf, idletime = Inf, walltime = Inf,
timerstart = 0L, ..., tls = NULL, rs = NULL) {

cv <- cv()
sock <- socket(protocol = "rep")
on.exit(reap(sock))
`[[<-`(., "sock", sock)
autoexit && pipe_notify(sock, cv = cv, remove = TRUE, flag = as.integer(autoexit))
if (length(tls)) tls <- tls_config(client = tls)
dial_and_sync_socket(sock = sock, url = url, asyncdial = !autoexit, tls = tls)
dial_and_sync_socket(sock = sock, url = url, asyncdial = asyncdial, tls = tls)

if (is.numeric(rs)) `[[<-`(.GlobalEnv, ".Random.seed", as.integer(rs))
if (idletime > walltime) idletime <- walltime else if (idletime == Inf) idletime <- NULL
Expand Down Expand Up @@ -211,7 +213,7 @@ eval_mirai <- function(._mirai_.) {
)
}

dial_and_sync_socket <- function(sock, url, asyncdial, tls = NULL) {
dial_and_sync_socket <- function(sock, url, asyncdial = FALSE, tls = NULL) {
cv <- cv()
pipe_notify(sock, cv = cv, add = TRUE)
dial(sock, url = url, autostart = asyncdial || NA, tls = tls, error = TRUE)
Expand Down
15 changes: 4 additions & 11 deletions R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@
#' @param ... (optional) additional arguments passed through to \code{\link{daemon}}.
#' These include \sQuote{autoexit}, \sQuote{cleanup}, \sQuote{maxtasks},
#' \sQuote{idletime}, \sQuote{walltime} and \sQuote{timerstart}.
#' @param asyncdial [default FALSE] whether to perform dials asynchronously. The
#' default FALSE will error if a connection is not immediately possible
#' (e.g. \code{\link{daemons}} has yet to be called on the host, or the
#' specified port is not open etc.). Specifying TRUE continues retrying
#' (indefinitely) if not immediately successful, which is more resilient but
#' can mask potential connection issues.
#' @param retry [default FALSE] logical value, whether to automatically retry
#' tasks where the daemon crashes or terminates unexpectedly on the next
#' daemon instance to connect. If TRUE, the mirai will remain unresolved but
Expand Down Expand Up @@ -78,9 +72,8 @@
#'
#' @export
#'
dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
retry = FALSE, token = FALSE, tls = NULL, pass = NULL,
rs = NULL, monitor = NULL) {
dispatcher <- function(host, url = NULL, n = NULL, ..., retry = FALSE, token = FALSE,
tls = NULL, pass = NULL, rs = NULL, monitor = NULL) {

n <- if (is.numeric(n)) as.integer(n) else length(url)
n > 0L || stop(._[["missing_url"]])
Expand All @@ -89,14 +82,14 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
sock <- socket(protocol = "rep")
on.exit(reap(sock))
pipe_notify(sock, cv = cv, remove = TRUE, flag = TRUE)
dial_and_sync_socket(sock = sock, url = host, asyncdial = asyncdial)
dial_and_sync_socket(sock = sock, url = host)

ctrchannel <- is.character(monitor)
if (ctrchannel) {
sockc <- socket(protocol = "rep")
on.exit(reap(sockc), add = TRUE, after = FALSE)
pipe_notify(sockc, cv = cv, remove = TRUE, flag = TRUE)
dial_and_sync_socket(sock = sockc, url = monitor, asyncdial = asyncdial)
dial_and_sync_socket(sock = sockc, url = monitor)
cmessage <- recv(sockc, mode = 2L, block = .limit_long)
is.object(cmessage) && stop(._[["sync_timeout"]])
if (nzchar(cmessage[2L]))
Expand Down
20 changes: 12 additions & 8 deletions man/daemon.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 0 additions & 8 deletions man/dispatcher.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ connection && {
nanotest(is_error_value(call_mirai_(m)$data) || m$data == 8L)
nanotestn(stop_mirai(m))
Sys.sleep(1L)
nanotesto(d <- daemons(1L, dispatcher = FALSE, seed = 1546L))
nanotesto(d <- daemons(1L, dispatcher = FALSE, asyncdial = FALSE, seed = 1546L))
nanotestp(d)
me <- mirai(mirai::mirai(), .timeout = 2000L)
nanotest(is_mirai_error(call_mirai(me)$data) || is_error_value(me$data))
Expand Down
Loading