Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrate the liquid clustering macros #1

Open
wants to merge 3 commits into
base: support-python-model-session-submission-method
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.9.0a1
current_version = 1.9.0a4
parse = (?P<major>[\d]+) # major version number
\.(?P<minor>[\d]+) # minor version number
\.(?P<patch>[\d]+) # patch version number
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/spark/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.9.0a1"
version = "1.9.0a4"
1 change: 1 addition & 0 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class SparkConfig(AdapterConfig):
location_root: Optional[str] = None
partition_by: Optional[Union[List[str], str]] = None
clustered_by: Optional[Union[List[str], str]] = None
liquid_clustered_by: Optional[Union[List[str], str]] = None
buckets: Optional[int] = None
options: Optional[Dict[str, str]] = None
merge_update_columns: Optional[str] = None
Expand Down
23 changes: 23 additions & 0 deletions dbt/include/spark/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
{{ options_clause() }}
{{ tblproperties_clause() }}
{{ partition_cols(label="partitioned by") }}
{{ liquid_clustered_cols() }}
{{ clustered_cols(label="clustered by") }}
{{ location_clause() }}
{{ comment_clause() }}
Expand Down Expand Up @@ -430,3 +431,25 @@
{% do run_query(sql) %}

{% endmacro %}

{% macro liquid_clustered_cols() -%}
dkruh1 marked this conversation as resolved.
Show resolved Hide resolved
{%- set cols = config.get('liquid_clustered_by', validator=validation.any[list, basestring]) -%}
{%- if cols is not none %}
{%- if cols is string -%}
{%- set cols = [cols] -%}
{%- endif -%}
CLUSTER BY ({{ cols | join(', ') }})
{%- endif %}
{%- endmacro -%}

{% macro apply_liquid_clustered_cols(target_relation) -%}
{%- set cols = config.get('liquid_clustered_by', validator=validation.any[list, basestring]) -%}
{%- if cols is not none %}
{%- if cols is string -%}
{%- set cols = [cols] -%}
{%- endif -%}
{%- call statement('set_cluster_by_columns') -%}
ALTER {{ target_relation.type }} {{ target_relation }} CLUSTER BY ({{ cols | join(', ') }})
{%- endcall -%}
{%- endif %}
{%- endmacro -%}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
{%- set language = model['language'] -%}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}
{%- set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) -%}
{%- set target_relation = this -%}
{%- set target_relation = this.incorporate(type='table') -%}
{%- set existing_relation = load_relation(this) -%}
{% set tmp_relation = this.incorporate(path = {"identifier": this.identifier ~ '__dbt_tmp'}) -%}

Expand Down Expand Up @@ -72,12 +72,14 @@
{%- endcall %}
{%- endif -%}
{%- endif -%}

{% do apply_liquid_clustered_cols(target_relation) %}
{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}

{% do persist_docs(target_relation, model) %}

{% do optimize(target_relation) %}

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}
Expand Down
45 changes: 44 additions & 1 deletion dbt/include/spark/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

{% do persist_constraints(target_relation, model) %}

{% do optimize(target_relation) %}

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]})}}
Expand Down Expand Up @@ -98,9 +100,50 @@ else:
msg = f"{type(df)} is not a supported type for dbt Python materialization"
raise Exception(msg)

df.write.mode("overwrite").format("{{ config.get('file_format', 'delta') }}").option("overwriteSchema", "true").saveAsTable("{{ target_relation }}")
writer = (
df.write
.mode("overwrite")
.option("overwriteSchema", "true")
{{ py_get_writer_options()|indent(8, True) }}
)
writer.saveAsTable("{{ target_relation }}")
{%- endmacro -%}

{%- macro py_get_writer_options() -%}
{%- set location_root = config.get('location_root', validator=validation.any[basestring]) -%}
{%- set file_format = config.get('file_format', validator=validation.any[basestring])|default('delta', true) -%}
{%- set partition_by = config.get('partition_by', validator=validation.any[list, basestring]) -%}
{%- set liquid_clustered_by = config.get('liquid_clustered_by', validator=validation.any[list, basestring]) -%}
{%- set clustered_by = config.get('clustered_by', validator=validation.any[list, basestring]) -%}
{%- set buckets = config.get('buckets', validator=validation.any[int]) -%}
.format("{{ file_format }}")
{%- if location_root is not none %}
{%- set identifier = model['alias'] %}
{%- if is_incremental() %}
{%- set identifier = identifier + '__dbt_tmp' %}
{%- endif %}
.option("path", "{{ location_root }}/{{ identifier }}")
{%- endif -%}
{%- if partition_by is not none -%}
{%- if partition_by is string -%}
{%- set partition_by = [partition_by] -%}
{%- endif %}
.partitionBy({{ partition_by }})
{%- endif -%}
{%- if liquid_clustered_by and not is_incremental() -%}
{%- if liquid_clustered_by is string -%}
{%- set liquid_clustered_by = [liquid_clustered_by] -%}
{%- endif %}
.clusterBy({{ liquid_clustered_by }})
{%- endif -%}
{%- if (clustered_by is not none) and (buckets is not none) -%}
{%- if clustered_by is string -%}
{%- set clustered_by = [clustered_by] -%}
{%- endif %}
.bucketBy({{ buckets }}, {{ clustered_by }})
{%- endif -%}
{% endmacro -%}

{%macro py_script_comment()%}
# how to execute python model in notebook
# dbt = dbtObj(spark.table)
Expand Down
36 changes: 36 additions & 0 deletions dbt/include/spark/macros/optimize.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{% macro optimize(relation) %}
{{ return(adapter.dispatch('optimize', 'dbt')(relation)) }}
{% endmacro %}

{%- macro spark__optimize(relation) -%}
{%- if var('DATABRICKS_SKIP_OPTIMIZE', 'false')|lower != 'true' and
var('databricks_skip_optimize', 'false')|lower != 'true' and
config.get('file_format', 'delta') == 'delta' -%}
{%- if (config.get('zorder', False) or config.get('liquid_clustered_by', False)) -%}
{%- call statement('run_optimize_stmt') -%}
{{ get_optimize_sql(relation) }}
{%- endcall -%}
{%- endif -%}
{%- endif -%}
{%- endmacro -%}

{%- macro get_optimize_sql(relation) %}
optimize {{ relation }}
{%- if config.get('zorder', False) and config.get('file_format', 'delta') == 'delta' %}
{%- if config.get('liquid_clustered_by', False) %}
{{ exceptions.warn("Both zorder and liquid_clustered_by are set but they are incompatible. zorder will be ignored.") }}
{%- else %}
{%- set zorder = config.get('zorder', none) %}
{# TODO: predicates here? WHERE ... #}
{%- if zorder is sequence and zorder is not string %}
zorder by (
{%- for col in zorder %}
{{ col }}{% if not loop.last %}, {% endif %}
{%- endfor %}
)
{%- else %}
zorder by ({{zorder}})
{%- endif %}
{%- endif %}
{%- endif %}
{%- endmacro -%}
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def _get_plugin_version_dict():


package_name = "dbt-spark"
package_version = "1.9.0a1"
package_version = "1.9.0a4"
description = """The Apache Spark adapter plugin for dbt"""

odbc_extras = ["pyodbc~=4.0.39"]
Expand Down