Skip to content

Commit

Permalink
feat: Implement non-partitioned tmp table (#405)
Browse files Browse the repository at this point in the history
Co-authored-by: Serhii Dimchenko <[email protected]>
  • Loading branch information
svdimchenko and Serhii Dimchenko authored Sep 8, 2023
1 parent 905746f commit 6ef6c97
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 26 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,6 @@ cython_debug/

# Project specific
test.py

# OS
.DS_Store
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
{% macro get_partition_batches(sql) -%}
{% macro get_partition_batches(sql, as_subquery=True) -%}
{%- set partitioned_by = config.get('partitioned_by') -%}
{%- set athena_partitions_limit = config.get('partitions_limit', 100) | int -%}
{%- set partitioned_keys = adapter.format_partition_keys(partitioned_by) -%}
{% do log('PARTITIONED KEYS: ' ~ partitioned_keys) %}

{% call statement('get_partitions', fetch_result=True) %}
select distinct {{ partitioned_keys }} from ({{ sql }}) order by {{ partitioned_keys }};
{%- if as_subquery -%}
select distinct {{ partitioned_keys }} from ({{ sql }}) order by {{ partitioned_keys }};
{%- else -%}
select distinct {{ partitioned_keys }} from {{ sql }} order by {{ partitioned_keys }};
{%- endif -%}
{% endcall %}

{%- set table = load_result('get_partitions').table -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
{% set to_drop = [] %}
{% if existing_relation is none %}
{% set query_result = safe_create_table_as(False, target_relation, sql) -%}
{% set build_sql = "select '{{ query_result }}'" -%}
{% set build_sql = "select '" ~ query_result ~ "'" -%}
{% elif existing_relation.is_view or should_full_refresh() %}
{% do drop_relation(existing_relation) %}
{% set query_result = safe_create_table_as(False, target_relation, sql) -%}
{% set build_sql = "select '{{ query_result }}'" -%}
{% set build_sql = "select '" ~ query_result ~ "'" -%}
{% elif partitioned_by is not none and strategy == 'insert_overwrite' %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% if tmp_relation is not none %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{% macro athena__create_table_as(temporary, relation, sql) -%}
{% macro athena__create_table_as(temporary, relation, sql, skip_partitioning=False) -%}
{%- set materialized = config.get('materialized', default='table') -%}
{%- set external_location = config.get('external_location', default=none) -%}
{%- set partitioned_by = config.get('partitioned_by', default=none) -%}
{%- set partitioned_by = config.get('partitioned_by', default=none) if not skip_partitioning else none -%}
{%- set bucketed_by = config.get('bucketed_by', default=none) -%}
{%- set bucket_count = config.get('bucket_count', default=none) -%}
{%- set field_delimiter = config.get('field_delimiter', default=none) -%}
Expand Down Expand Up @@ -90,41 +90,70 @@

{% macro create_table_as_with_partitions(temporary, relation, sql) -%}

{% set partitions_batches = get_partition_batches(sql) %}
{%- set tmp_relation = api.Relation.create(
identifier=relation.identifier ~ '__tmp_not_partitioned',
schema=relation.schema,
database=relation.database,
s3_path_table_part=relation.identifier ~ '__tmp_not_partitioned' ,
type='table'
)
-%}

{%- if tmp_relation is not none -%}
{%- do drop_relation(tmp_relation) -%}
{%- endif -%}

{%- do log('CREATE NON-PARTIONED STAGING TABLE: ' ~ tmp_relation) -%}
{%- do run_query(create_table_as(temporary, tmp_relation, sql, True)) -%}

{% set partitions_batches = get_partition_batches(sql=tmp_relation, as_subquery=False) %}
{% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}

{%- do log('CREATE EMPTY TABLE: ' ~ relation) -%}
{%- set create_empty_table_query -%}
{{ create_table_as(temporary, relation, sql) }}
limit 0
{%- endset -%}
{%- do run_query(create_empty_table_query) -%}
{%- set dest_columns = adapter.get_columns_in_relation(relation) -%}
{%- set dest_columns = adapter.get_columns_in_relation(tmp_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

{%- for batch in partitions_batches -%}
{%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%}

{%- set insert_batch_partitions -%}
insert into {{ relation }} ({{ dest_cols_csv }})
select {{ dest_cols_csv }}
from ({{ sql }})
where {{ batch }}
{%- endset -%}
{%- if loop.index == 1 -%}
{%- set create_target_relation_sql -%}
select {{ dest_cols_csv }}
from {{ tmp_relation }}
where {{ batch }}
{%- endset -%}
{%- do run_query(create_table_as(temporary, relation, create_target_relation_sql)) -%}
{%- else -%}
{%- set insert_batch_partitions_sql -%}
insert into {{ relation }} ({{ dest_cols_csv }})
select {{ dest_cols_csv }}
from {{ tmp_relation }}
where {{ batch }}
{%- endset -%}

{%- do run_query(insert_batch_partitions_sql) -%}
{%- endif -%}


{%- do run_query(insert_batch_partitions) -%}
{%- endfor -%}

{%- do drop_relation(tmp_relation) -%}

select 'SUCCESSFULLY CREATED TABLE {{ relation }}'

{%- endmacro %}

{% macro safe_create_table_as(temporary, relation, sql) -%}
{%- set query_result = adapter.run_query_with_partitions_limit_catching(create_table_as(temporary, relation, sql)) -%}
{%- do log('QUERY RESULT: ' ~ query_result) -%}
{%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
{%- do create_table_as_with_partitions(temporary, relation, sql) -%}
{%- set query_result = relation ~ ' with many partitions created' -%}
{%- if temporary -%}
{%- do run_query(create_table_as(temporary, relation, sql, True)) -%}
{%- set query_result = relation ~ ' as temporary relation without partitioning created' -%}
{%- else -%}
{%- set query_result = adapter.run_query_with_partitions_limit_catching(create_table_as(temporary, relation, sql)) -%}
{%- do log('QUERY RESULT: ' ~ query_result) -%}
{%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
{%- do create_table_as_with_partitions(temporary, relation, sql) -%}
{%- set query_result = relation ~ ' with many partitions created' -%}
{%- endif -%}
{%- endif -%}

{{ return(query_result) }}
{%- endmacro %}

0 comments on commit 6ef6c97

Please sign in to comment.