Skip to content

Commit

Permalink
Merge pull request #517 from cynkra/f-learn-compound-mssql
Browse files Browse the repository at this point in the history
ENH: Draft dm_meta()
  • Loading branch information
krlmlr authored May 28, 2022
2 parents 2ac9ab8 + 424701b commit 096739e
Show file tree
Hide file tree
Showing 10 changed files with 491 additions and 87 deletions.
10 changes: 7 additions & 3 deletions R/dm-from-src.R
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ dm_from_src <- function(src = NULL, table_names = NULL, learn_keys = NULL,
src <- src_from_src_or_con(src)
con <- con_from_src_or_con(src)

# FIXME: Get rid of legacy method once it works for all

if (is.null(learn_keys) || isTRUE(learn_keys)) {
dm_learned <- dm_learn_from_db(src, ...)

Expand All @@ -76,12 +78,12 @@ dm_from_src <- function(src = NULL, table_names = NULL, learn_keys = NULL,
inform("Keys queried successfully, use `learn_keys = TRUE` to mute this message.")
}

tbls_in_dm <- src_tbls_impl(dm_learned)

if (is_null(table_names)) {
return(dm_learned)
}

tbls_in_dm <- src_tbls_impl(dm_learned)

if (!all(table_names %in% tbls_in_dm)) {
abort_tbl_access(setdiff(table_names, tbls_in_dm))
}
Expand Down Expand Up @@ -123,7 +125,9 @@ dm_from_src <- function(src = NULL, table_names = NULL, learn_keys = NULL,
}

quote_ids <- function(x, con, schema = NULL) {
if (is.null(con)) return(x)
if (is.null(con)) {
return(x)
}

if (is_null(schema)) {
map(
Expand Down
12 changes: 4 additions & 8 deletions R/dm.R
Original file line number Diff line number Diff line change
Expand Up @@ -94,23 +94,19 @@ new_dm <- function(tables = list()) {
}

new_dm2 <- function(tables = list(),
pks = structure(list(), names = character()),
fks = structure(list(), names = character()),
pks_df = tibble(table = character(), pks = list()),
fks_df = tibble(table = character(), fks = list()),
validate = TRUE) {
# Legacy
data <- unname(tables)
table <- names2(tables)

stopifnot(!is.null(names(pks)), all(names(pks) %in% table))
stopifnot(!is.null(names(fks)), all(names(fks) %in% table))
stopifnot(all(pks_df$table %in% table))
stopifnot(all(fks_df$table %in% table))

zoom <- new_zoom()
col_tracker_zoom <- new_col_tracker_zoom()

pks_df <- enframe(pks, "table", "pks")

fks_df <- enframe(fks, "table", "fks")

filters <-
tibble(
table = table,
Expand Down
31 changes: 31 additions & 0 deletions R/global.R
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,37 @@ utils::globalVariables(c(
"orders",
"trans",
#
# information_schema
"catalog",
"catalog_name",
"column_default",
"column_id",
"column_name",
"con",
"constraint_catalog",
"constraint_column_id",
"constraint_column_usage",
"constraint_name",
"constraint_schema",
"constraint_type",
"dbname",
"FIXME",
"is_nullable",
"key_column_usage",
"object_id",
"ordinal_position",
"schema_id",
"schemata",
"table_catalog",
"table_constraints",
"table_schema",
"table_type",
"tables",
"constraint_column_usage.column_name",
"constraint_column_usage.dm_name",
"key_column_usage.column_name",
"key_column_usage.dm_name",
#
# pixarfilms
"pixar_films",
"pixar_people",
Expand Down
180 changes: 126 additions & 54 deletions R/learn.R
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#' # the `dm` from the SQLite DB
#' iris_dm_learned <- dm_learn_from_db(src_sqlite)
#' }
dm_learn_from_db <- function(dest, dbname = NULL, ...) {
dm_learn_from_db <- function(dest, dbname = NA, ...) {
# assuming that we will not try to learn from (globally) temporary tables, which do not appear in sys.table
con <- con_from_src_or_con(dest)
src <- src_from_src_or_con(dest)
Expand All @@ -41,6 +41,130 @@ dm_learn_from_db <- function(dest, dbname = NULL, ...) {
return()
}

if (!is_mssql(con)) {
return(dm_learn_from_db_legacy(con, dbname, ...))
}

dm_learn_from_db_meta(con, catalog = dbname, ...)
}

dm_learn_from_db_meta <- function(con, catalog = NULL, schema = NULL, name_format = "{table}") {
info <- dm_meta(con, catalog = catalog, schema = schema)

df_info <-
info %>%
dm_select_tbl(-schemata) %>%
collect()

dm_name <-
df_info$tables %>%
select(catalog = table_catalog, schema = table_schema, table = table_name) %>%
mutate(name = glue(!!name_format)) %>%
pull() %>%
unclass() %>%
vec_as_names(repair = "unique")

from <-
df_info$tables %>%
select(catalog = table_catalog, schema = table_schema, table = table_name) %>%
pmap_chr(~ DBI::dbQuoteIdentifier(con, DBI::Id(...)))

df_key_info <-
df_info %>%
dm_zoom_to(tables) %>%
mutate(dm_name = !!dm_name, from = !!from) %>%
dm_update_zoomed() %>%
dm_zoom_to(columns) %>%
arrange(ordinal_position) %>%
select(-ordinal_position) %>%
left_join(tables) %>%
dm_update_zoomed() %>%
dm_select_tbl(constraint_column_usage, key_column_usage, columns)

table_info <-
df_key_info %>%
dm_zoom_to(columns) %>%
group_by(dm_name, from) %>%
summarize(vars = list(column_name)) %>%
ungroup() %>%
pull_tbl()

tables <- map2(table_info$from, table_info$vars, ~ tbl(con, dbplyr::ident_q(.x), vars = .y))
names(tables) <- table_info$dm_name

pks_df <-
df_key_info %>%
dm_zoom_to(key_column_usage) %>%
anti_join(constraint_column_usage) %>%
arrange(ordinal_position) %>%
dm_update_zoomed() %>%
dm_squash_to_tbl(key_column_usage) %>%
select(constraint_catalog, constraint_schema, constraint_name, dm_name, column_name) %>%
group_by(constraint_catalog, constraint_schema, constraint_name, dm_name) %>%
summarize(pks = list(tibble(column = list(column_name)))) %>%
ungroup() %>%
select(table = dm_name, pks)

fks_df <-
df_key_info %>%
dm_zoom_to(key_column_usage) %>%
left_join(columns, select = c(column_name, dm_name, table_catalog, table_schema, table_name)) %>%
dm_update_zoomed() %>%
dm_zoom_to(constraint_column_usage) %>%
left_join(columns, select = c(column_name, dm_name, table_catalog, table_schema, table_name)) %>%
dm_update_zoomed() %>%
dm_select_tbl(-columns) %>%
dm_rename(constraint_column_usage, constraint_column_usage.table_catalog = table_catalog) %>%
dm_rename(constraint_column_usage, constraint_column_usage.table_schema = table_schema) %>%
dm_rename(constraint_column_usage, constraint_column_usage.table_name = table_name) %>%
dm_rename(constraint_column_usage, constraint_column_usage.column_name = column_name) %>%
dm_rename(constraint_column_usage, constraint_column_usage.dm_name = dm_name) %>%
dm_rename(key_column_usage, key_column_usage.table_catalog = table_catalog) %>%
dm_rename(key_column_usage, key_column_usage.table_schema = table_schema) %>%
dm_rename(key_column_usage, key_column_usage.table_name = table_name) %>%
dm_rename(key_column_usage, key_column_usage.column_name = column_name) %>%
dm_rename(key_column_usage, key_column_usage.dm_name = dm_name) %>%
dm_flatten_to_tbl(constraint_column_usage) %>%
select(
constraint_catalog,
constraint_schema,
constraint_name,
ordinal_position,
ref_table = constraint_column_usage.dm_name,
ref_column = constraint_column_usage.column_name,
table = key_column_usage.dm_name,
column = key_column_usage.column_name,
) %>%
arrange(
constraint_catalog,
constraint_schema,
constraint_name,
ordinal_position,
) %>%
select(-ordinal_position) %>%
# FIXME: Where to learn this in INFORMATION_SCHEMA?
group_by(
constraint_catalog,
constraint_schema,
constraint_name,
ref_table,
) %>%
summarize(fks = list(tibble(
ref_column = list(ref_column),
table = if (length(table) > 0) table[[1]] else NA_character_,
column = list(column),
on_delete = "no_action"
))) %>%
ungroup() %>%
select(-(1:3)) %>%
group_by(table = ref_table) %>%
summarize(fks = list(bind_rows(fks))) %>%
ungroup()

new_dm2(tables, pks_df, fks_df)
}

dm_learn_from_db_legacy <- function(con, dbname, ...) {
sql <- db_learn_query(con, dbname = dbname, ...)
if (is.null(sql)) {
return()
Expand Down Expand Up @@ -74,7 +198,7 @@ dm_learn_from_db <- function(dest, dbname = NULL, ...) {

schema_if <- function(schema, table, con, dbname = NULL) {
table_sql <- DBI::dbQuoteIdentifier(con, table)
if (is_null(dbname) || dbname == "") {
if (is_null(dbname) || is.na(dbname) || dbname == "") {
if_else(
are_na(schema),
table_sql,
Expand All @@ -91,63 +215,11 @@ schema_if <- function(schema, table, con, dbname = NULL) {
}

db_learn_query <- function(dest, dbname, ...) {
if (is_mssql(dest)) {
return(mssql_learn_query(dest, dbname = dbname, ...))
}
if (is_postgres(dest)) {
return(postgres_learn_query(dest, ...))
}
}

mssql_learn_query <- function(con, schema = "dbo", dbname = NULL) { # taken directly from {datamodelr} and subsequently tweaked a little
dbname_sql <- if (is_null(dbname)) {
""
} else {
paste0(DBI::dbQuoteIdentifier(con, dbname), ".")
}
glue::glue(
"select
schemas.name as [schema],
tabs.name as [table],
cols.name as [column],
isnull(ind_col.column_id, 0) as [key],
ref_tabs.name AS ref,
ref_cols.name AS ref_col,
1 - cols.is_nullable as mandatory,
types.name as [type],
cols.max_length,
cols.precision,
cols.scale
from
{dbname_sql}sys.all_columns cols
inner join {dbname_sql}sys.tables tabs on
cols.object_id = tabs.object_id
inner join {dbname_sql}sys.schemas schemas on
tabs.schema_id = schemas.schema_id
left outer join {dbname_sql}sys.foreign_key_columns ref on
ref.parent_object_id = tabs.object_id
and ref.parent_column_id = cols.column_id
left outer join {dbname_sql}sys.indexes ind on
ind.object_id = tabs.object_id
and ind.is_primary_key = 1
left outer join {dbname_sql}sys.index_columns ind_col on
ind_col.object_id = ind.object_id
and ind_col.index_id = ind.index_id
and ind_col.column_id = cols.column_id
left outer join {dbname_sql}sys.systypes [types] on
types.xusertype = cols.system_type_id
left outer join {dbname_sql}sys.tables ref_tabs on
ref_tabs.object_id = ref.referenced_object_id
left outer join {dbname_sql}sys.all_columns ref_cols on
ref_cols.object_id = ref.referenced_object_id
and ref_cols.column_id = ref.referenced_column_id
where schemas.name = {DBI::dbQuoteString(con, schema)}
order by
tabs.create_date,
cols.column_id"
)
}

postgres_learn_query <- function(con, schema = "public", table_type = "BASE TABLE") {
sprintf(
"SELECT
Expand Down
Loading

0 comments on commit 096739e

Please sign in to comment.