diff --git a/.gitignore b/.gitignore index 36b127c9..6a7cf974 100644 --- a/.gitignore +++ b/.gitignore @@ -143,3 +143,6 @@ cython_debug/ # Project specific test.py + +# OS +.DS_Store diff --git a/dbt/include/athena/macros/materializations/models/helpers/get_partition_batches.sql b/dbt/include/athena/macros/materializations/models/helpers/get_partition_batches.sql index 5780d283..08e1845a 100644 --- a/dbt/include/athena/macros/materializations/models/helpers/get_partition_batches.sql +++ b/dbt/include/athena/macros/materializations/models/helpers/get_partition_batches.sql @@ -1,11 +1,15 @@ -{% macro get_partition_batches(sql) -%} +{% macro get_partition_batches(sql, as_subquery=True) -%} {%- set partitioned_by = config.get('partitioned_by') -%} {%- set athena_partitions_limit = config.get('partitions_limit', 100) | int -%} {%- set partitioned_keys = adapter.format_partition_keys(partitioned_by) -%} {% do log('PARTITIONED KEYS: ' ~ partitioned_keys) %} {% call statement('get_partitions', fetch_result=True) %} - select distinct {{ partitioned_keys }} from ({{ sql }}) order by {{ partitioned_keys }}; + {%- if as_subquery -%} + select distinct {{ partitioned_keys }} from ({{ sql }}) order by {{ partitioned_keys }}; + {%- else -%} + select distinct {{ partitioned_keys }} from {{ sql }} order by {{ partitioned_keys }}; + {%- endif -%} {% endcall %} {%- set table = load_result('get_partitions').table -%} diff --git a/dbt/include/athena/macros/materializations/models/incremental/incremental.sql b/dbt/include/athena/macros/materializations/models/incremental/incremental.sql index b42c7cb9..a7fc580b 100644 --- a/dbt/include/athena/macros/materializations/models/incremental/incremental.sql +++ b/dbt/include/athena/macros/materializations/models/incremental/incremental.sql @@ -25,11 +25,11 @@ {% set to_drop = [] %} {% if existing_relation is none %} {% set query_result = safe_create_table_as(False, target_relation, sql) -%} - {% set build_sql = "select '{{ query_result }}'" -%} + {% set build_sql = "select '" ~ query_result ~ "'" -%} {% elif existing_relation.is_view or should_full_refresh() %} {% do drop_relation(existing_relation) %} {% set query_result = safe_create_table_as(False, target_relation, sql) -%} - {% set build_sql = "select '{{ query_result }}'" -%} + {% set build_sql = "select '" ~ query_result ~ "'" -%} {% elif partitioned_by is not none and strategy == 'insert_overwrite' %} {% set tmp_relation = make_temp_relation(target_relation) %} {% if tmp_relation is not none %} diff --git a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql index 7e89ff0d..aa6775eb 100644 --- a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql +++ b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql @@ -1,7 +1,7 @@ -{% macro athena__create_table_as(temporary, relation, sql) -%} +{% macro athena__create_table_as(temporary, relation, sql, skip_partitioning=False) -%} {%- set materialized = config.get('materialized', default='table') -%} {%- set external_location = config.get('external_location', default=none) -%} - {%- set partitioned_by = config.get('partitioned_by', default=none) -%} + {%- set partitioned_by = config.get('partitioned_by', default=none) if not skip_partitioning else none -%} {%- set bucketed_by = config.get('bucketed_by', default=none) -%} {%- set bucket_count = config.get('bucket_count', default=none) -%} {%- set field_delimiter = config.get('field_delimiter', default=none) -%} @@ -90,41 +90,70 @@ {% macro create_table_as_with_partitions(temporary, relation, sql) -%} - {% set partitions_batches = get_partition_batches(sql) %} + {%- set tmp_relation = api.Relation.create( + identifier=relation.identifier ~ '__tmp_not_partitioned', + schema=relation.schema, + database=relation.database, + s3_path_table_part=relation.identifier ~ '__tmp_not_partitioned' , + type='table' + ) + -%} + + {%- if tmp_relation is not none -%} + {%- do drop_relation(tmp_relation) -%} + {%- endif -%} + + {%- do log('CREATE NON-PARTIONED STAGING TABLE: ' ~ tmp_relation) -%} + {%- do run_query(create_table_as(temporary, tmp_relation, sql, True)) -%} + + {% set partitions_batches = get_partition_batches(sql=tmp_relation, as_subquery=False) %} {% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %} - {%- do log('CREATE EMPTY TABLE: ' ~ relation) -%} - {%- set create_empty_table_query -%} - {{ create_table_as(temporary, relation, sql) }} - limit 0 - {%- endset -%} - {%- do run_query(create_empty_table_query) -%} - {%- set dest_columns = adapter.get_columns_in_relation(relation) -%} + {%- set dest_columns = adapter.get_columns_in_relation(tmp_relation) -%} {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} {%- for batch in partitions_batches -%} {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%} - {%- set insert_batch_partitions -%} - insert into {{ relation }} ({{ dest_cols_csv }}) - select {{ dest_cols_csv }} - from ({{ sql }}) - where {{ batch }} - {%- endset -%} + {%- if loop.index == 1 -%} + {%- set create_target_relation_sql -%} + select {{ dest_cols_csv }} + from {{ tmp_relation }} + where {{ batch }} + {%- endset -%} + {%- do run_query(create_table_as(temporary, relation, create_target_relation_sql)) -%} + {%- else -%} + {%- set insert_batch_partitions_sql -%} + insert into {{ relation }} ({{ dest_cols_csv }}) + select {{ dest_cols_csv }} + from {{ tmp_relation }} + where {{ batch }} + {%- endset -%} + + {%- do run_query(insert_batch_partitions_sql) -%} + {%- endif -%} + - {%- do run_query(insert_batch_partitions) -%} {%- endfor -%} + {%- do drop_relation(tmp_relation) -%} + select 'SUCCESSFULLY CREATED TABLE {{ relation }}' {%- endmacro %} {% macro safe_create_table_as(temporary, relation, sql) -%} - {%- set query_result = adapter.run_query_with_partitions_limit_catching(create_table_as(temporary, relation, sql)) -%} - {%- do log('QUERY RESULT: ' ~ query_result) -%} - {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%} - {%- do create_table_as_with_partitions(temporary, relation, sql) -%} - {%- set query_result = relation ~ ' with many partitions created' -%} + {%- if temporary -%} + {%- do run_query(create_table_as(temporary, relation, sql, True)) -%} + {%- set query_result = relation ~ ' as temporary relation without partitioning created' -%} + {%- else -%} + {%- set query_result = adapter.run_query_with_partitions_limit_catching(create_table_as(temporary, relation, sql)) -%} + {%- do log('QUERY RESULT: ' ~ query_result) -%} + {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%} + {%- do create_table_as_with_partitions(temporary, relation, sql) -%} + {%- set query_result = relation ~ ' with many partitions created' -%} + {%- endif -%} {%- endif -%} + {{ return(query_result) }} {%- endmacro %}