Merge pull request #371 from xiaodaigh/development
Development for v0.6
xiaodaigh authored Jan 30, 2022
2 parents b522999 + 4f0f581 commit d24d88d
Showing 198 changed files with 19,227 additions and 13,544 deletions.
6 changes: 5 additions & 1 deletion .Rbuildignore
@@ -1,3 +1,5 @@
^renv$
^renv\.lock$
^.*\.Rproj$
^\.github$
^manuscript$
@@ -65,4 +67,6 @@ vignettes.Rnw.template
^codecov\.yml$
new-nse-dev.r
test-poorman.R
*.parquet
.parquet$
maditr-devs.r
^CRAN-SUBMISSION$
2 changes: 0 additions & 2 deletions CRAN-RELEASE

This file was deleted.

20 changes: 9 additions & 11 deletions DESCRIPTION
@@ -1,8 +1,8 @@
Type: Package
Package: disk.frame
Title: Larger-than-RAM Disk-Based Data Manipulation Framework
Version: 0.5.0
Date: 2021-05-09
Version: 0.6.0
Date: 2022-01-31
Authors@R: c(
person("Dai", "ZJ", email = "[email protected]", role = c("aut", "cre")),
person("Jacky", "Poon", role = c("ctb"))
@@ -17,27 +17,24 @@ License: MIT + file LICENSE
Imports:
Rcpp (>= 0.12.13),
glue (>= 1.3.1),
rlang (>= 0.4.0),
future.apply (>= 1.3.0),
fs (>= 1.3.1),
jsonlite (>= 1.6),
pryr (>= 0.1.4),
stringr (>= 1.4.0),
fst (>= 0.8.0),
globals (>= 0.12.4),
future (>= 1.14.0),
data.table (>= 1.12.2),
crayon (>= 1.3.4),
bigreadr (>= 0.2.0),
furrr (>= 0.2.2),
bit64,
benchmarkme
benchmarkme,
purrr (>= 0.3.2),
rlang
Depends:
R (>= 3.4),
dplyr (>= 1.0.0),
purrr (>= 0.3.2)
dplyr (>= 1.0.0)
Suggests:
testthat (>= 2.1.0),
nycflights13,
magrittr,
shiny,
@@ -49,10 +46,11 @@ Suggests:
speedglm,
broom,
ggplot2,
covr
rmarkdown
LinkingTo:
Rcpp
RoxygenNote: 7.1.1
RoxygenNote: 7.1.2
VignetteBuilder: rmarkdown
Encoding: UTF-8
URL: https://diskframe.com
BugReports: https://github.com/xiaodaigh/disk.frame/issues
35 changes: 3 additions & 32 deletions NAMESPACE
@@ -17,31 +17,17 @@ S3method(colnames,disk.frame)
S3method(compute,disk.frame)
S3method(delayed,disk.frame)
S3method(distinct,disk.frame)
S3method(do,disk.frame)
S3method(filter,disk.frame)
S3method(full_join,disk.frame)
S3method(get_chunk,disk.frame)
S3method(glimpse,disk.frame)
S3method(group_by,disk.frame)
S3method(group_vars,disk.frame)
S3method(groups,disk.frame)
S3method(hard_arrange,data.frame)
S3method(hard_arrange,disk.frame)
S3method(hard_group_by,data.frame)
S3method(hard_group_by,disk.frame)
S3method(head,disk.frame)
S3method(imap,default)
S3method(imap_dfr,default)
S3method(imap_dfr,disk.frame)
S3method(inner_join,disk.frame)
S3method(lazy,disk.frame)
S3method(left_join,disk.frame)
S3method(map,default)
S3method(map,disk.frame)
S3method(map2,default)
S3method(map2,disk.frame)
S3method(map_dfr,default)
S3method(map_dfr,disk.frame)
S3method(merge,disk.frame)
S3method(mutate,disk.frame)
S3method(names,disk.frame)
@@ -67,22 +53,22 @@ S3method(transmute,disk.frame)
export(IQR_df.chunk_agg.disk.frame)
export(IQR_df.collected_agg.disk.frame)
export(add_chunk)
export(add_tally.disk.frame)
export(all_df.chunk_agg.disk.frame)
export(all_df.collected_agg.disk.frame)
export(any_df.chunk_agg.disk.frame)
export(any_df.collected_agg.disk.frame)
export(as.disk.frame)
export(bind_rows.disk.frame)
export(ceremony_text)
export(chunk_arrange)
export(chunk_distinct)
export(chunk_group_by)
export(chunk_lapply)
export(chunk_summarise)
export(chunk_summarize)
export(chunk_ungroup)
export(cimap)
export(cimap_dfr)
export(clapply)
export(cmap)
export(cmap2)
export(cmap_dfr)
@@ -102,18 +88,12 @@ export(foverlaps.disk.frame)
export(gen_datatable_synthetic)
export(get_chunk)
export(get_chunk_ids)
export(hard_arrange)
export(hard_group_by)
export(imap)
export(imap_dfr)
export(insert_ceremony)
export(is_disk.frame)
export(lazy)
export(length_df.chunk_agg.disk.frame)
export(length_df.collected_agg.disk.frame)
export(make_glm_streaming_fn)
export(map)
export(map2)
export(map_by_chunk_id)
export(max_df.chunk_agg.disk.frame)
export(max_df.collected_agg.disk.frame)
@@ -148,7 +128,6 @@ export(shardkey_equal)
export(show_boilerplate)
export(show_ceremony)
export(srckeep)
export(srckeepchunks)
export(sum_df.chunk_agg.disk.frame)
export(sum_df.collected_agg.disk.frame)
export(var_df.chunk_agg.disk.frame)
@@ -172,10 +151,8 @@ importFrom(data.table,foverlaps)
importFrom(data.table,fread)
importFrom(data.table,rbindlist)
importFrom(data.table,setDT)
importFrom(data.table,setkey)
importFrom(data.table,setkeyv)
importFrom(data.table,timetaken)
importFrom(dplyr,add_tally)
importFrom(dplyr,anti_join)
importFrom(dplyr,arrange)
importFrom(dplyr,bind_rows)
@@ -218,7 +195,6 @@ importFrom(future,nbrOfWorkers)
importFrom(future,plan)
importFrom(future,sequential)
importFrom(future.apply,future_lapply)
importFrom(globals,findGlobals)
importFrom(glue,glue)
importFrom(jsonlite,fromJSON)
importFrom(jsonlite,toJSON)
@@ -230,19 +206,14 @@ importFrom(purrr,map2)
importFrom(purrr,map_chr)
importFrom(purrr,map_dfr)
importFrom(purrr,map_lgl)
importFrom(rlang,enquos)
importFrom(rlang,eval_tidy)
importFrom(rlang,quo)
importFrom(rlang,enexpr)
importFrom(stats,median)
importFrom(stats,quantile)
importFrom(stats,runif)
importFrom(stringr,fixed)
importFrom(utils,capture.output)
importFrom(utils,head)
importFrom(utils,memory.limit)
importFrom(utils,methods)
importFrom(utils,setTxtProgressBar)
importFrom(utils,tail)
importFrom(utils,txtProgressBar)
importFrom(utils,unzip)
useDynLib(disk.frame)
5 changes: 5 additions & 0 deletions NEWS.md
@@ -1,3 +1,8 @@
# disk.frame 0.6
* Much better NSE support in disk.frame!
* removed `hard_arrange` and `hard_group_by`
* various API updates

# disk.frame 0.5
* removed `add_count` method

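The NEWS entries above are the user-facing summary: `hard_group_by` and `hard_arrange` are gone, and grouped aggregation now goes through the ordinary dplyr verbs backed by the improved NSE machinery. A minimal sketch of that usage, assuming the standard `group_by`/`summarize`/`collect` workflow on a disk.frame and using `mtcars` purely as illustrative data:

library(disk.frame)
library(dplyr)

setup_disk.frame(workers = 2)   # optional: keep the worker pool small for the example

cars.df = as.disk.frame(mtcars, outdir = tempfile(fileext = ".df"), overwrite = TRUE)

# grouped summary without hard_group_by: the verbs are recorded lazily and the
# chunk-wise plus collected aggregation happens when collect() is called
res = cars.df %>%
  group_by(cyl) %>%
  summarize(mean_mpg = mean(mpg)) %>%
  collect()

delete(cars.df)
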
5 changes: 3 additions & 2 deletions R/add_chunk.r
@@ -116,9 +116,10 @@ add_chunk <- function(df, chunk, chunk_id = NULL, full.names = FALSE, ...) {

data.table::setDT(check_vars)
if(nrow(check_vars[is.na(new_chunk)]) > 0) {
vars_strings = paste0(check_vars[is.na(new_chunk), colnames], collapse=',\n ')
warning(
glue::glue(
"these variables are in the disk.frame but not in the new chunk: \n {paste0(check_vars[is.na(new_chunk), colnames], collapse=',\n ')}"))
sprintf(
"these variables are in the disk.frame but not in the new chunk: \n %s", vars_strings))
}
if(nrow(check_vars[is.na(existing_df)]) > 0){
warning(glue::glue("these variables are in the new chunk but not in the existing disk.frame: {paste0(check_vars[is.na(existing_df), colnames], collapse=', ')}"))
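
The only substantive change in this hunk is how the warning string is built: the glue interpolation is replaced by a pre-assembled string passed to `sprintf`. A small sketch of the equivalence, using a made-up vector of missing column names rather than the real `check_vars` table:

missing_vars = c("speed", "dist")   # hypothetical columns in the disk.frame but not in the new chunk

# old style: interpolation happens inside the glue string
glue::glue("these variables are in the disk.frame but not in the new chunk: \n {paste0(missing_vars, collapse=',\n ')}")

# new style: the list is collapsed first, then substituted via sprintf
vars_strings = paste0(missing_vars, collapse = ",\n ")
sprintf("these variables are in the disk.frame but not in the new chunk: \n %s", vars_strings)

Both constructions produce essentially the same warning text.
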
17 changes: 9 additions & 8 deletions R/anti_join.r
@@ -3,9 +3,10 @@
#' @param merge_by_chunk_id the merge is performed by chunk id
#' @param overwrite overwrite output directory
#' @param .progress Show progress or not. Defaults to FALSE
#' @param suffix see dplyr::XXX_join
#' @param keep see dplyr::XXX_join
#' @param ... same as dplyr's joins
#' @rdname join
#' @importFrom rlang quo enquos
#' @importFrom dplyr anti_join left_join full_join semi_join inner_join
#' @return disk.frame or data.frame/data.table
#' @export
@@ -29,11 +30,11 @@ anti_join.disk.frame <- function(x, y, by=NULL, copy=FALSE, ..., outdir = tempfi
overwrite_check(outdir, overwrite)

if("data.frame" %in% class(y)) {
quo_dotdotdot = enquos(...)
cmap_dfr.disk.frame(x, ~{
code = quo(anti_join(.x, y, by = by, copy = copy, !!!quo_dotdotdot))
rlang::eval_tidy(code)
tmp = cmap.disk.frame(x, ~{
anti_join(.x, y, by = by, copy = copy, ...)
}, .progress = .progress)

return(tmp)
} else if("disk.frame" %in% class(y)) {
if(is.null(merge_by_chunk_id)) {
stop("both x and y are disk.frames. You need to specify merge_by_chunk_id = TRUE or FALSE explicitly")
@@ -47,12 +48,12 @@ anti_join.disk.frame <- function(x, y, by=NULL, copy=FALSE, ..., outdir = tempfi
if (merge_by_chunk_id == FALSE) {
warning("merge_by_chunk_id = FALSE. This will take significantly longer and the preparations needed are performed eagerly which may lead to poor performance. Consider making y a data.frame or set merge_by_chunk_id = TRUE for better performance.")

x = hard_group_by(x, by, nchunks = max(ncy,ncx), overwrite = TRUE)
y = hard_group_by(y, by, nchunks = max(ncy,ncx), overwrite = TRUE)
ncxy = max(ncy,ncx)
x = rechunk(x, shardby=by, nchunks = ncxy, outdir=tempfile(fileext = ".jdf"), overwrite = FALSE)
y = rechunk(y, shardby=by, nchunks =ncxy, outdir=tempfile(fileext = ".jdf"), overwrite = FALSE)
return(anti_join.disk.frame(x, y, by, copy = copy, outdir = outdir, merge_by_chunk_id = TRUE, overwrite = overwrite))
} else if ((identical(shardkey(x)$shardkey, "") & identical(shardkey(y)$shardkey, "")) | identical(shardkey(x), shardkey(y))) {
res = cmap2.disk.frame(x, y, ~{
#res = cmap2(x, y, ~{
if(is.null(.y)) {
return(.x)
} else if (is.null(.x)) {
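
Two things change in this join path: when `y` is a plain data.frame the dots are now forwarded directly into the chunk-wise `anti_join` (instead of being rebuilt with quosures), and when both sides are disk.frames with `merge_by_chunk_id = FALSE` the inputs are re-sharded with `rechunk(..., shardby = by)` rather than the removed `hard_group_by`. A minimal usage sketch with made-up toy tables; it assumes `collect` materializes the lazy result:

library(disk.frame)
library(dplyr)

df1 = data.frame(id = 1:10, x = runif(10))
df2 = data.frame(id = c(2, 4, 6))

a.df = as.disk.frame(df1, outdir = tempfile(fileext = ".df"), overwrite = TRUE)
b.df = as.disk.frame(df2, outdir = tempfile(fileext = ".df"), overwrite = TRUE)

# y is a plain data.frame: anti_join is applied to each chunk of x
res1 = anti_join(a.df, df2, by = "id") %>% collect()

# both sides are disk.frames: merge_by_chunk_id must be given explicitly;
# FALSE makes disk.frame rechunk both sides by the join key first
res2 = anti_join(a.df, b.df, by = "id", merge_by_chunk_id = FALSE) %>% collect()

delete(a.df); delete(b.df)
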
1 change: 0 additions & 1 deletion R/as.disk.frame.r
@@ -25,7 +25,6 @@
#' delete(cars_new_location.df)
#' delete(cars_chunks.df)
as.disk.frame <- function(df, outdir = tempfile(fileext = ".df"), nchunks = recommend_nchunks(df), overwrite = FALSE, shardby = NULL, compress = 50,...) {

stopifnot("data.frame" %in% class(df))
overwrite_check(outdir, overwrite)
data.table::setDT(df)
6 changes: 6 additions & 0 deletions R/bind_rows.r
@@ -0,0 +1,6 @@
#' Bind rows
#' @param ... disk.frame to be row bound
#' @export
bind_rows.disk.frame <- function(...) {
rbindlist.disk.frame(list(...))
}
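
`bind_rows.disk.frame` is a thin wrapper that collects its arguments into a list and hands them to `rbindlist.disk.frame`. A short usage sketch, assuming two disk.frames with the same columns:

library(disk.frame)

a.df = as.disk.frame(cars, outdir = tempfile(fileext = ".df"), overwrite = TRUE)
b.df = as.disk.frame(cars, outdir = tempfile(fileext = ".df"), overwrite = TRUE)

# equivalent to rbindlist.disk.frame(list(a.df, b.df))
ab.df = bind_rows.disk.frame(a.df, b.df)

nrow(ab.df)   # expected 2 * nrow(cars)

delete(a.df); delete(b.df); delete(ab.df)
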
84 changes: 37 additions & 47 deletions R/chunk_mapper.r
@@ -31,59 +31,49 @@
#' @param chunk_fn The dplyr function to create a mapper for
#' @param warning_msg The warning message to display when invoking the mapper
#' @param as.data.frame force the input chunk of a data.frame; needed for dtplyr
#' @importFrom rlang enquos quo
#' @export
create_chunk_mapper <- function(chunk_fn, warning_msg = NULL, as.data.frame = TRUE) {
return_func <- function(.data, ...) {
if (!is.null(warning_msg)) {
create_chunk_mapper <- function(chunk_fn, warning_msg = NULL, as.data.frame = FALSE) {
if(as.data.frame) {
warning("`as.data.frame` is deprecated in create_chunk_mapper")
}

return(function(.data, ...) {
if(!is.null(warning_msg)) {
warning(warning_msg)
}

# you need to use list otherwise the names will be gone
code = substitute(chunk_fn(.disk.frame.chunk, ...))

quo_dotdotdot = rlang::enquos(...)
if (paste0(deparse(code), collapse="") == "chunk_fn(NULL)") {
globals_and_pkgs = future::getGlobalsAndPackages(expression(chunk_fn()))
} else {
globals_and_pkgs = future::getGlobalsAndPackages(code)
}

# this is designed to capture any global stuff
vars_and_pkgs = future::getGlobalsAndPackages(quo_dotdotdot)
data_for_eval_tidy = force(vars_and_pkgs$globals)

res = cmap(.data, ~{

this_env = environment()

if(length(data_for_eval_tidy) > 0) {
for(i in 1:length(data_for_eval_tidy)) {
assign(names(data_for_eval_tidy)[i], data_for_eval_tidy[[i]], pos = this_env)
}
}

lapply(quo_dotdotdot, function(x) {
attr(x, ".Environment") = this_env
})

if(as.data.frame) {
if("grouped_df" %in% class(.x)) {
code = rlang::quo(chunk_fn(.x, !!!quo_dotdotdot))
} else {
code = rlang::quo(chunk_fn(as.data.frame(.x), !!!quo_dotdotdot))
}
} else {
code = rlang::quo(chunk_fn(.x, !!!quo_dotdotdot))
global_vars = globals_and_pkgs$globals

env = parent.frame()

done = identical(env, emptyenv()) || identical(env, globalenv())

# keep adding global variables by moving up the environment chain
while(!done) {
tmp_globals_and_pkgs = future::getGlobalsAndPackages(code, envir = env)
new_global_vars = tmp_globals_and_pkgs$globals
for (name in setdiff(names(new_global_vars), names(global_vars))) {
global_vars[[name]] <- new_global_vars[[name]]
}

# ZJ: we need both approaches. TRUST ME
# TODO better NSE at some point need dist
tryCatch({
return(rlang::eval_tidy(code))
}, error = function(e) {
as_label_code = rlang::as_label(code)
if(as_label_code == "chunk_fn(...)") {
stop(glue::glue("disk.frame has detected a syntax error in \n\n`{code}`\n\n. If you believe your syntax is correct, raise an issue at https://github.com/xiaodaigh/disk.frame with a MWE"))
} else {
# likely to be dealing with data.tables
return(eval(parse(text=as_label_code), envir = this_env))
}
})
}, lazy = TRUE)
}
return_func
}
done = identical(env, emptyenv()) || identical(env, globalenv())
env = parent.env(env)
}

globals_and_pkgs$globals = global_vars

attr(.data, "recordings") = c(attr(.data, "recordings"), list(globals_and_pkgs))

.data
})
}
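
The rewritten `create_chunk_mapper` no longer rebuilds and eagerly evaluates quosures for each chunk. It captures the `chunk_fn(.disk.frame.chunk, ...)` call, walks up the parent environments to collect any globals the call needs, and appends the result to a `recordings` attribute on the disk.frame, to be replayed when the chunks are materialized. A sketch of the factory pattern it enables; defining `filter.disk.frame` this way mirrors how the package wires its dplyr verbs, but the snippet is illustrative rather than a quote of the 0.6 source:

library(disk.frame)
library(dplyr)

# build a chunk-wise verb: the returned function records the filter() call
# plus any globals it references (here `threshold`) instead of evaluating it now
filter.disk.frame = create_chunk_mapper(dplyr::filter)

cars.df = as.disk.frame(mtcars, outdir = tempfile(fileext = ".df"), overwrite = TRUE)

threshold = 20
res = cars.df %>%
  filter(mpg > threshold) %>%   # recorded lazily, applied per chunk at collect()
  collect()

delete(cars.df)
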
Empty file added R/clapply.r
Empty file.