First stab at databricks support (#1379)
Fixes #1377
hadley authored Oct 25, 2023
1 parent 90de7c3 commit 77f3c28
Showing 5 changed files with 169 additions and 0 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION
@@ -87,6 +87,7 @@ Collate:
'backend-postgres-old.R'
'backend-redshift.R'
'backend-snowflake.R'
'backend-spark-sql.R'
'backend-sqlite.R'
'backend-teradata.R'
'build-sql.R'
7 changes: 7 additions & 0 deletions NAMESPACE
@@ -39,6 +39,7 @@ S3method(db_col_types,PqConnection)
S3method(db_col_types,TestConnection)
S3method(db_col_types,default)
S3method(db_collect,DBIConnection)
S3method(db_compute,"Spark SQL")
S3method(db_compute,DBIConnection)
S3method(db_connection_describe,DBIConnection)
S3method(db_connection_describe,MariaDBConnection)
@@ -49,6 +50,7 @@ S3method(db_connection_describe,PostgreSQL)
S3method(db_connection_describe,PostgreSQLConnection)
S3method(db_connection_describe,PqConnection)
S3method(db_connection_describe,SQLiteConnection)
S3method(db_copy_to,"Spark SQL")
S3method(db_copy_to,DBIConnection)
S3method(db_create_index,DBIConnection)
S3method(db_desc,DBIConnection)
@@ -74,6 +76,7 @@ S3method(dbplyr_as_join_by,default)
S3method(dbplyr_as_join_by,dplyr_join_by)
S3method(dbplyr_as_join_by,list)
S3method(dbplyr_edition,"Microsoft SQL Server")
S3method(dbplyr_edition,"Spark SQL")
S3method(dbplyr_edition,ACCESS)
S3method(dbplyr_edition,HDB)
S3method(dbplyr_edition,Hive)
@@ -346,6 +349,7 @@ S3method(sql_semi_join,DBIConnection)
S3method(sql_set_op,DBIConnection)
S3method(sql_subquery,DBIConnection)
S3method(sql_table_analyze,"Microsoft SQL Server")
S3method(sql_table_analyze,"Spark SQL")
S3method(sql_table_analyze,ACCESS)
S3method(sql_table_analyze,DBIConnection)
S3method(sql_table_analyze,HDB)
@@ -361,6 +365,7 @@ S3method(sql_table_analyze,Teradata)
S3method(sql_table_index,DBIConnection)
S3method(sql_translate_env,DBIConnection)
S3method(sql_translation,"Microsoft SQL Server")
S3method(sql_translation,"Spark SQL")
S3method(sql_translation,ACCESS)
S3method(sql_translation,DBIConnection)
S3method(sql_translation,HDB)
@@ -394,6 +399,7 @@ S3method(sql_values_subquery,Redshift)
S3method(sql_values_subquery,RedshiftConnection)
S3method(src_tbls,src_sql)
S3method(summarise,tbl_lazy)
S3method(supports_window_clause,"Spark SQL")
S3method(supports_window_clause,ACCESS)
S3method(supports_window_clause,DBIConnection)
S3method(supports_window_clause,Hive)
@@ -500,6 +506,7 @@ export(simulate_oracle)
export(simulate_postgres)
export(simulate_redshift)
export(simulate_snowflake)
export(simulate_spark_sql)
export(simulate_sqlite)
export(simulate_teradata)
export(sql)
2 changes: 2 additions & 0 deletions NEWS.md
@@ -15,6 +15,8 @@
(@mgirlich, #1211).

* Joins now work again for Pool and Oracle connections (@mgirlich, #1177, #1181).

* Preliminary Databricks Spark SQL backend (#1377).

* `dbplyr_pivot_wider_spec()` is now exported. Unlike `pivot_wider()` this can
be lazy. Note that this will be removed soon after `pivot_wider_spec()`
130 changes: 130 additions & 0 deletions R/backend-spark-sql.R
@@ -0,0 +1,130 @@
#' Backend: Databricks Spark SQL
#'
#' @description
#' See `vignette("translation-function")` and `vignette("translation-verb")` for
#' details of overall translation technology. Key differences for this backend
#' are better translation of statistical aggregate functions
#' (e.g. `var()`, `median()`) and use of temporary views instead of temporary
#' tables when copying data.
#'
#' Use `simulate_spark_sql()` with `lazy_frame()` to see simulated SQL without
#' needing a live database connection.
#'
#' @name backend-spark-sql
#' @aliases NULL
#' @examples
#' library(dplyr, warn.conflicts = FALSE)
#'
#' lf <- lazy_frame(a = TRUE, b = 1, d = 2, c = "z", con = simulate_spark_sql())
#'
#' lf %>% summarise(x = median(d, na.rm = TRUE))
#' lf %>% summarise(x = var(c, na.rm = TRUE), .by = d)
#'
#' lf %>% mutate(x = first(c))
#' lf %>% mutate(x = first(c), .by = d)
NULL

#' @export
#' @rdname backend-spark-sql
simulate_spark_sql <- function() simulate_dbi("Spark SQL")

#' @export
`dbplyr_edition.Spark SQL` <- function(con) {
2L
}

#' @export
`sql_translation.Spark SQL` <- function(con) {
sql_variant(
base_odbc_scalar,
sql_translator(.parent = base_odbc_agg,
var = sql_aggregate("VARIANCE", "var"),
quantile = sql_quantile("PERCENTILE"),
median = sql_aggregate("MEDIAN"),
first = function(x, na_rm = FALSE) {
check_na_rm(na_rm)
glue_sql2(sql_current_con(), "FIRST({.val x})")
},
last = function(x, na_rm = FALSE) {
check_na_rm(na_rm)
glue_sql2(sql_current_con(), "LAST({.val x})")
},
),
sql_translator(.parent = base_odbc_win,
var = win_aggregate("VARIANCE"),
quantile = sql_quantile("PERCENTILE", window = TRUE),
median = win_aggregate("MEDIAN"),
first = function(x, order_by = NULL, na_rm = FALSE) {
sql_nth(x, 1L, order_by = order_by, na_rm = na_rm, ignore_nulls = "bool")
},
last = function(x, order_by = NULL, na_rm = FALSE) {
sql_nth(x, Inf, order_by = order_by, na_rm = na_rm, ignore_nulls = "bool")
},
nth = function(x, n, order_by = NULL, na_rm = FALSE) {
sql_nth(x, n, order_by = order_by, na_rm = na_rm, ignore_nulls = "bool")
},
)
)
}
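
# A minimal sketch of the SQL the aggregate translations above are expected to
# render with the simulated connection; the column names are illustrative and
# the exact output formatting may differ:
#
#   lazy_frame(x = 1, g = 2, con = simulate_spark_sql()) %>%
#     summarise(v = var(x, na.rm = TRUE), .by = g)
#   #> <SQL>
#   #> SELECT `g`, VARIANCE(`x`) AS `v`
#   #> FROM `df`
#   #> GROUP BY `g`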

#' @export
`sql_table_analyze.Spark SQL` <- function(con, table, ...) {
# https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-aux-analyze-table.html
glue_sql2(con, "ANALYZE TABLE {.tbl table} COMPUTE STATISTICS")
}
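
# For example, analysing a hypothetical table `sales` should produce SQL
# along the lines of:
#   ANALYZE TABLE `sales` COMPUTE STATISTICS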

#' @export
`supports_window_clause.Spark SQL` <- function(con) {
TRUE
}
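
# Reporting window-clause support lets dbplyr factor repeated window
# definitions into a named WINDOW clause. A rough sketch (window naming and
# formatting may differ):
#
#   lazy_frame(x = 1, g = 2, con = simulate_spark_sql()) %>%
#     group_by(g) %>%
#     mutate(s = sum(x, na.rm = TRUE), m = mean(x, na.rm = TRUE))
#   #> <SQL>
#   #> SELECT `df`.*, SUM(`x`) OVER `win1` AS `s`, AVG(`x`) OVER `win1` AS `m`
#   #> FROM `df`
#   #> WINDOW `win1` AS (PARTITION BY `g`)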

#' @export
`db_copy_to.Spark SQL` <- function(con,
table,
values,
...,
overwrite = FALSE,
types = NULL,
temporary = TRUE,
unique_indexes = NULL,
indexes = NULL,
analyze = TRUE,
in_transaction = FALSE) {

if (!temporary) {
    cli::cli_abort("Spark SQL only supports temporary tables")
}

sql <- sql_values_subquery(con, values, types = types, lvl = 1)
db_compute(con, table, sql, overwrite = overwrite)
}
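
# Usage sketch, assuming `con` is a live Databricks / Spark SQL DBI connection
# (the connection and column names here are hypothetical):
#
#   copy_to(con, data.frame(x = 1:3, y = c("a", "b", "c")), "tmp_xy",
#           temporary = TRUE, overwrite = TRUE)
#
# Because only temporary tables are supported, the copied data ends up as a
# TEMPORARY VIEW built from a VALUES subquery via db_compute() below.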

#' @export
`db_compute.Spark SQL` <- function(con,
table,
sql,
...,
overwrite = FALSE,
temporary = TRUE,
unique_indexes = list(),
indexes = list(),
analyze = TRUE,
in_transaction = FALSE) {

if (!temporary) {
    cli::cli_abort("Spark SQL only supports temporary tables")
}

table <- as_table_ident(table)
sql <- glue_sql2(
con,
"CREATE ", if (overwrite) "OR REPLACE ",
"TEMPORARY VIEW {.tbl {table}} AS \n",
"{.from {sql}}"
)
DBI::dbExecute(con, sql)

table
}
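
# Usage sketch: compute() on a lazy query materialises it as a temporary view,
# e.g. with a live connection `con` and a hypothetical table `flights`:
#
#   tbl(con, "flights") %>%
#     filter(origin == "JFK") %>%
#     compute(name = "jfk_flights")
#
# which is expected to issue SQL roughly of the form:
#   CREATE TEMPORARY VIEW `jfk_flights` AS
#   SELECT * FROM `flights` WHERE (`origin` = 'JFK')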

utils::globalVariables("regexp_replace")
29 changes: 29 additions & 0 deletions man/backend-spark-sql.Rd

Some generated files are not rendered by default.
