From baa099d66656cb79f6192b98e3deb4dfc6741234 Mon Sep 17 00:00:00 2001 From: xiaodaigh Date: Sun, 9 Aug 2020 00:34:16 +1000 Subject: [PATCH 1/9] refactoring for better nse --- DESCRIPTION | 9 +- NAMESPACE | 25 - R/add_chunk.r | 5 +- R/chunk_mapper.r | 91 +- R/clapply.r | 0 R/cmap.r | 94 +- R/collect.r | 66 +- R/data.table.r | 50 +- R/dplyr_verbs.r | 84 +- R/get_chunk.r | 43 +- R/map-deprecated.r | 85 - R/map2.r | 16 - R/play.r | 12 + R/srckeep.disk.frame.r | 16 +- man/chunk_group_by.Rd | 20 +- man/cmap.Rd | 50 +- man/cmap2.Rd | 3 - man/collect.Rd | 2 +- man/create_chunk_mapper.Rd | 2 +- man/create_dplyr_mapper.Rd | 11 - man/dplyr_verbs.Rd | 15 - man/play.Rd | 16 + man/srckeep.Rd | 5 - misc/disk.frame-report.html | 26696 +++++++++------- .../datatables-crosstalk.css | 16 + 25 files changed, 15474 insertions(+), 11958 deletions(-) create mode 100644 R/clapply.r delete mode 100644 R/map-deprecated.r create mode 100644 R/play.r delete mode 100644 man/create_dplyr_mapper.Rd create mode 100644 man/play.Rd diff --git a/DESCRIPTION b/DESCRIPTION index d235b10b..93184e16 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Type: Package Package: disk.frame Title: Larger-than-RAM Disk-Based Data Manipulation Framework -Version: 0.3.7 +Version: 0.4.0 Date: 2020-07-07 Authors@R: c( person("Dai", "ZJ", email = "zhuojia.dai@gmail.com", role = c("aut", "cre")), @@ -25,17 +25,16 @@ Imports: pryr (>= 0.1.4), stringr (>= 1.4.0), fst (>= 0.8.0), - globals (>= 0.12.4), future (>= 1.14.0), data.table (>= 1.12.2), crayon (>= 1.3.4), bigreadr (>= 0.2.0), bit64, - benchmarkme + benchmarkme, + purrr (>= 0.3.2) Depends: R (>= 3.4), - dplyr (>= 1.0.0), - purrr (>= 0.3.2) + dplyr (>= 1.0.0) Suggests: testthat (>= 2.1.0), nycflights13, diff --git a/NAMESPACE b/NAMESPACE index d93b80bf..77e4b782 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -17,7 +17,6 @@ S3method(colnames,disk.frame) S3method(compute,disk.frame) S3method(delayed,disk.frame) S3method(distinct,disk.frame) -S3method(do,disk.frame) S3method(filter,disk.frame) S3method(full_join,disk.frame) S3method(get_chunk,disk.frame) @@ -30,18 +29,9 @@ S3method(hard_arrange,disk.frame) S3method(hard_group_by,data.frame) S3method(hard_group_by,disk.frame) S3method(head,disk.frame) -S3method(imap,default) -S3method(imap_dfr,default) -S3method(imap_dfr,disk.frame) S3method(inner_join,disk.frame) S3method(lazy,disk.frame) S3method(left_join,disk.frame) -S3method(map,default) -S3method(map,disk.frame) -S3method(map2,default) -S3method(map2,disk.frame) -S3method(map_dfr,default) -S3method(map_dfr,disk.frame) S3method(merge,disk.frame) S3method(mutate,disk.frame) S3method(names,disk.frame) @@ -67,8 +57,6 @@ S3method(transmute,disk.frame) export(IQR_df.chunk_agg.disk.frame) export(IQR_df.collected_agg.disk.frame) export(add_chunk) -export(add_count.disk.frame) -export(add_tally.disk.frame) export(all_df.chunk_agg.disk.frame) export(all_df.collected_agg.disk.frame) export(any_df.chunk_agg.disk.frame) @@ -90,9 +78,7 @@ export(cmap_dfr) export(collect_list) export(colnames) export(copy_df_to) -export(count.disk.frame) export(create_chunk_mapper) -export(create_dplyr_mapper) export(csv_to_disk.frame) export(delayed) export(delete) @@ -107,16 +93,12 @@ export(get_chunk) export(get_chunk_ids) export(hard_arrange) export(hard_group_by) -export(imap) -export(imap_dfr) export(insert_ceremony) export(is_disk.frame) export(lazy) export(length_df.chunk_agg.disk.frame) export(length_df.collected_agg.disk.frame) export(make_glm_streaming_fn) -export(map) -export(map2) export(map_by_chunk_id) export(max_df.chunk_agg.disk.frame) export(max_df.collected_agg.disk.frame) @@ -151,10 +133,8 @@ export(shardkey_equal) export(show_boilerplate) export(show_ceremony) export(srckeep) -export(srckeepchunks) export(sum_df.chunk_agg.disk.frame) export(sum_df.collected_agg.disk.frame) -export(tally.disk.frame) export(var_df.chunk_agg.disk.frame) export(var_df.collected_agg.disk.frame) export(write_disk.frame) @@ -179,14 +159,11 @@ importFrom(data.table,setDT) importFrom(data.table,setkey) importFrom(data.table,setkeyv) importFrom(data.table,timetaken) -importFrom(dplyr,add_count) -importFrom(dplyr,add_tally) importFrom(dplyr,anti_join) importFrom(dplyr,arrange) importFrom(dplyr,bind_rows) importFrom(dplyr,collect) importFrom(dplyr,compute) -importFrom(dplyr,count) importFrom(dplyr,distinct) importFrom(dplyr,do) importFrom(dplyr,filter) @@ -208,7 +185,6 @@ importFrom(dplyr,select) importFrom(dplyr,semi_join) importFrom(dplyr,summarise) importFrom(dplyr,summarize) -importFrom(dplyr,tally) importFrom(dplyr,tbl_vars) importFrom(dplyr,transmute) importFrom(fs,dir_create) @@ -227,7 +203,6 @@ importFrom(future,nbrOfWorkers) importFrom(future,plan) importFrom(future,sequential) importFrom(future.apply,future_lapply) -importFrom(globals,findGlobals) importFrom(glue,glue) importFrom(jsonlite,fromJSON) importFrom(jsonlite,toJSON) diff --git a/R/add_chunk.r b/R/add_chunk.r index 215feb85..08fee3d2 100644 --- a/R/add_chunk.r +++ b/R/add_chunk.r @@ -116,9 +116,10 @@ add_chunk <- function(df, chunk, chunk_id = NULL, full.names = FALSE, ...) { data.table::setDT(check_vars) if(nrow(check_vars[is.na(new_chunk)]) > 0) { + vars_strings = paste0(check_vars[is.na(new_chunk), colnames], collapse=',\n ') warning( - glue::glue( - "these variables are in the disk.frame but not in the new chunk: \n {paste0(check_vars[is.na(new_chunk), colnames], collapse=',\n ')}")) + sprintf( + "these variables are in the disk.frame but not in the new chunk: \n %s", vars_strings)) } if(nrow(check_vars[is.na(existing_df)]) > 0){ warning(glue::glue("these variables are in the new chunk but not in the existing disk.frame: {paste0(check_vars[is.na(existing_df), colnames], collapse=', ')}")) diff --git a/R/chunk_mapper.r b/R/chunk_mapper.r index 2063252a..9f4ea4f8 100644 --- a/R/chunk_mapper.r +++ b/R/chunk_mapper.r @@ -33,57 +33,54 @@ #' @param as.data.frame force the input chunk of a data.frame; needed for dtplyr #' @importFrom rlang enquos quo #' @export -create_chunk_mapper <- function(chunk_fn, warning_msg = NULL, as.data.frame = TRUE) { - return_func <- function(.data, ...) { - if (!is.null(warning_msg)) { - warning(warning_msg) +create_chunk_mapper <- function(chunk_fn, warning_msg = NULL, as.data.frame = FALSE) { + if(as.data.frame) { + warning("`as.data.frame` is deprecated in create_chunk_mapper") + } + + return(function(.data, ...) { + if(!is.null(warning_msg)) { + print(warning_msg) } - quo_dotdotdot = rlang::enquos(...) + # you need to use list otherwise the names will be gone + sub_dotdotdot = substitute(list(...)) - # this is designed to capture any global stuff - vars_and_pkgs = future::getGlobalsAndPackages(quo_dotdotdot) - data_for_eval_tidy = force(vars_and_pkgs$globals) + code = paste0(deparse(sub_dotdotdot), collapse = "") + # code will be in the form of "list(...)" + code = substr(code, 6, nchar(code)-1) - res = cmap(.data, ~{ - - this_env = environment() - - if(length(data_for_eval_tidy) > 0) { - for(i in 1:length(data_for_eval_tidy)) { - assign(names(data_for_eval_tidy)[i], data_for_eval_tidy[[i]], pos = this_env) - } - } - - lapply(quo_dotdotdot, function(x) { - attr(x, ".Environment") = this_env - }) - - if(as.data.frame) { - if("grouped_df" %in% class(.x)) { - code = rlang::quo(chunk_fn(.x, !!!quo_dotdotdot)) - } else { - code = rlang::quo(chunk_fn(as.data.frame(.x), !!!quo_dotdotdot)) - } - } else { - code = rlang::quo(chunk_fn(.x, !!!quo_dotdotdot)) + if (code == "NULL") { + globals_and_pkgs = future::getGlobalsAndPackages(str2lang("chunk_fn()")) + } else { + globals_and_pkgs = future::getGlobalsAndPackages(str2lang(sprintf("chunk_fn(%s)", code))) + } + + + global_vars = globals_and_pkgs$globals + + env = parent.frame() + + done = identical(env, globalenv()) + + # keep adding global variables + + while(!done) { + tmp_globals_and_pkgs = future::getGlobalsAndPackages(sub_dotdotdot, envir = env) + new_global_vars = tmp_globals_and_pkgs$globals + for (name in setdiff(names(new_global_vars), names(global_vars))) { + global_vars[name] = new_global_vars[[name]] } - # ZJ: we need both approaches. TRUST ME - # TODO better NSE at some point need dist - tryCatch({ - return(rlang::eval_tidy(code)) - }, error = function(e) { - as_label_code = rlang::as_label(code) - if(as_label_code == "chunk_fn(...)") { - stop(glue::glue("disk.frame has detected a syntax error in \n\n`{code}`\n\n. If you believe your syntax is correct, raise an issue at https://github.com/xiaodaigh/disk.frame with a MWE")) - } else { - # likely to be dealing with data.tables - return(eval(parse(text=as_label_code), envir = this_env)) - } - }) - }, lazy = TRUE) - } - return_func -} \ No newline at end of file + done = identical(env, globalenv()) + env = parent.env(env) + } + + globals_and_pkgs$globals = global_vars + + attr(.data, "recordings") = c(attr(.data, "recordings"), list(globals_and_pkgs)) + + .data + }) +} diff --git a/R/clapply.r b/R/clapply.r new file mode 100644 index 00000000..e69de29b diff --git a/R/cmap.r b/R/cmap.r index e1700066..fe04516f 100644 --- a/R/cmap.r +++ b/R/cmap.r @@ -53,69 +53,61 @@ cmap <- function(.x, .f, ...) { #' @rdname cmap #' @importFrom future getGlobalsAndPackages #' @export -cmap.disk.frame <- function(.x, .f, ..., outdir = NULL, keep = NULL, chunks = nchunks(.x), compress = 50, lazy = TRUE, overwrite = FALSE, vars_and_pkgs = future::getGlobalsAndPackages(.f, envir = parent.frame()), .progress = TRUE) { - .f = purrr::as_mapper(.f) - if(lazy) { - attr(.x, "lazyfn") = - c( - attr(.x, "lazyfn"), - list( - list( - func = .f, - vars_and_pkgs = vars_and_pkgs, - dotdotdot = list(...) - ) +cmap.disk.frame <- function(.x, .f, ..., outdir = NULL, + keep = NULL, + chunks = nchunks(.x), + compress = 50, + lazy = TRUE, + overwrite = FALSE, + .progress = TRUE) { + if(typeof(.f) == "language") { + if(requireNamespace("purrr")) { + .f = purrr::as_mapper(.f) + } else { + code = paste0(deparse(substitute(.f)), collapse = "") + stop( + sprintf( + "in cmap(.x, %s), it appears you are using {purrr} syntax but do not have {purrr} installed. Try `install.packages('purrr')`", + code ) ) - return(.x) + } } - if(!is.null(outdir)) { + if (lazy) { + ..f = create_chunk_mapper(.f) + return(..f(.x)) + } else { + # not lazy + if (is.null(outdir)) { + stop("cmap(...) error -- `lazy` = FALSE but `outdir` is not specified") + } + overwrite_check(outdir, overwrite) - } - - stopifnot(is_ready(.x)) - - keep1 = attr(.x,"keep", exact=TRUE) - - if(is.null(keep)) { - keep = keep1 - } - - path <- attr(.x, "path") - files <- list.files(path, full.names = TRUE) - files_shortname <- list.files(path) - - keep_future = keep - - cid = get_chunk_ids(.x, full.names = TRUE) - - dotdotdot = list(...) - - res = future.apply::future_lapply(1:length(files), function(ii, ...) { - #res = lapply(1:length(files), function(ii) { - ds = disk.frame::get_chunk(.x, cid[ii], keep=keep_future, full.names = TRUE) - res = .f(ds, ...) + path <- attr(.x, "path") + files <- list.files(path, full.names = TRUE) + files_shortname <- list.files(path) - #res = do.call(.f, c(ds, dotdotdot)) + cids = get_chunk_ids(.x, full.names = T, strip_extension = F) - if(!is.null(outdir)) { + # compute + # TODO refactor that into a write_disk.frame() + future.apply::future_lapply(1:length(files), function(ii, ...) { + ds = get_chunk(.x, cids[ii], full.names = TRUE) + + res = .f(ds, ...) + if(nrow(res) == 0) { - warning(glue::glue("The output chunk has 0 row, therefore chunk {ii} NOT written")) + warning(sprintf("The output chunk has 0 row, therefore chunk %d NOT written", ii)) } else { - fst::write_fst(res, file.path(outdir, files_shortname[ii]), compress) + out_chunk_name = file.path(outdir, files_shortname[ii]) + fst::write_fst(res, out_chunk_name, compress) } - return(ii) - } else { - return(res) - } - }, ...) - - if(!is.null(outdir)) { + NULL + }, ...) + return(disk.frame(outdir)) - } else { - return(res) } } diff --git a/R/collect.r b/R/collect.r index 66aac392..fbcc286a 100644 --- a/R/collect.r +++ b/R/collect.r @@ -11,8 +11,6 @@ #' hence parallel = FALSE is a better choice #' @param ... not used #' @importFrom data.table data.table as.data.table -#' @importFrom furrr future_map_dfr future_options -#' @importFrom purrr map_dfr #' @importFrom dplyr collect select mutate #' @return collect return a data.frame/data.table #' @examples @@ -24,18 +22,20 @@ #' delete(cars.df) #' @export #' @rdname collect -collect.disk.frame <- function(x, ..., parallel = !is.null(attr(x,"lazyfn"))) { - cids = get_chunk_ids(x, full.names = TRUE, strip_extension = FALSE) - #cids = as.integer(get_chunk_ids(x)) - if(nchunks(x) > 0) { - if(parallel) { - furrr::future_map_dfr(cids, ~get_chunk(x, .x, full.names = TRUE)) - } else { - purrr::map_dfr(cids, ~get_chunk(x, .x, full.names = TRUE)) - } +collect.disk.frame <- function(x, ..., parallel = !is.null(attr(x,"recordings"))) { + cids = get_chunk_ids(x, full.names = T, strip_extension = F) + + if (parallel) { + list_of_data.table = future.apply::future_lapply(cids, function(cid) { + get_chunk(x, cid, full.names = TRUE) + }) } else { - data.table() + list_of_data.table = lapply(cids, function(cid) { + get_chunk(x, cid, full.names = TRUE) + }) } + + data.table::rbindlist(list_of_data.table) } #' @param simplify Should the result be simplified to array @@ -51,25 +51,25 @@ collect.disk.frame <- function(x, ..., parallel = !is.null(attr(x,"lazyfn"))) { #' # clean up #' delete(cars.df) collect_list <- function(x, simplify = FALSE, parallel = !is.null(attr(x,"lazyfn"))) { - cids = get_chunk_ids(x, full.names = TRUE, strip_extension = FALSE) - - - if(nchunks(x) > 0) { - res <- NULL - if (parallel) { - #res = furrr::future_map(1:nchunks(x), ~get_chunk(x, .x)) - res = future.apply::future_lapply(cids, function(.x) { - get_chunk(x, .x, full.names = TRUE) - }) - } else { - res = purrr::map(cids, ~get_chunk(x, .x, full.names = TRUE)) - } - if (simplify) { - return(simplify2array(res)) - } else { - return(res) - } - } else { - list() - } + error("do it") + # cids = get_chunk_ids(x, full.names = TRUE, strip_extension = FALSE) + # + # + # if(nchunks(x) > 0) { + # res <- NULL + # if (parallel) { + # res = future.apply::future_lapply(cids, function(.x) { + # get_chunk(x, .x, full.names = TRUE) + # }) + # } else { + # res = purrr::map(cids, ~get_chunk(x, .x, full.names = TRUE)) + # } + # if (simplify) { + # return(simplify2array(res)) + # } else { + # return(res) + # } + # } else { + # list() + # } } diff --git a/R/data.table.r b/R/data.table.r index 5459e4cb..0e5ce503 100644 --- a/R/data.table.r +++ b/R/data.table.r @@ -9,7 +9,6 @@ #' @import fst #' @importFrom future.apply future_lapply #' @importFrom data.table rbindlist -#' @importFrom globals findGlobals #' @export #' @examples #' cars.df = as.disk.frame(cars) @@ -19,31 +18,30 @@ #' # clean up #' delete(cars.df) `[.disk.frame` <- function(df, ..., keep = NULL, rbind = TRUE, use.names = TRUE, fill = FALSE, idcol = NULL) { - keep_for_future = keep - - dotdotdot = substitute(...()) #this is an alist - - ag = globals::findGlobals(dotdotdot) - ag = setdiff(ag, "") # "" can cause issues with future - - res = future.apply::future_lapply(get_chunk_ids(df, strip_extension = FALSE), function(chunk_id) { - #lapply(get_chunk_ids(df, strip_extension = FALSE), function(chunk_id) { - chunk = get_chunk(df, chunk_id, keep = keep_for_future) - data.table::setDT(chunk) - expr <- quote(chunk) - expr <- c(expr, dotdotdot) - res <- do.call(`[`, expr) - res - }, future.globals = c("df", "keep_for_future", "dotdotdot", ag), future.packages = c("data.table","disk.frame") - ) - - if(rbind & all(sapply(res, function(x) "data.frame" %in% class(x)))) { - rbindlist(res, use.names = use.names, fill = fill, idcol = idcol) - } else if(rbind) { - unlist(res) - } else { - res - } + # keep_for_future = keep + # + # dotdotdot = substitute(...()) #this is an alist + # + # ag = globals::findGlobals(dotdotdot) + # ag = setdiff(ag, "") # "" can cause issues with future + # + # res = future.apply::future_lapply(get_chunk_ids(df, strip_extension = FALSE), function(chunk_id) { + # chunk = get_chunk(df, chunk_id, keep = keep_for_future) + # data.table::setDT(chunk) + # expr <- quote(chunk) + # expr <- c(expr, dotdotdot) + # res <- do.call(`[`, expr) + # res + # }, future.globals = c("df", "keep_for_future", "dotdotdot", ag), future.packages = c("data.table","disk.frame") + # ) + # + # if(rbind & all(sapply(res, function(x) "data.frame" %in% class(x)))) { + # rbindlist(res, use.names = use.names, fill = fill, idcol = idcol) + # } else if(rbind) { + # unlist(res) + # } else { + # res + # } } # Solutions from https://stackoverflow.com/questions/57122960/how-to-use-non-standard-evaluation-nse-to-evaluate-arguments-on-data-table?answertab=active#tab-top diff --git a/R/dplyr_verbs.r b/R/dplyr_verbs.r index 8ea3c4ea..2126f081 100644 --- a/R/dplyr_verbs.r +++ b/R/dplyr_verbs.r @@ -23,19 +23,17 @@ #' #' # clean up cars.df #' delete(cars.df) -select.disk.frame <- function(.data, ...) { - quo_dotdotdot = rlang::enquos(...) - cmap(.data, ~{ - code = rlang::quo(dplyr::select(.x, !!!quo_dotdotdot)) - rlang::eval_tidy(code) - }, lazy = TRUE) -} +select.disk.frame <- create_chunk_mapper(dplyr::select) + +# comment out code; to be removed when it's no longer needed +# select.disk.frame <- function(.data, ...) { +# quo_dotdotdot = rlang::enquos(...) +# cmap(.data, ~{ +# code = rlang::quo(dplyr::select(.x, !!!quo_dotdotdot)) +# rlang::eval_tidy(code) +# }, lazy = TRUE) +# } -#' Kept for backwards-compatibility to be removed in 0.3 -#' @export -create_dplyr_mapper = function() { - stop("create_dplyr_mapper has been deprecated. Please use create_chunk_mapper instead") -} #' @export #' @rdname dplyr_verbs @@ -72,16 +70,16 @@ arrange.disk.frame =create_chunk_mapper(dplyr::arrange, warning_msg="`arrange.di chunk_arrange <- create_chunk_mapper(dplyr::arrange) -#' @export -#' @importFrom dplyr tally -#' @rdname dplyr_verbs -tally.disk.frame <- create_chunk_mapper(dplyr::tally) - - -#' @export -#' @importFrom dplyr count -#' @rdname dplyr_verbs -count.disk.frame <- create_chunk_mapper(dplyr::count) +#' #' @export +#' #' @importFrom dplyr tally +#' #' @rdname dplyr_verbs +#' tally.disk.frame <- create_chunk_mapper(dplyr::tally) +#' +#' +#' #' @export +#' #' @importFrom dplyr count +#' #' @rdname dplyr_verbs +#' count.disk.frame <- create_chunk_mapper(dplyr::count) #' #' @export #' #' @importFrom dplyr add_count @@ -107,10 +105,10 @@ chunk_summarize <- create_chunk_mapper(dplyr::summarize) chunk_summarise <- create_chunk_mapper(dplyr::summarise) -#' @export -#' @rdname dplyr_verbs -#' @importFrom dplyr do -do.disk.frame <- create_chunk_mapper(dplyr::do) +#' #' @export +#' #' @rdname dplyr_verbs +#' #' @importFrom dplyr do +#' do.disk.frame <- create_chunk_mapper(dplyr::do) #' @export @@ -161,37 +159,3 @@ chunk_ungroup = create_chunk_mapper(dplyr::ungroup) glimpse.disk.frame <- function(.data, ...) { glimpse(head(.data, ...), ...) } - -# Internal methods -# @param .data the data -# @param cmd the function to record -record <- function(.data, cmd){ - attr(.data,"lazyfn") <- c(attr(.data,"lazyfn"), list(cmd)) - .data -} - -# Internal methods -# @param .data the disk.frame -# @param cmds the list of function to play back -play <- function(.data, cmds=NULL) { - for (cmd in cmds){ - if (typeof(cmd) == "closure") { - .data <- cmd(.data) - } else { - # create a temporary environment - an_env = new.env(parent = environment()) - - ng = names(cmd$vars_and_pkgs$globals) - - if(length(ng) > 0) { - for(i in 1:length(cmd$vars_and_pkgs$globals)) { - g = cmd$vars_and_pkgs$globals[[i]] - assign(ng[i], g, pos = an_env) - } - } - - .data <- do.call(cmd$func, c(list(.data),cmd$dotdotdot), envir = an_env) - } - } - .data -} diff --git a/R/get_chunk.r b/R/get_chunk.r index 1e3cfcbd..c4aa2f2e 100644 --- a/R/get_chunk.r +++ b/R/get_chunk.r @@ -26,38 +26,18 @@ get_chunk <- function(...) { #' @export get_chunk.disk.frame <- function(df, n, keep = NULL, full.names = FALSE, ...) { stopifnot("disk.frame" %in% class(df)) - keep_chunks = attr(df, "keep_chunks", exact=TRUE) - - # print(names(attr(df, "lazyfn")[[1]]$vars_and_pkgs$globals)) - # stop("ok") - - # TODO relax this - # if(!is.null(keep_chunks)) { - # # browser() - # # n_int = as.integer(n) - # # - # # if(is.na(n_int)) { - # # if(as.character(n) %in% get_chunk_ids(df)[keep_chunks]) { - # # return(NULL) - # # } else if(normalizePath(as.character(n)) %in% sapply(get_chunk_ids(df, full.names = TRUE)[keep_chunks],normalizePath)) { - # # return(NULL) - # # } - # # } else { - # # if(!n %in% keep_chunk) { - # # return(NULL) - # # } - # # } - # } + # keep_chunks = attr(df, "keep_chunks", exact=TRUE) path = attr(df,"path", exact=TRUE) # all the variables to keep in the attr from a previous srckeep - keep1 = attr(df,"keep", exact=TRUE) + keep1 = attr(df, "keep", exact=TRUE) - cmds = attr(df,"lazyfn", exact=TRUE) + recordings = attr(df, "recordings", exact=TRUE) filename = "" if (typeof(keep) == "closure") { + # sometimes purrr::keep is picked up keep = keep1 } else if(!is.null(keep1) & !is.null(keep)) { if (length(setdiff(keep, keep1)) > 0) { @@ -89,7 +69,7 @@ get_chunk.disk.frame <- function(df, n, keep = NULL, full.names = FALSE, ...) { } } - # if the file you are looking for don't exist + # if the file you are looking for doesn't exist if (!fs::file_exists(filename)) { warning(glue("The chunk {filename} does not exist; returning an empty data.table")) notbl <- data.table() @@ -97,17 +77,18 @@ get_chunk.disk.frame <- function(df, n, keep = NULL, full.names = FALSE, ...) { return(notbl) } - if (is.null(cmds)) { - if(typeof(keep)!="closure") { - fst::read_fst(filename, columns = keep, as.data.table = TRUE,...) - } else { + if (is.null(recordings)) { + if(typeof(keep)=="closure") { fst::read_fst(filename, as.data.table = TRUE,...) + } else { + fst::read_fst(filename, columns = keep, as.data.table = TRUE,...) } } else { if(typeof(keep)!="closure") { - play(fst::read_fst(filename, columns = keep, as.data.table = TRUE,...), cmds) + play(fst::read_fst(filename, as.data.table = TRUE,...), recordings) } else { - play(fst::read_fst(filename, as.data.table = TRUE,...), cmds) + play(fst::read_fst(filename, columns = keep, as.data.table = TRUE,...), recordings) + } } } diff --git a/R/map-deprecated.r b/R/map-deprecated.r deleted file mode 100644 index bac52e3d..00000000 --- a/R/map-deprecated.r +++ /dev/null @@ -1,85 +0,0 @@ -#' @export -#' @rdname cmap -map <- function(.x, .f, ...) { - UseMethod("map") -} - -#' @export -#' @rdname cmap -map.disk.frame <- function(...) { - warning("map(df, ...) where df is a disk.frame has been deprecated. Please use cmap(df,...) instead") - cmap.disk.frame(...) -} - -#' @export -#' @rdname cmap -map.default <- function(.x, .f, ...) { - purrr::map(.x, .f, ...) -} - - -#' @export -#' @rdname cmap -imap_dfr <- function(.x, .f, ..., .id = NULL) { - UseMethod("imap_dfr") -} - -#' @export -#' @rdname cmap -imap_dfr.disk.frame <- function(...) { - warning("imap_dfr(df, ...) where df is disk.frame is deprecated. Please use cimap_dfr(df, ...) instead") - cimap_dfr.disk.frame(...) -} - -#' @export -#' @rdname cmap -imap_dfr.default <- function(.x, .f, ..., .id = NULL) { - purrr::imap_dfr(.x, .f, ..., .id = .id) -} - -#' @export -#' @rdname cmap -#' @examples -#' cars.df = as.disk.frame(cars) -#' -#' # .x is the chunk and .y is the ID as an integer -#' -#' # lazy = TRUE support is not available at the moment -#' cimap(cars.df, ~.x[, id := .y], lazy = FALSE) -#' -#' cimap_dfr(cars.df, ~.x[, id := .y]) -#' -#' # clean up cars.df -#' delete(cars.df) -imap <- function(.x, .f, ...) { - UseMethod("imap") -} - -imap.disk.frame <- function(...) { - warning("imap(df,..) where df is disk.frame is deprecated. Use cimap(df, ...) instead") - cimap.disk.frame(...) -} - -#' @export -#' @rdname cmap -imap.default <- function(.x, .f, ...) { - purrr::imap(.x, .f, ...) -} - -#' @rdname cmap -#' @param .id not used -#' @export -map_dfr.disk.frame <- function(...) { - warning("map_dfr(df, ...) where df is disk.frame is deprecated. Please use cmap_dfr instead") - cmap_dfr.disk.frame(...) -} - -map_dfr <- function(.x, .f, ..., .id = NULL) { - UseMethod("map_dfr") -} - -#' @export -#' @rdname cmap -map_dfr.default <- function(.x, .f, ..., .id = NULL) { - purrr::map_dfr(.x, .f, ..., .id = .id) -} \ No newline at end of file diff --git a/R/map2.r b/R/map2.r index 99f0c698..643d0ac4 100644 --- a/R/map2.r +++ b/R/map2.r @@ -26,22 +26,6 @@ cmap2 <- function(.x, .y, .f, ...){ UseMethod("cmap2") } -#' @export -#' @rdname cmap2 -map2 <- function(.x, .y, .f, ...){ - UseMethod("map2") -} - -#' @export -map2.default <- function(.x, .y, .f, ...) { - purrr::map2(.x,.y,.f,...) -} - -#' @export -map2.disk.frame <- function(...) { - warning("map2.disk.frame(df, df1, ..) where df is disk.frame is deprecated. Use cmap(df, df1, ...) instead") - cmap2.disk.frame(...) -} #' @export #' @importFrom pryr do_call diff --git a/R/play.r b/R/play.r new file mode 100644 index 00000000..9f3f4011 --- /dev/null +++ b/R/play.r @@ -0,0 +1,12 @@ +#' Play the recorded lazy operations +#' @param dataframe A data.frame +#' @param recordings A recording the expression, globals and packages using create_chunk_mapper +play <- function(dataframe, recordings) { + for(recording in recordings) { + tmp_env = list2env(recording$globals) + one_recording_as_string = paste0(deparse(recording$expr), collapse = "") + code = str2lang(sprintf("dataframe %%>%% %s", one_recording_as_string)) + dataframe = eval(code, envir = tmp_env) + } + dataframe +} \ No newline at end of file diff --git a/R/srckeep.disk.frame.r b/R/srckeep.disk.frame.r index 54aef73a..1cfc1b2a 100644 --- a/R/srckeep.disk.frame.r +++ b/R/srckeep.disk.frame.r @@ -21,11 +21,11 @@ srckeep <- function(diskf, selections, ...) { #' @param chunks The chunks to load #' @rdname srckeep #' @export -srckeepchunks <- function(diskf, chunks, ...) { - stopifnot("disk.frame" %in% class(diskf)) - # TODO relax this - stopifnot(is.integer(chunks)) - - attr(df,"keep_chunks") = chunks - diskf -} +# srckeepchunks <- function(diskf, chunks, ...) { +# stopifnot("disk.frame" %in% class(diskf)) +# # TODO relax this +# stopifnot(is.integer(chunks)) +# +# attr(df,"keep_chunks") = chunks +# diskf +# } diff --git a/man/chunk_group_by.Rd b/man/chunk_group_by.Rd index f06836c2..cbe17485 100644 --- a/man/chunk_group_by.Rd +++ b/man/chunk_group_by.Rd @@ -5,7 +5,10 @@ \alias{chunk_summarise} \alias{chunk_group_by} \alias{chunk_ungroup} -\title{Group by within each disk.frame} +\title{#' @export +#' @importFrom dplyr tally +#' @rdname dplyr_verbs +tally.disk.frame <- create_chunk_mapper(dplyr::tally)} \usage{ chunk_summarize(.data, ...) @@ -21,11 +24,26 @@ chunk_ungroup(.data, ...) \item{...}{passed to dplyr::group_by} } \description{ +#' @export +#' @importFrom dplyr count +#' @rdname dplyr_verbs +count.disk.frame <- create_chunk_mapper(dplyr::count) +#' @export +#' @importFrom dplyr add_count +#' @rdname dplyr_verbs +add_count.disk.frame <- create_chunk_mapper(dplyr::add_count) + The disk.frame group by operation perform group WITHIN each chunk. This is often used for performance reasons. If the user wishes to perform group-by, they may choose to use the `hard_group_by` function which is expensive as it reorganizes the chunks by the shard key. } +\details{ +#' @export +#' @importFrom dplyr add_tally +#' @rdname dplyr_verbs +add_tally.disk.frame <- create_chunk_mapper(dplyr::add_tally) +} \seealso{ hard_group_by group_by } diff --git a/man/cmap.Rd b/man/cmap.Rd index fc06d9cd..8479a2d4 100644 --- a/man/cmap.Rd +++ b/man/cmap.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/cmap.r, R/map-deprecated.r +% Please edit documentation in R/cmap.r \name{cmap} \alias{cmap} \alias{cmap.disk.frame} @@ -13,16 +13,6 @@ \alias{lazy.disk.frame} \alias{delayed} \alias{chunk_lapply} -\alias{map} -\alias{map.disk.frame} -\alias{map.default} -\alias{imap_dfr} -\alias{imap_dfr.disk.frame} -\alias{imap_dfr.default} -\alias{imap} -\alias{imap.default} -\alias{map_dfr.disk.frame} -\alias{map_dfr.default} \title{Apply the same function to all chunks} \usage{ cmap(.x, .f, ...) @@ -37,7 +27,6 @@ cmap(.x, .f, ...) compress = 50, lazy = TRUE, overwrite = FALSE, - vars_and_pkgs = future::getGlobalsAndPackages(.f, envir = parent.frame()), .progress = TRUE ) @@ -78,26 +67,6 @@ lazy(.x, .f, ...) delayed(.x, .f, ...) chunk_lapply(...) - -map(.x, .f, ...) - -\method{map}{disk.frame}(...) - -\method{map}{default}(.x, .f, ...) - -imap_dfr(.x, .f, ..., .id = NULL) - -\method{imap_dfr}{disk.frame}(...) - -\method{imap_dfr}{default}(.x, .f, ..., .id = NULL) - -imap(.x, .f, ...) - -\method{imap}{default}(.x, .f, ...) - -\method{map_dfr}{disk.frame}(...) - -\method{map_dfr}{default}(.x, .f, ..., .id = NULL) } \arguments{ \item{.x}{a disk.frame} @@ -118,17 +87,15 @@ imap(.x, .f, ...) \item{overwrite}{if TRUE removes any existing chunks in the data} -\item{vars_and_pkgs}{variables and packages to send to a background session. This is typically automatically detected} - \item{.progress}{A logical, for whether or not to print a progress bar for multiprocess, multisession, and multicore plans. From {furrr}} -\item{.id}{not used} - \item{use.names}{for cmap_dfr's call to data.table::rbindlist. See data.table::rbindlist} \item{fill}{for cmap_dfr's call to data.table::rbindlist. See data.table::rbindlist} \item{idcol}{for cmap_dfr's call to data.table::rbindlist. See data.table::rbindlist} + +\item{vars_and_pkgs}{variables and packages to send to a background session. This is typically automatically detected} } \description{ Apply the same function to all chunks @@ -166,17 +133,6 @@ cmap_dfr(cars.df, ~.x[1,]) collect(lazy(cars.df, ~.x[1,])) collect(delayed(cars.df, ~.x[1,])) -# clean up cars.df -delete(cars.df) -cars.df = as.disk.frame(cars) - -# .x is the chunk and .y is the ID as an integer - -# lazy = TRUE support is not available at the moment -cimap(cars.df, ~.x[, id := .y], lazy = FALSE) - -cimap_dfr(cars.df, ~.x[, id := .y]) - # clean up cars.df delete(cars.df) } diff --git a/man/cmap2.Rd b/man/cmap2.Rd index 721dd80e..2a03f241 100644 --- a/man/cmap2.Rd +++ b/man/cmap2.Rd @@ -2,14 +2,11 @@ % Please edit documentation in R/map2.r, R/map_by_chunk_id.r \name{cmap2} \alias{cmap2} -\alias{map2} \alias{map_by_chunk_id} \title{`cmap2` a function to two disk.frames} \usage{ cmap2(.x, .y, .f, ...) -map2(.x, .y, .f, ...) - map_by_chunk_id(.x, .y, .f, ..., outdir) } \arguments{ diff --git a/man/collect.Rd b/man/collect.Rd index 520157ea..bf6651b9 100644 --- a/man/collect.Rd +++ b/man/collect.Rd @@ -6,7 +6,7 @@ \alias{collect.summarized_disk.frame} \title{Bring the disk.frame into R} \usage{ -\method{collect}{disk.frame}(x, ..., parallel = !is.null(attr(x, "lazyfn"))) +\method{collect}{disk.frame}(x, ..., parallel = !is.null(attr(x, "recordings"))) collect_list(x, simplify = FALSE, parallel = !is.null(attr(x, "lazyfn"))) diff --git a/man/create_chunk_mapper.Rd b/man/create_chunk_mapper.Rd index 0702093d..7be02cea 100644 --- a/man/create_chunk_mapper.Rd +++ b/man/create_chunk_mapper.Rd @@ -4,7 +4,7 @@ \alias{create_chunk_mapper} \title{Create function that applies to each chunk if disk.frame} \usage{ -create_chunk_mapper(chunk_fn, warning_msg = NULL, as.data.frame = TRUE) +create_chunk_mapper(chunk_fn, warning_msg = NULL, as.data.frame = FALSE) } \arguments{ \item{chunk_fn}{The dplyr function to create a mapper for} diff --git a/man/create_dplyr_mapper.Rd b/man/create_dplyr_mapper.Rd deleted file mode 100644 index a486be28..00000000 --- a/man/create_dplyr_mapper.Rd +++ /dev/null @@ -1,11 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/dplyr_verbs.r -\name{create_dplyr_mapper} -\alias{create_dplyr_mapper} -\title{Kept for backwards-compatibility to be removed in 0.3} -\usage{ -create_dplyr_mapper() -} -\description{ -Kept for backwards-compatibility to be removed in 0.3 -} diff --git a/man/dplyr_verbs.Rd b/man/dplyr_verbs.Rd index 9a259740..cfebebb4 100644 --- a/man/dplyr_verbs.Rd +++ b/man/dplyr_verbs.Rd @@ -8,11 +8,6 @@ \alias{transmute.disk.frame} \alias{arrange.disk.frame} \alias{chunk_arrange} -\alias{tally.disk.frame} -\alias{count.disk.frame} -\alias{add_count.disk.frame} -\alias{add_tally.disk.frame} -\alias{do.disk.frame} \alias{distinct.disk.frame} \alias{chunk_distinct} \alias{glimpse.disk.frame} @@ -32,16 +27,6 @@ chunk_arrange(.data, ...) -tally.disk.frame(.data, ...) - -count.disk.frame(.data, ...) - -add_count.disk.frame(.data, ...) - -add_tally.disk.frame(.data, ...) - -\method{do}{disk.frame}(.data, ...) - \method{distinct}{disk.frame}(...) chunk_distinct(.data, ...) diff --git a/man/play.Rd b/man/play.Rd new file mode 100644 index 00000000..d252ead2 --- /dev/null +++ b/man/play.Rd @@ -0,0 +1,16 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/play.r +\name{play} +\alias{play} +\title{Play the recorded lazy operations} +\usage{ +play(dataframe, recordings) +} +\arguments{ +\item{dataframe}{A data.frame} + +\item{recordings}{A recording the expression, globals and packages using create_chunk_mapper} +} +\description{ +Play the recorded lazy operations +} diff --git a/man/srckeep.Rd b/man/srckeep.Rd index c377c1c1..e4c4accc 100644 --- a/man/srckeep.Rd +++ b/man/srckeep.Rd @@ -2,12 +2,9 @@ % Please edit documentation in R/srckeep.disk.frame.r \name{srckeep} \alias{srckeep} -\alias{srckeepchunks} \title{Keep only the variables from the input listed in selections} \usage{ srckeep(diskf, selections, ...) - -srckeepchunks(diskf, chunks, ...) } \arguments{ \item{diskf}{a disk.frame} @@ -15,8 +12,6 @@ srckeepchunks(diskf, chunks, ...) \item{selections}{The list of variables to keep from the input source} \item{...}{not yet used} - -\item{chunks}{The chunks to load} } \description{ Keep only the variables from the input listed in selections diff --git a/misc/disk.frame-report.html b/misc/disk.frame-report.html index 12c97232..5f8a1d71 100644 --- a/misc/disk.frame-report.html +++ b/misc/disk.frame-report.html @@ -1,6 +1,6 @@ - - + + @@ -22,7 +22,7 @@ - +
-

disk.frame coverage - 51.96%

+

disk.frame coverage - 52.99%

-
- +R/zip_to_disk.frame.r1154504500.00%R/sas2disk_frame.r774004000.00%R/foverlaps.disk.frame.r893503500.00%R/csv2disk.frame_readr.r512902900.00%R/move_to.r662202200.00%R/show_ceremony.R3170700.00%R/collect.summarized_disk.frame.r3130300.00%R/map_by_chunk_id.r620200.00%R/csv2disk.frame.r46826829239110.82%R/map-deprecated.r85162141012.50%R/setup.r8522319113.64%R/sample_n.R9312033.33%R/disk.frame.r24096336315134.38%R/hard_group_by.r2208131501938.27%R/recommend_nchunks.r1366326371441.27%R/srckeep.disk.frame.r31734042.86%R/one-stage-verbs.R3901105357048.18%R/glm.r6821129157.14%R/write_disk.frame.r9132191337859.38%R/remove_chunk.r441174163.64%R/get_chunk.r10234231156067.65%R/rechunk.r155755322570.67%R/dplyr_verbs.r3222216629172.73%R/overwrite_check.r431612423375.00%R/shardkey.r2686216175.00%R/collect.r75171341976.47%R/sample_frac.R27972177.78%R/rbindlist.disk.frame.r80332677178.79%R/cmap.r2417661155180.26%R/add_chunk.r1476250121880.65%R/ncol-nrow.r66161337581.25%R/data.table.r6817143882.35%R/full_join.r53292451082.76%R/map2.r106352963282.86%R/semi_join.r5530255783.33%R/anti_join.r7030255783.33%R/inner_join.r6637316883.78%R/left_join.r5832275884.38%R/chunk_mapper.r892723414285.19%R/nchunks.r4376130885.71%R/is_disk.frame.r401513230486.67%R/make_glm_streaming_fn.r54161421287.50%R/names.r398712087.50%R/get_chunk_ids.r381110121090.91%R/hard_arrange.r6714131692.86%R/shard.r692927212893.10%R/merge.disk.frame.r98303002100.00%R/sortablestr2i.R672222038100.00%R/zzz.r661212016100.00%R/util.r271212014100.00%R/as.disk.frame.r4299063100.00%R/print.disk.frame.r198801100.00%R/as.data.frame.r312201100.00%R/compute.r232203100.00%R/delete.r122208100.00%R/tbl_vars.r101101100.00%
+
@@ -227,7 +227,7 @@

disk.frame coverage - 51.96%

16 - 517x + 499x
  stopifnot(backend == "fst")
@@ -241,14 +241,14 @@

disk.frame coverage - 51.96%

18 - 517x + 499x
  if(dir.exists(path)) {
19 - 517x + 499x
    disk.frame_folder(path)
@@ -472,7 +472,7 @@

disk.frame coverage - 51.96%

51 - 377x + 361x
  stopifnot("disk.frame" %in% class(df))
@@ -486,7 +486,7 @@

disk.frame coverage - 51.96%

53 - 377x + 361x
  if(is.null(shardkey)) {
@@ -521,14 +521,14 @@

disk.frame coverage - 51.96%

58 - 377x + 361x
  fs::dir_create(file.path(attr(df,"path"), ".metadata"))
59 - 377x + 361x
  json_path = fs::file_create(file.path(attr(df,"path"), ".metadata", "meta.json"))
@@ -542,21 +542,21 @@

disk.frame coverage - 51.96%

61 - 377x + 361x
  filesize = file.size("meta.json")
62 - 377x + 361x
  meta_out = NULL
63 - 377x + 361x
  if(is.na(filesize)) {
@@ -570,49 +570,49 @@

disk.frame coverage - 51.96%

65 - 377x + 361x
    meta_out = jsonlite::toJSON(
66 - 377x + 361x
        c(
67 - 377x + 361x
          list(
68 - 377x + 361x
            nchunks = nchunks, 
69 - 377x + 361x
            shardkey = shardkey, 
70 - 377x + 361x
            shardchunks = shardchunks), 
71 - 377x + 361x
          list(...)
@@ -682,14 +682,14 @@

disk.frame coverage - 51.96%

81 - 377x + 361x
  cat(meta_out, file = json_path)
82 - 377x + 361x
  df
@@ -738,42 +738,42 @@

disk.frame coverage - 51.96%

89 - 517x + 499x
  df <- list()
90 - 517x + 499x
  df$files <- list.files(path, full.names = TRUE)
91 - 517x + 499x
  df$files_short <- list.files(path)
92 - 517x + 499x
  attr(df,"path") <- path
93 - 517x + 499x
  attr(df,"backend") <- "fst"
94 - 517x + 499x
  class(df) <- c("disk.frame", "disk.frame.folder")
@@ -787,14 +787,14 @@

disk.frame coverage - 51.96%

96 - 517x + 499x
  attr(df, "performing") <- "none"
97 - 517x + 499x
  df
@@ -1039,7 +1039,7 @@

disk.frame coverage - 51.96%

132 - 104x + 103x
  return(TRUE)
@@ -1368,21 +1368,21 @@

disk.frame coverage - 51.96%

179 - 654x + 628x
  if(check.consistency) {
180 - 654x + 628x
    fpath <- attr(df,"path")
181 - 654x + 628x
    if(!dir.exists(fpath) & file.exists(fpath)) {
@@ -1403,7 +1403,7 @@

disk.frame coverage - 51.96%

184 - 654x + 628x
      return(FALSE)
@@ -1466,7 +1466,7 @@

disk.frame coverage - 51.96%

193 - 654x + 628x
  !is.file.disk.frame(df, check.consistency = check.consistency)
@@ -1803,467 +1803,600 @@

disk.frame coverage - 51.96%

-