diff --git a/DESCRIPTION b/DESCRIPTION
index 50a9dec98..92dfe06e6 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -87,6 +87,7 @@ Collate:
     'backend-postgres-old.R'
     'backend-redshift.R'
     'backend-snowflake.R'
+    'backend-spark-sql.R'
     'backend-sqlite.R'
     'backend-teradata.R'
     'build-sql.R'
diff --git a/NAMESPACE b/NAMESPACE
index a9bff8dd8..17e592bc1 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -39,6 +39,7 @@ S3method(db_col_types,PqConnection)
 S3method(db_col_types,TestConnection)
 S3method(db_col_types,default)
 S3method(db_collect,DBIConnection)
+S3method(db_compute,"Spark SQL")
 S3method(db_compute,DBIConnection)
 S3method(db_connection_describe,DBIConnection)
 S3method(db_connection_describe,MariaDBConnection)
@@ -49,6 +50,7 @@ S3method(db_connection_describe,PostgreSQL)
 S3method(db_connection_describe,PostgreSQLConnection)
 S3method(db_connection_describe,PqConnection)
 S3method(db_connection_describe,SQLiteConnection)
+S3method(db_copy_to,"Spark SQL")
 S3method(db_copy_to,DBIConnection)
 S3method(db_create_index,DBIConnection)
 S3method(db_desc,DBIConnection)
@@ -74,6 +76,7 @@ S3method(dbplyr_as_join_by,default)
 S3method(dbplyr_as_join_by,dplyr_join_by)
 S3method(dbplyr_as_join_by,list)
 S3method(dbplyr_edition,"Microsoft SQL Server")
+S3method(dbplyr_edition,"Spark SQL")
 S3method(dbplyr_edition,ACCESS)
 S3method(dbplyr_edition,HDB)
 S3method(dbplyr_edition,Hive)
@@ -346,6 +349,7 @@ S3method(sql_semi_join,DBIConnection)
 S3method(sql_set_op,DBIConnection)
 S3method(sql_subquery,DBIConnection)
 S3method(sql_table_analyze,"Microsoft SQL Server")
+S3method(sql_table_analyze,"Spark SQL")
 S3method(sql_table_analyze,ACCESS)
 S3method(sql_table_analyze,DBIConnection)
 S3method(sql_table_analyze,HDB)
@@ -361,6 +365,7 @@ S3method(sql_table_analyze,Teradata)
 S3method(sql_table_index,DBIConnection)
 S3method(sql_translate_env,DBIConnection)
 S3method(sql_translation,"Microsoft SQL Server")
+S3method(sql_translation,"Spark SQL")
 S3method(sql_translation,ACCESS)
 S3method(sql_translation,DBIConnection)
 S3method(sql_translation,HDB)
@@ -394,6 +399,7 @@ S3method(sql_values_subquery,Redshift)
 S3method(sql_values_subquery,RedshiftConnection)
 S3method(src_tbls,src_sql)
 S3method(summarise,tbl_lazy)
+S3method(supports_window_clause,"Spark SQL")
 S3method(supports_window_clause,ACCESS)
 S3method(supports_window_clause,DBIConnection)
 S3method(supports_window_clause,Hive)
@@ -500,6 +506,7 @@ export(simulate_oracle)
 export(simulate_postgres)
 export(simulate_redshift)
 export(simulate_snowflake)
+export(simulate_spark_sql)
 export(simulate_sqlite)
 export(simulate_teradata)
 export(sql)
diff --git a/NEWS.md b/NEWS.md
index de9636442..4c2711a84 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -15,6 +15,8 @@
   (@mgirlich, #1211).
 
 * Joins now work again for Pool and Oracle connections (@mgirlich, #1177, #1181).
+
+* Preliminary Databricks Spark SQL backend (#1377).
 
 * `dbplyr_pivot_wider_spec()` is now exported. Unlike `pivot_wider()` this can
   be lazy. Note that this will be removed soon after `pivot_wider_spec()`
diff --git a/R/backend-spark-sql.R b/R/backend-spark-sql.R
new file mode 100644
index 000000000..79a556107
--- /dev/null
+++ b/R/backend-spark-sql.R
@@ -0,0 +1,130 @@
+#' Backend: Databricks Spark SQL
+#'
+#' @description
+#' See `vignette("translation-function")` and `vignette("translation-verb")` for
+#' details of overall translation technology. Key differences for this backend
+#' are better translation of statistical aggregate functions
+#' (e.g. `var()`, `median()`) and use of temporary views instead of temporary
+#' tables when copying data.
+#'
+#' Use `simulate_spark_sql()` with `lazy_frame()` to see simulated SQL without
+#' converting to live access database.
+#'
+#' @name backend-spark-sql
+#' @aliases NULL
+#' @examples
+#' library(dplyr, warn.conflicts = FALSE)
+#'
+#' lf <- lazy_frame(a = TRUE, b = 1, d = 2, c = "z", con = simulate_spark_sql())
+#'
+#' lf %>% summarise(x = median(d, na.rm = TRUE))
+#' lf %>% summarise(x = var(c, na.rm = TRUE), .by = d)
+#'
+#' lf %>% mutate(x = first(c))
+#' lf %>% mutate(x = first(c), .by = d)
+NULL
+
+#' @export
+#' @rdname backend-spark-sql
+simulate_spark_sql <- function() simulate_dbi("Spark SQL")
+
+#' @export
+`dbplyr_edition.Spark SQL` <- function(con) {
+  2L
+}
+
+#' @export
+`sql_translation.Spark SQL` <- function(con) {
+  sql_variant(
+    base_odbc_scalar,
+    sql_translator(.parent = base_odbc_agg,
+      var = sql_aggregate("VARIANCE", "var"),
+      quantile = sql_quantile("PERCENTILE"),
+      median = sql_aggregate("MEDIAN"),
+      first = function(x, na_rm = FALSE) {
+        check_na_rm(na_rm)
+        glue_sql2(sql_current_con(), "FIRST({.val x})")
+      },
+      last = function(x, na_rm = FALSE) {
+        check_na_rm(na_rm)
+        glue_sql2(sql_current_con(), "LAST({.val x})")
+      },
+    ),
+    sql_translator(.parent = base_odbc_win,
+      var = win_aggregate("VARIANCE"),
+      quantile = sql_quantile("PERCENTILE", window = TRUE),
+      median = win_aggregate("MEDIAN"),
+      first = function(x, order_by = NULL, na_rm = FALSE) {
+        sql_nth(x, 1L, order_by = order_by, na_rm = na_rm, ignore_nulls = "bool")
+      },
+      last = function(x, order_by = NULL, na_rm = FALSE) {
+        sql_nth(x, Inf, order_by = order_by, na_rm = na_rm, ignore_nulls = "bool")
+      },
+      nth = function(x, n, order_by = NULL, na_rm = FALSE) {
+        sql_nth(x, n, order_by = order_by, na_rm = na_rm, ignore_nulls = "bool")
+      },
+    )
+  )
+}
+
+#' @export
+`sql_table_analyze.Spark SQL` <- function(con, table, ...) {
+  # https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-aux-analyze-table.html
+  glue_sql2(con, "ANALYZE TABLE {.tbl table} COMPUTE STATISTICS")
+}
+
+#' @export
+`supports_window_clause.Spark SQL` <- function(con) {
+  TRUE
+}
+
+#' @export
+`db_copy_to.Spark SQL` <- function(con,
+                                    table,
+                                    values,
+                                    ...,
+                                    overwrite = FALSE,
+                                    types = NULL,
+                                    temporary = TRUE,
+                                    unique_indexes = NULL,
+                                    indexes = NULL,
+                                    analyze = TRUE,
+                                    in_transaction = FALSE) {
+
+  if (!temporary) {
+    cli::cli_abort("Spark SQL only supports temporary tables")
+  }
+
+  sql <- sql_values_subquery(con, values, types = types, lvl = 1)
+  db_compute(con, table, sql, overwrite = overwrite)
+}
+
+#' @export
+`db_compute.Spark SQL` <- function(con,
+                                    table,
+                                    sql,
+                                    ...,
+                                    overwrite = FALSE,
+                                    temporary = TRUE,
+                                    unique_indexes = list(),
+                                    indexes = list(),
+                                    analyze = TRUE,
+                                    in_transaction = FALSE) {
+
+  if (!temporary) {
+    cli::cli_abort("Spark SQL only supports temporary tables")
+  }
+
+  table <- as_table_ident(table)
+  sql <- glue_sql2(
+    con,
+    "CREATE ", if (overwrite) "OR REPLACE ",
+    "TEMPORARY VIEW {.tbl {table}} AS \n",
+    "{.from {sql}}"
+  )
+  DBI::dbExecute(con, sql)
+
+  table
+}
+
+utils::globalVariables("regexp_replace")
diff --git a/man/backend-spark-sql.Rd b/man/backend-spark-sql.Rd
new file mode 100644
index 000000000..e6b6d93d9
--- /dev/null
+++ b/man/backend-spark-sql.Rd
@@ -0,0 +1,29 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/backend-spark-sql.R
+\name{backend-spark-sql}
+\alias{simulate_spark_sql}
+\title{Backend: Databricks Spark SQL}
+\usage{
+simulate_spark_sql()
+}
+\description{
+See \code{vignette("translation-function")} and \code{vignette("translation-verb")} for
+details of overall translation technology. Key differences for this backend
+are better translation of statistical aggregate functions
+(e.g. \code{var()}, \code{median()}) and use of temporary views instead of temporary
+tables when copying data.
+
+Use \code{simulate_spark_sql()} with \code{lazy_frame()} to see simulated SQL without
+converting to live access database.
+}
+\examples{
+library(dplyr, warn.conflicts = FALSE)
+
+lf <- lazy_frame(a = TRUE, b = 1, d = 2, c = "z", con = simulate_spark_sql())
+
+lf \%>\% summarise(x = median(d, na.rm = TRUE))
+lf \%>\% summarise(x = var(c, na.rm = TRUE), .by = d)
+
+lf \%>\% mutate(x = first(c))
+lf \%>\% mutate(x = first(c), .by = d)
+}
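
Reviewer note (not part of the patch): a minimal sketch of how the new translations can be exercised from the console, assuming a dbplyr build that includes this change. The frame and column names (`g`, `x`) are invented for illustration, and everything runs against the simulated connection, so no Databricks cluster is required.

```r
library(dplyr, warn.conflicts = FALSE)
library(dbplyr)

# Simulated connection added by this patch: printing a lazy frame shows the
# translated SQL without touching a live cluster.
lf <- lazy_frame(g = "a", x = 1, con = simulate_spark_sql())

# Aggregate context: var() and median() map to Spark's VARIANCE and MEDIAN.
lf %>% summarise(v = var(x, na.rm = TRUE), m = median(x, na.rm = TRUE), .by = g)

# Window context: first()/last()/nth() are translated via sql_nth().
lf %>% mutate(first_x = first(x), last_x = last(x), .by = g)

# quantile() goes through PERCENTILE in both contexts.
lf %>% summarise(p90 = quantile(x, 0.9, na.rm = TRUE), .by = g)
```

On a live connection the same verbs run against `tbl()` objects, and `copy_to()`/`compute()` create temporary views rather than temporary tables, per the `db_copy_to`/`db_compute` methods above.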