Skip to content

Commit

Permalink
Merge pull request #1006 from cynkra/f-342-learn-postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
krlmlr authored May 30, 2022
2 parents 096739e + 00f8bf0 commit 49e6f7b
Show file tree
Hide file tree
Showing 13 changed files with 220 additions and 355 deletions.
31 changes: 0 additions & 31 deletions R/data-model-helpers.R
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,3 @@ new_data_model <- function(tables, columns, references) {
class = "data_model"
)
}

# Assemble a legacy `data_model` object from a flat overview of tables,
# columns and key relationships (one row per column) as returned by the
# database introspection query.
get_datamodel_from_overview <- function(overview) {
  tables <- datamodel_tables_from_overview(overview)
  columns <- datamodel_columns_from_overview(overview)
  references <- datamodel_references_from_overview(overview)

  new_data_model(tables = tables, columns = columns, references = references)
}

# Derive the `tables` component of a data_model: one row per distinct
# table name, with `segment` and `display` left as NA placeholders.
# Returns a plain data frame, as expected by new_data_model().
datamodel_tables_from_overview <- function(overview) {
  out <- distinct(overview, table)
  out <- add_column(out, segment = NA_character_, display = NA_character_)
  as.data.frame(out, stringsAsFactors = FALSE)
}

# Derive the `columns` component of a data_model from the overview:
# keeps the column-level fields and coerces the `key` flag to numeric.
# Returns a plain data frame, as expected by new_data_model().
datamodel_columns_from_overview <- function(overview) {
  cols <- select(overview, column, type, table, key, ref, ref_col)
  cols <- mutate(cols, key = as.numeric(key))
  as.data.frame(cols, stringsAsFactors = FALSE)
}

# Derive the `references` (foreign key) component of a data_model:
# one row per overview entry that points at another table (`ref` not NA),
# numbered sequentially via `ref_id`. `ref_col_num` is fixed at 1 because
# the legacy format only supports single-column references.
datamodel_references_from_overview <- function(overview) {
  refs <- filter(overview, !is.na(ref))
  refs <- select(refs, table, column, ref, ref_col)
  refs <- mutate(refs, ref_id = as.numeric(row_number()))
  refs <- add_column(refs, ref_col_num = 1)
  as.data.frame(refs, stringsAsFactors = FALSE)
}
4 changes: 0 additions & 4 deletions R/datamodelr-code.R
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@

# data_model code directly from {datamodelr} --------------------------------------

# Test whether `x` is a data_model object (class set by new_data_model()).
is.data_model <- function(x) {
  "data_model" %in% class(x)
}

