Skip to content

Commit

Permalink
Merge pull request #1123 from cynkra/f-learn-mysql-2
Browse files Browse the repository at this point in the history
  • Loading branch information
krlmlr authored Jun 21, 2022
2 parents 1cf682c + 1edd020 commit 4496328
Show file tree
Hide file tree
Showing 10 changed files with 922 additions and 45 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Suggests:
pixarfilms,
pool,
progress,
RMariaDB (>= 1.0.10),
RMariaDB (>= 1.2.2),
rmarkdown,
RPostgres,
RSQLite (>= 2.2.8),
Expand Down
5 changes: 2 additions & 3 deletions R/db-helpers.R
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ is_postgres <- function(dest) {
}

is_mariadb <- function(dest) {
inherits(dest, "src_MariaDBConnection") ||
inherits(dest, "MariaDBConnection")
inherits_any(dest, c("MariaDBConnection", "src_MariaDBConnection", "src_DoltConnection", "src_DoltLocalConnection"))
}

src_from_src_or_con <- function(dest) {
Expand All @@ -91,7 +90,7 @@ repair_table_names_for_db <- function(table_names, temporary, con, schema = NULL
names <- unique_db_table_name(names)
} else {
# permanent tables
if (!is.null(schema) && !is_mssql(con) && !is_postgres(con)) {
if (!is.null(schema) && !is_mssql(con) && !is_postgres(con) && !is_mariadb(con)) {
abort_no_schemas_supported(con = con)
}
names <- table_names
Expand Down
6 changes: 6 additions & 0 deletions R/global.R
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ utils::globalVariables(c(
"remote_table_unquoted",
"unique_def",
#
# meta
"DATABASE",
"referenced_column_name",
"referenced_table_name",
"referenced_table_schema",
#
# keep this to avoid dealing with trailing commas
NULL
))
116 changes: 92 additions & 24 deletions R/meta.R
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,47 @@ dm_meta_raw <- function(con, catalog) {

local_options(digits.secs = 6)

schemata <- tbl_lc(src, "information_schema.schemata", vars = c(
"catalog_name", "schema_name", "schema_owner", "default_character_set_catalog",
"default_character_set_schema", "default_character_set_name"
schemata <- tbl_lc(src, "information_schema.schemata", vars = vec_c(
"catalog_name", "schema_name", "default_character_set_name",
# Optional, not MySQL:
# "schema_owner", "default_character_set_catalog", "default_character_set_schema",
))
tables <- tbl_lc(src, "information_schema.tables", vars = c(
"table_catalog", "table_schema", "table_name", "table_type"
tables <- tbl_lc(src, "information_schema.tables", vars = vec_c(
"table_catalog", "table_schema", "table_name", "table_type",
))
columns <- tbl_lc(src, "information_schema.columns", vars = c(
columns <- tbl_lc(src, "information_schema.columns", vars = vec_c(
"table_catalog", "table_schema", "table_name", "column_name",
"ordinal_position", "column_default", "is_nullable", "data_type",
"character_maximum_length", "character_octet_length", "numeric_precision",
"numeric_precision_radix", "numeric_scale", "datetime_precision",
"character_set_catalog", "character_set_schema", "character_set_name",
"collation_catalog", "collation_schema", "collation_name", "domain_catalog",
"domain_schema", "domain_name"
))
table_constraints <- tbl_lc(src, "information_schema.table_constraints", vars = c(
"constraint_catalog", "constraint_schema", "constraint_name",
"table_catalog", "table_schema", "table_name", "constraint_type",
"is_deferrable", "initially_deferred"
"numeric_scale", "datetime_precision",
"character_set_name", "collation_name",

# Optional, not RMySQL:
# "numeric_precision_radix",
# "character_set_catalog", "character_set_schema",
# "collation_catalog", "collation_schema", "domain_catalog",
# "domain_schema", "domain_name"
))
key_column_usage <- tbl_lc(src, "information_schema.key_column_usage", vars = c(

if (is_mariadb(src)) {
table_constraints <- tbl_lc(src, "information_schema.table_constraints", vars = vec_c(
"constraint_catalog", "constraint_schema", "constraint_name",
"table_name", "constraint_type"
)) %>%
mutate(table_catalog = constraint_catalog, table_schema = constraint_schema, .before = table_name) %>%
mutate(constraint_name = if_else(constraint_type == "PRIMARY KEY", paste0("pk_", table_name), constraint_name))
} else {
table_constraints <- tbl_lc(src, "information_schema.table_constraints", vars = vec_c(
"constraint_catalog", "constraint_schema", "constraint_name",
"table_catalog", "table_schema", "table_name", "constraint_type",
"is_deferrable", "initially_deferred",
))
}

key_column_usage <- tbl_lc(src, "information_schema.key_column_usage", vars = vec_c(
"constraint_catalog", "constraint_schema", "constraint_name",
"table_catalog", "table_schema", "table_name", "column_name",
"ordinal_position"
"ordinal_position",
))

if (is_postgres(src)) {
Expand All @@ -82,6 +98,36 @@ dm_meta_raw <- function(con, catalog) {
))
} else if (is_mssql(src)) {
constraint_column_usage <- mssql_constraint_column_usage(src, table_constraints, catalog)
} else {
# Alternate constraint names for uniqueness
key_column_usage <-
key_column_usage %>%
left_join(
tbl_lc(src, "information_schema.table_constraints", vars = vec_c(
"constraint_catalog", "constraint_schema", "constraint_name",
"table_name", "constraint_type"
)),
by = vec_c(
"constraint_catalog", "constraint_schema", "constraint_name",
"table_name",
)
) %>%
mutate(constraint_name = if_else(constraint_type == "PRIMARY KEY", paste0("pk_", table_name), constraint_name)) %>%
select(-constraint_type)

constraint_column_usage <-
tbl_lc(src, "information_schema.key_column_usage", vars = c(
"table_catalog",
"referenced_table_schema", "referenced_table_name", "referenced_column_name",
"constraint_catalog", "constraint_schema", "constraint_name",
"ordinal_position"
)) %>%
filter(!is.na(referenced_table_name)) %>%
rename(
table_schema = referenced_table_schema,
table_name = referenced_table_name,
column_name = referenced_column_name,
)
}

dm(schemata, tables, columns, table_constraints, key_column_usage, constraint_column_usage) %>%
Expand Down Expand Up @@ -145,13 +191,24 @@ dm_meta_simple_add_keys <- function(dm_meta) {
}

tbl_lc <- function(con, name, vars) {
from <- paste0(
"SELECT ",
paste0(DBI::dbQuoteIdentifier(con_from_src_or_con(con), vars), collapse = ", "),
"\nFROM ", name
)
# For discovery only!
if (is.null(vars)) {
from <- name
} else {
from <- sql(paste0(
"SELECT ",
paste0(DBI::dbQuoteIdentifier(con_from_src_or_con(con), vars), collapse = ", "),
"\nFROM ", name
))
}

tbl(con, sql(from), vars = vars)
out <- tbl(con, from, vars = vars)
if (is.null(vars)) {
out <-
out %>%
rename(!!!set_names(colnames(out), tolower(colnames(out))))
}
out
}

select_dm_meta <- function(dm_meta) {
Expand Down Expand Up @@ -184,13 +241,20 @@ filter_dm_meta <- function(dm_meta, catalog = NULL, schema = NULL) {
constraint_column_usage <- constraint_column_usage %>% filter(table_catalog %in% !!catalog)
}

if (!is.null(schema)) {
if (!is.null(schema) && !is.na(schema)) {
schemata <- schemata %>% filter(schema_name %in% !!schema)
tables <- tables %>% filter(table_schema %in% !!schema)
columns <- columns %>% filter(table_schema %in% !!schema)
table_constraints <- table_constraints %>% filter(table_schema %in% !!schema)
key_column_usage <- key_column_usage %>% filter(table_schema %in% !!schema)
constraint_column_usage <- constraint_column_usage %>% filter(table_schema %in% !!schema)
} else if (!is.na(schema) && is_mariadb(dm_get_con(dm_meta))) {
schemata <- schemata %>% filter(schema_name == DATABASE() | is.na(DATABASE()))
tables <- tables %>% filter(table_schema == DATABASE() | is.na(DATABASE()))
columns <- columns %>% filter(table_schema == DATABASE() | is.na(DATABASE()))
table_constraints <- table_constraints %>% filter(table_schema == DATABASE() | is.na(DATABASE()))
key_column_usage <- key_column_usage %>% filter(table_schema == DATABASE() | is.na(DATABASE()))
constraint_column_usage <- constraint_column_usage %>% filter(table_schema == DATABASE() | is.na(DATABASE()))
}

dm(
Expand Down Expand Up @@ -222,6 +286,10 @@ filter_dm_meta_simple <- function(dm_meta, catalog = NULL, schema = NULL) {
schemata <- schemata %>% filter(schema_name %in% !!schema)
tables <- tables %>% filter(table_schema %in% !!schema)
columns <- columns %>% filter(table_schema %in% !!schema)
} else if (!is.na(schema) && is_mariadb(dm_get_con(dm_meta))) {
schemata <- schemata %>% filter(schema_name == DATABASE() | is.na(DATABASE()))
tables <- tables %>% filter(table_schema == DATABASE() | is.na(DATABASE()))
columns <- columns %>% filter(table_schema == DATABASE() | is.na(DATABASE()))
}

dm(schemata, tables, columns) %>%
Expand Down
Loading

0 comments on commit 4496328

Please sign in to comment.