Skip to content

Commit

Permalink
enable retry for threaded dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Nov 18, 2024
1 parent a40319e commit f0f8ba6
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 4 deletions.
3 changes: 2 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Encoding: UTF-8
Depends:
R (>= 3.6)
Imports:
nanonext (>= 1.3.0)
nanonext (>= 1.3.2.9000)
Enhances:
parallel,
promises
Expand All @@ -40,3 +40,4 @@ Suggests:
litedown
VignetteBuilder: litedown
RoxygenNote: 7.3.2
Remotes: shikokuchuo/nanonext
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# mirai 1.3.1.9000 (development)

#### New Features

* `daemons(dispatcher = "thread")` now also respects argument 'retry' to govern whether to auto-retry failed tasks or else return an error.

# mirai 1.3.1

#### Updates
Expand Down
11 changes: 9 additions & 2 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,13 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "thr
{
n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(._[["n_one"]])
urls <- resolve_dispatcher_urls(n = n, url = url)
sock <- .dispatcher(host = inproc_url(), url = urls, tls = if (length(tls)) tls_config(server = tls, pass = pass))
retry <- list(...)[["retry"]]
sock <- .dispatcher(
host = inproc_url(),
url = urls,
tls = if (length(tls)) tls_config(server = tls, pass = pass),
retry = if (is.null(retry)) FALSE else retry
)
`[[<-`(`[[<-`(envir, "urls", urls), "dispatcher", TRUE)
},
{
Expand Down Expand Up @@ -362,7 +368,8 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "thr
},
{
urls <- auto_dispatcher_urls(n = n, url = urld)
sock <- .dispatcher(host = inproc_url(), url = urls)
retry <- list(...)[["retry"]]
sock <- .dispatcher(host = inproc_url(), url = urls, retry = if (is.null(retry)) FALSE else retry)
for (i in seq_len(n))
launch_daemon(wa3(urls[i], dots, next_stream(envir)), output)
`[[<-`(`[[<-`(envir, "urls", urls), "dispatcher", TRUE)
Expand Down
2 changes: 1 addition & 1 deletion tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ connection && Sys.getenv("NOT_CRAN") == "true" && {
test_equal(length(urls <- nextget("urls")), 2L)
test_identical(as.integer(nanonext::parse_url(urls[[2L]])[["port"]]), as.integer(nanonext::parse_url(urls[[1L]])[["port"]]) + 1L)
test_zero(daemons(0))
test_equal(daemons(2, url = host_url(ws = TRUE), dispatcher = "thread"), 2L)
test_equal(daemons(2, url = host_url(ws = TRUE), dispatcher = "thread", retry = TRUE), 2L)
test_equal(length(urls <- nextget("urls")), 2L)
test_true(endsWith(urls[[1L]], "/1"))
test_true(endsWith(urls[[2L]], "/2"))
Expand Down

0 comments on commit f0f8ba6

Please sign in to comment.