Skip to content

Commit

Permalink
update serialization()
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Jul 1, 2024
1 parent 80514d3 commit f3ff0ec
Show file tree
Hide file tree
Showing 13 changed files with 167 additions and 173 deletions.
4 changes: 2 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 1.1.0.9009
Version: 1.1.0.9010
Description: High-performance parallel code execution and distributed computing.
Designed for simplicity, a 'mirai' evaluates an R expression asynchronously,
on local or network resources, resolving automatically upon completion.
Expand Down Expand Up @@ -31,4 +31,4 @@ Suggests:
knitr,
markdown
VignetteBuilder: knitr
RoxygenNote: 7.3.1
RoxygenNote: 7.3.2
4 changes: 2 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# mirai 1.1.0.9009 (development)
# mirai 1.1.0.9010 (development)

* `serialization()` function signature simplified for clarity and ease of use.
* `serialization()` function signature and return value slightly modified for clarify. Successful registration / cancellation mesasges are no longer printed to the console.
* `dispatcher()` argument 'retry' now defaults to FALSE for consistency with non-dispatcher behaviour.
* `remote_config()` gains argument 'quote' to control whether or not to quote the daemon launch commmand, and now works with Slurm (thanks @michaelmayer2 #119).
* Ephemeral daemons now exit as soon as permissible, eiliminating the 2s linger period.
Expand Down
56 changes: 24 additions & 32 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -489,41 +489,34 @@ status <- function(.compute = "default") {
#' Registers custom serialization and unserialization functions for sending and
#' receiving reference objects.
#'
#' @param fns \strong{either} a list comprising 2 functions: \cr serialization
#' function: must accept a reference object (or list of objects) inheriting
#' from \sQuote{class} and return a raw vector.\cr unserialization function:
#' must accept a raw vector and return a reference object (or list of
#' reference objects).\cr \strong{or else} NULL to reset.
#' @param class the class of reference object (as a character string) that these
#' functions are applied to, e.g. 'ArrowTabular' or 'torch_tensor',
#' \strong{or else} NULL to cancel registered functions.
#' @param sfunc serialization function: must accept a reference object (or list
#' of objects) inheriting from \sQuote{class} and return a raw vector.
#' @param ufunc unserialization function: must accept a raw vector and return
#' a reference object (or list of reference objects).
#' functions are applied to, e.g. 'ArrowTabular' or 'torch_tensor'.
#' @param vec [default FALSE] if FALSE the functions must accept and return
#' reference objects individually e.g. \code{arrow::write_to_raw} and
#' \code{arrow::read_ipc_stream}. If TRUE, the functions are vectorized and
#' must accept and return a list of reference objects, e.g.
#' \code{torch::torch_serialize} and \code{torch::torch_load}.
#'
#' @return Invisibly, a list comprising the currently-registered values for
#' 'class', 'sfunc', 'ufunc' and 'vec', or else NULL if unregistered. A
#' message is printed to the console when functions are successfully
#' registered or reset.
#' @return Invisibly, a list comprising 'fns', class', and 'vec', or else NULL
#' if supplied to 'fns'.
#'
#' @details Registering new functions replaces any existing registered
#' functions.
#'
#' To cancel registered functions, specify 'class' as NULL, without the
#' need to supply 'sfunc' or 'ufunc'.
#'
#' Calling without any arguments returns the pairlist of
#' currently-registered serialization functions.
#' @details Registering new functions replaces any existing registered functions.
#'
#' This function may be called prior to or after setting daemons, with the
#' registered functions applying across all compute profiles.
#'
#' Calling without any arguments returns a list comprising the registered
#' values for 'fns', class', and 'vec', or else NULL if not registered.
#'
#' @examples
#' reg <- serialization(
#' class = "",
#' sfunc = function(x) serialize(x, NULL),
#' ufunc = base::unserialize
#' list(function(x) serialize(x, NULL), base::unserialize),
#' class = "example_class"
#' )
#' reg
#'
Expand All @@ -532,24 +525,23 @@ status <- function(.compute = "default") {
#'
#' @export
#'
serialization <- function(class, sfunc, ufunc, vec = FALSE) {
serialization <- function(fns, class, vec = FALSE) {

missing(class) && return(.[["serial"]])
missing(fns) && return(.[["serial"]])

if (is.null(class)) {
if (is.null(fns)) {
serial <- NULL
next_config(NULL)
cat("mirai serialization functions cancelled\n", file = stderr())
} else if (is.character(class) && is.function(sfunc) && is.function(ufunc)) {
serial <- list(class, sfunc, ufunc, vec)
next_config(refhook = list(sfunc, ufunc), class = class, vec = vec)
cat("mirai serialization functions registered\n", file = stderr())
} else if (length(fns) == 2L && is.function(fns[[1L]]) && is.function(fns[[2L]])) {
is.character(class) || stop(._[["character_class"]])
serial <- list(fns, class, vec)
next_config(fns, class = class, vec = vec)
} else {
stop(._[["serial_invalid"]])
}

`[[<-`(., "serial", serial)
register_everywhere(serial = serial)
register_everywhere(serial)
invisible(serial)

}
Expand Down Expand Up @@ -673,12 +665,12 @@ query_status <- function(envir) {
register_everywhere <- function(serial)
for (.compute in names(..))
everywhere(
mirai::serialization(class = serial[[1L]], sfunc = serial[[2L]], ufunc = serial[[3L]], vec = serial[[4L]]),
mirai::serialization(serial[[1L]], class = serial[[2L]], vec = serial[[3L]]),
.args = list(serial = serial),
.compute = .compute
)

check_register_everywhere <- function(serial = .[["serial"]])
if (length(serial[[1L]])) register_everywhere(serial = serial)
if (length(serial[[1L]])) register_everywhere(serial)

._scm_. <- as.raw(c(0x07, 0x00, 0x00, 0x00, 0x42, 0x0a, 0x03, 0x00, 0x00, 0x00, 0x02, 0x03, 0x04, 0x00, 0x00, 0x05, 0x03, 0x00, 0x05, 0x00, 0x00, 0x00, 0x55, 0x54, 0x46, 0x2d, 0x38, 0xfc, 0x00, 0x00, 0x00))
3 changes: 2 additions & 1 deletion R/mirai-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
._ <- list2env(
list(
arglen = "'args' and/or 'url' must be of length 1 or the same length",
character_class = "'class' must be a character string",
cluster_inactive = "cluster is no longer active",
correct_context = "'host' must be specified if not using directly in a function argument",
daemons_unset = "a numeric value for 'url' requires daemons to be set",
Expand All @@ -98,7 +99,7 @@
register_cluster = "this function requires a more recent version of R",
requires_daemons = "launching one local daemon as none previously set",
requires_local = "SSH tunnelling requires 'url' hostname to be '127.0.0.1' or 'localhost'",
serial_invalid = "'class' must be a character value or NULL, 'sfunc' and 'ufunc' must be functions",
serial_invalid = "'fns' must be a list of 2 functions or NULL",
single_url = "only one 'url' should be specified",
sync_timeout = "initial sync with dispatcher/daemon timed out after 10s",
url_spec = "numeric value for 'url' is out of bounds",
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ method:

``` r
m[]
#> [1] 50.10154
#> [1] 47.76782
```

It is not necessary to wait, as the mirai resolves automatically
Expand All @@ -94,7 +94,7 @@ available at `$data`.
m
#> < mirai [$data] >
m$data
#> [1] 50.10154
#> [1] 47.76782
```

### Daemons
Expand Down
40 changes: 16 additions & 24 deletions man/serialization.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ nanotesterr(launch_remote(c("tcp://localhost:5555", "tcp://localhost:6666", "tcp
nanotesterr(launch_local(1L), "requires daemons to be set")
nanotestn(everywhere(mirai::serialization()))
nanotestn(serialization())
nanotesterr(serialization(list(NULL)), "must be a character value or NULL")
nanotesterr(serialization(list(NULL)), "must be a list of 2 functions or NULL")
nanotesterr(serialization(list(identity, identity), class = NA), "must be a character string")
nanotest(is.character(host_url()))
nanotest(substr(host_url(ws = TRUE, tls = TRUE), 1L, 3L) == "wss")
nanotest(substr(host_url(tls = TRUE), 1L, 3L) == "tls")
Expand Down Expand Up @@ -240,8 +241,8 @@ connection && .Platform[["OS.type"]] != "windows" && Sys.getenv("NOT_CRAN") == "
nanotestz(sum(tstatus[, "assigned"]))
nanotestz(sum(tstatus[, "complete"]))
nanotestz(daemons(0))
nanotest(is.list(serialization(class = "", sfunc = function(x) serialize(x, NULL), ufunc = unserialize)))
nanotest(is.function(serialization()[[2L]]))
nanotest(is.list(serialization(fns = list(function(x) serialize(x, NULL), unserialize), class = "tst_cls")))
nanotest(is.function(serialization()[[1L]][[2L]]))
nanotesto(daemons(url = "wss://127.0.0.1:0", token = TRUE, pass = "test"))
nanotestn(launch_local(1L))
Sys.sleep(1L)
Expand Down
13 changes: 7 additions & 6 deletions vignettes/databases.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ everywhere({
con <<- dbConnect(adbi::adbi("adbcsqlite"), uri = ":memory:")
})
serialization(
class = "nanoarrow_array_stream",
sfunc = arrow::write_to_raw,
ufunc = function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE)
list(
arrow::write_to_raw,
function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE)
),
class = "nanoarrow_array_stream"
)
```
`mirai()` calls may then be used to write to or query the database all in the Arrow format.
Expand Down Expand Up @@ -249,9 +251,8 @@ server <- function(input, output, session) {

# serialization() specifies the native Arrow serialization functions
serialization(
class = "nanoarrow_array_stream",
sfunc = arrow::write_to_raw,
ufunc = nanoarrow::read_nanoarrow
list(arrow::write_to_raw, nanoarrow::read_nanoarrow),
class = "nanoarrow_array_stream"
)

# run Shiny app
Expand Down
13 changes: 7 additions & 6 deletions vignettes/databases.Rmd.orig
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ everywhere({
con <<- dbConnect(adbi::adbi("adbcsqlite"), uri = ":memory:")
})
serialization(
class = "nanoarrow_array_stream",
sfunc = arrow::write_to_raw,
ufunc = function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE)
list(
arrow::write_to_raw,
function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE)
),
class = "nanoarrow_array_stream"
)
```
`mirai()` calls may then be used to write to or query the database all in the Arrow format.
Expand Down Expand Up @@ -182,9 +184,8 @@ server <- function(input, output, session) {

# serialization() specifies the native Arrow serialization functions
serialization(
class = "nanoarrow_array_stream",
sfunc = arrow::write_to_raw,
ufunc = nanoarrow::read_nanoarrow
list(arrow::write_to_raw, nanoarrow::read_nanoarrow),
class = "nanoarrow_array_stream"
)

# run Shiny app
Expand Down
Loading

0 comments on commit f3ff0ec

Please sign in to comment.