bdm_create_references <- function(col_table) {
if (!inherits(col_table, "data.frame")) stop("Input must be a data frame.")

Expand Down
7 changes: 5 additions & 2 deletions R/db-helpers.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ unique_db_table_name <- local({

function(table_name) {
i <<- i + 1
glue("{table_name}_", systime_convenient(), "_", get_pid(), "_", as.character(i))
glue("{table_name}_", as.character(i), "_", systime_convenient(), "_", get_pid())
}
})

# Return the current time as an identifier-safe string
# (e.g. "2020_08_28_07_13_03_123456"), for use in unique table names.
# In pkgdown builds a fixed timestamp is returned so output is reproducible.
systime_convenient <- function() {
  # Microsecond precision makes near-simultaneous calls distinct.
  # FIXME: Race condition here, but fast enough
  local_options(digits.secs = 6)

  if (Sys.getenv("IN_PKGDOWN") != "") {
    "2020_08_28_07_13_03"
  } else {
    time <- as.character(Sys.time())
    # Replace "-", ":", "." and " " so the fractional-second separator
    # is sanitized too (the "." only appears because of digits.secs above).
    gsub("[-:. ]", "_", time)
  }
}

Expand Down
66 changes: 37 additions & 29 deletions R/dm-from-src.R
Original file line number Diff line number Diff line change
Expand Up @@ -65,32 +65,36 @@ dm_from_src <- function(src = NULL, table_names = NULL, learn_keys = NULL,
# FIXME: Get rid of legacy method once it works for all

if (is.null(learn_keys) || isTRUE(learn_keys)) {
dm_learned <- dm_learn_from_db(src, ...)

if (is.null(dm_learned)) {
if (isTRUE(learn_keys)) {
abort_learn_keys()
}

inform("Keys could not be queried, use `learn_keys = FALSE` to mute this message.")
} else {
if (is_null(learn_keys)) {
inform("Keys queried successfully, use `learn_keys = TRUE` to mute this message.")
# FIXME: Try to make it work everywhere
tryCatch(
{
dm_learned <- dm_learn_from_db(src, ...)
if (is_null(learn_keys)) {
inform("Keys queried successfully, use `learn_keys = TRUE` to mute this message.")
}

if (is_null(table_names)) {
return(dm_learned)
}

tbls_in_dm <- src_tbls_impl(dm_learned)

if (!all(table_names %in% tbls_in_dm)) {
abort_tbl_access(setdiff(table_names, tbls_in_dm))
}
tbls_req <- intersect(tbls_in_dm, table_names)

return(dm_learned %>% dm_select_tbl(!!!tbls_req))
},
error = function(e) {
if (isTRUE(learn_keys)) {
abort_learn_keys(conditionMessage(e))
}
# FIXME: Use new-style error messages.
inform(paste0("Keys could not be queried: ", conditionMessage(e), ". Use `learn_keys = FALSE` to mute this message."))
NULL
}

if (is_null(table_names)) {
return(dm_learned)
}

tbls_in_dm <- src_tbls_impl(dm_learned)

if (!all(table_names %in% tbls_in_dm)) {
abort_tbl_access(setdiff(table_names, tbls_in_dm))
}
tbls_req <- intersect(tbls_in_dm, table_names)

return(dm_learned %>% dm_select_tbl(!!!tbls_req))
}
)
}

if (is_null(table_names)) {
Expand Down Expand Up @@ -144,12 +148,16 @@ quote_ids <- function(x, con, schema = NULL) {

# Errors ------------------------------------------------------------------

abort_learn_keys <- function() {
abort(error_txt_learn_keys(), class = dm_error_full("learn_keys"))
# Raise a classed dm error (class via dm_error_full("learn_keys")) when
# keys could not be learned from the database. `reason` is the underlying
# condition message from the failed attempt.
abort_learn_keys <- function(reason) {
  abort(error_txt_learn_keys(reason), class = dm_error_full("learn_keys"))
}

error_txt_learn_keys <- function() {
"Failed to learn keys from database. Use `learn_keys = FALSE` to work around."
# Compose the message for a failed key-learning attempt, embedding the
# underlying `reason` and pointing the user at the workaround.
# FIXME: Use new-style error messages.
error_txt_learn_keys <- function(reason) {
  prefix <- paste0("Failed to learn keys from database: ", reason)
  paste0(prefix, ". Use `learn_keys = FALSE` to work around.")
}

abort_tbl_access <- function(bad) {
Expand Down
222 changes: 4 additions & 218 deletions R/learn.R
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ dm_learn_from_db <- function(dest, dbname = NA, ...) {
return()
}

if (!is_mssql(con)) {
return(dm_learn_from_db_legacy(con, dbname, ...))
}

dm_learn_from_db_meta(con, catalog = dbname, ...)
}

Expand Down Expand Up @@ -111,7 +107,10 @@ dm_learn_from_db_meta <- function(con, catalog = NULL, schema = NULL, name_forma
left_join(columns, select = c(column_name, dm_name, table_catalog, table_schema, table_name)) %>%
dm_update_zoomed() %>%
dm_zoom_to(constraint_column_usage) %>%
left_join(columns, select = c(column_name, dm_name, table_catalog, table_schema, table_name)) %>%
#
# inner_join(): Matching column sometimes not found on Postgres
inner_join(columns, select = c(column_name, dm_name, table_catalog, table_schema, table_name)) %>%
#
dm_update_zoomed() %>%
dm_select_tbl(-columns) %>%
dm_rename(constraint_column_usage, constraint_column_usage.table_catalog = table_catalog) %>%
Expand Down Expand Up @@ -164,38 +163,6 @@ dm_learn_from_db_meta <- function(con, catalog = NULL, schema = NULL, name_forma
new_dm2(tables, pks_df, fks_df)
}

# Legacy key-learning path: runs a backend-specific SQL query against the
# connection and converts its flat result into a dm object.
# Returns NULL (invisibly) when the backend has no learn query or the
# query yields no rows.
dm_learn_from_db_legacy <- function(con, dbname, ...) {
  sql <- db_learn_query(con, dbname = dbname, ...)
  if (is.null(sql)) {
    # No legacy query available for this backend
    return()
  }

  overview <-
    dbGetQuery(con, sql) %>%
    as_tibble()

  if (nrow(overview) == 0) {
    return()
  }

  # Named vector: name = bare table name, value = fully qualified SQL
  # identifier (schema/catalog handling delegated to schema_if()).
  table_names <-
    overview %>%
    arrange(table) %>%
    distinct(schema, table) %>%
    transmute(
      name = table,
      value = schema_if(schema = schema, table = table, con = con, dbname = dbname)
    ) %>%
    deframe()

  # FIXME: Use tbl_sql(vars = ...)
  tables <- map(table_names, ~ tbl(con, dbplyr::ident_q(.x)))

  data_model <- get_datamodel_from_overview(overview)

  legacy_new_dm(tables, data_model)
}

schema_if <- function(schema, table, con, dbname = NULL) {
table_sql <- DBI::dbQuoteIdentifier(con, table)
if (is_null(dbname) || is.na(dbname) || dbname == "") {
Expand All @@ -213,184 +180,3 @@ schema_if <- function(schema, table, con, dbname = NULL) {
SQL(paste0(DBI::dbQuoteIdentifier(con, dbname), ".", DBI::dbQuoteIdentifier(con, schema), ".", table_sql))
}
}

# Return the legacy introspection SQL for the given connection, or NULL
# (invisibly, matching the original fall-through) when no legacy learn
# query is implemented for this backend.
db_learn_query <- function(dest, dbname, ...) {
  if (!is_postgres(dest)) {
    return(invisible(NULL))
  }
  postgres_learn_query(dest, ...)
}

# Build the Postgres introspection query for the legacy learn path.
# Returns one row per column of every table of `table_type` in `schema`,
# with `key` (1 = part of a primary key), `ref`/`ref_col` (foreign key
# target, NA when none), `mandatory`, `type` and `column_order`.
# `schema` and `table_type` are safely quoted via dbQuoteString().
postgres_learn_query <- function(con, schema = "public", table_type = "BASE TABLE") {
  sprintf(
    "SELECT
t.table_schema as schema,
t.table_name as table,
c.column_name as column,
case when pk.column_name is null then 0 else 1 end as key,
fk.ref,
fk.ref_col,
case c.is_nullable when 'YES' then 0 else 1 end as mandatory,
c.data_type as type,
c.ordinal_position as column_order
from
information_schema.columns c
inner join information_schema.tables t on
t.table_name = c.table_name
and t.table_schema = c.table_schema
and t.table_catalog = c.table_catalog
left join -- primary keys
( SELECT DISTINCT
tc.constraint_name, tc.table_name, tc.table_schema, tc.table_catalog, kcu.column_name
FROM
information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu ON
tc.constraint_name = kcu.constraint_name
WHERE constraint_type = 'PRIMARY KEY'
) pk on
pk.table_name = c.table_name
and pk.column_name = c.column_name
and pk.table_schema = c.table_schema
and pk.table_catalog = c.table_catalog
left join -- foreign keys
( SELECT DISTINCT
tc.constraint_name, kcu.table_name, kcu.table_schema, kcu.table_catalog, kcu.column_name,
ccu.table_name as ref,
ccu.column_name as ref_col
FROM
information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu ON
tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage AS ccu ON
ccu.constraint_name = tc.constraint_name
WHERE tc.constraint_type = 'FOREIGN KEY'
) fk on
fk.table_name = c.table_name
and fk.table_schema = c.table_schema
and fk.table_catalog = c.table_catalog
and fk.column_name = c.column_name
where
c.table_schema = %s
and t.table_type = %s",
    dbQuoteString(con, schema),
    dbQuoteString(con, table_type)
  )
}

# FIXME: only needed for `dm_learn_from_db()` <- needs to be implemented in a different manner
# FIXME: only needed for `dm_learn_from_db()` <- needs to be implemented in a different manner
#
# Construct a dm from a named list of remote `tables` and a legacy
# `data_model` object (tables/columns/references data frames).
# Both arguments NULL yields an empty dm. All tables must share one
# source, and the names of `tables` must match data_model$tables exactly.
legacy_new_dm <- function(tables = NULL, data_model = NULL) {
  if (is_null(tables) && is_null(data_model)) {
    return(empty_dm())
  }

  if (!all_same_source(tables)) abort_not_same_src()
  stopifnot(is.data_model(data_model))

  columns <- as_tibble(data_model$columns)

  data_model_tables <- data_model$tables

  # `tables` and the data model must describe the same set of tables
  stopifnot(all(names(tables) %in% data_model_tables$table))
  stopifnot(all(data_model_tables$table %in% names(tables)))

  # Primary keys: one row per key column (key > 0 marks PK membership)
  pks <-
    columns %>%
    select(column, table, key) %>%
    filter(key > 0) %>%
    select(-key)

  # Foreign keys: empty prototype when the model has no references
  if (is.null(data_model$references) || nrow(data_model$references) == 0) {
    fks <- tibble(
      table = character(),
      column = character(),
      ref = character(),
      ref_column = character(),
      on_delete = character()
    )
  } else {
    fks <-
      data_model$references %>%
      transmute(table, column, ref, ref_column = ref_col, on_delete = "no_action") %>%
      as_tibble()
  }

  # Legacy
  data <- unname(tables[data_model_tables$table])

  table <- data_model_tables$table
  segment <- data_model_tables$segment
  # would be logical NA otherwise, but if set, it is class `character`
  display <- as.character(data_model_tables$display)
  zoom <- new_zoom()
  col_tracker_zoom <- new_col_tracker_zoom()

  # Nest PK columns into one list-column row per table
  pks <-
    pks %>%
    # Legacy compatibility
    mutate(column = as.list(column, list())) %>%
    nest_compat(pks = -table)

  # Tables without a PK get an empty-pk row so every table is represented
  pks <-
    tibble(
      table = setdiff(table, pks$table),
      pks = list_of(new_pk())
    ) %>%
    vec_rbind(pks)

  # Legacy compatibility
  fks$column <- as.list(fks$column)
  fks$ref_column <- as.list(fks$ref_column)

  # FKs are stored on the *referenced* (parent) table, hence nest by `ref`
  fks <-
    fks %>%
    nest_compat(fks = -ref) %>%
    rename(table = ref)

  # Tables that are never referenced get an empty-fk row
  fks <-
    tibble(
      table = setdiff(table, fks$table),
      fks = list_of(new_fk())
    ) %>%
    vec_rbind(fks)

  # there are no filters at this stage
  filters <-
    tibble(
      table = table,
      filters = list_of(new_filter())
    )

  # Assemble the dm definition table: one row per table with all metadata
  def <-
    tibble(table, data, segment, display) %>%
    left_join(pks, by = "table") %>%
    left_join(fks, by = "table") %>%
    left_join(filters, by = "table") %>%
    left_join(zoom, by = "table") %>%
    left_join(col_tracker_zoom, by = "table")

  new_dm3(def)
}

# Wrapper around tidyr::nest() that also works for zero-row input and
# always returns the nested column as a vctrs list_of (so the column has
# a stable ptype even when empty).
nest_compat <- function(.data, ...) {
  # `...` has to be name-variable pair (see `?nest()`) of length 1
  quos <- enquos(...)
  stopifnot(length(quos) == 1)
  new_col <- names(quos)
  if (nrow(.data) == 0) {
    # nest() on zero rows would not produce the desired ptype, so build
    # the empty list_of column by hand from the selected columns
    remove <- eval_select_indices(quo(c(...)), colnames(.data))
    keep <- setdiff(seq_along(.data), remove)

    nest <- new_list_of(list(), ptype = .data %>% select(!!!remove))

    .data %>%
      select(!!!keep) %>%
      mutate(!!new_col := !!nest)
  } else {
    # Regular path: nest, then coerce the nested column to list_of
    .data %>%
      nest(...) %>%
      mutate_at(vars(!!!new_col), as_list_of)
  }
}
Loading

0 comments on commit 49e6f7b

Please sign in to comment.