Skip to content

Commit

Permalink
Add postgres support (#368)
Browse files Browse the repository at this point in the history
* Add macros for postgres

* Add steps for postgres

* Add postgres entry

* Add a postgres service to action

* Just test postgres for now

* Fix duplicate section

* Install correct package

* Update host to localhost

* Update profiles.yml to env vars

* Add postgres env vars to step

* Try again

* Remove parse_json func

* Another fix

* Another fix

* Alias the subquery

* Add back other CDW [skip-ci]

* Add postgres as supported DW [skip ci]

* Test removing env vars

* Another test

* Add back DWs

* Add missing CI tests

* Formatting

* Update to include new features

---------

Co-authored-by: Gemma Down <[email protected]>
  • Loading branch information
dpguthrie and Gemma Down authored Sep 29, 2023
1 parent af2ed31 commit 069d71c
Show file tree
Hide file tree
Showing 18 changed files with 573 additions and 6 deletions.
30 changes: 28 additions & 2 deletions .github/workflows/ci_test_package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,27 @@ jobs:
strategy:
fail-fast: false # Don't fail one DWH if the others fail
matrix:
warehouse: ["snowflake", "bigquery"]
warehouse: ["snowflake", "bigquery", "postgres"]
runs-on: ubuntu-latest
environment:
name: Approve Integration Tests
permissions:
contents: "read"
id-token: "write"

services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432

steps:
- name: Get latest release
uses: rez0n/actions-github-release@main
Expand Down Expand Up @@ -100,7 +113,7 @@ jobs:
strategy:
fail-fast: false # Don't fail one DWH if the others fail
matrix:
warehouse: ["snowflake", "bigquery"]
warehouse: ["snowflake", "bigquery", "postgres"]
# When supporting a new version, update the list here
version: ["1_3_0", "1_4_0", "1_5_0", "1_6_0"]
runs-on: ubuntu-latest
Expand All @@ -110,6 +123,19 @@ jobs:
contents: "read"
id-token: "write"

services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432

steps:
- uses: actions/setup-python@v4
with:
Expand Down
15 changes: 14 additions & 1 deletion .github/workflows/main_test_package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,26 @@ jobs:
integration:
strategy:
matrix:
warehouse: ["snowflake", "bigquery"]
warehouse: ["snowflake", "bigquery", "postgres"]
version: ["1_3_0", "1_4_0", "1_5_0", "1_6_0"]
runs-on: ubuntu-latest
permissions:
contents: "read"
id-token: "write"

services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432

steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The package currently supports
- Spark :white_check_mark:
- Snowflake :white_check_mark:
- Google BigQuery :white_check_mark:
- Postgres :white_check_mark:

Models included:

Expand Down
13 changes: 11 additions & 2 deletions integration_test_project/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
# You should __NEVER__ check credentials into version control. Thanks for reading :)

config:
send_anonymous_usage_stats: False
use_colors: True
send_anonymous_usage_stats: False
use_colors: True

dbt_artifacts:
target: snowflake
Expand Down Expand Up @@ -43,3 +43,12 @@ dbt_artifacts:
timeout_seconds: 300
priority: interactive
retries: 1
postgres:
type: postgres
host: localhost
user: postgres
password: postgres
port: 5432
dbname: postgres
schema: public
threads: 8
2 changes: 1 addition & 1 deletion integration_test_project/tests/singular_test.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
select 1 as failures from (select 2) where 1 = 2
select 1 as failures from (select 2) as foo where 1 = 2
34 changes: 34 additions & 0 deletions macros/upload_individual_datasets/upload_exposures.sql
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,37 @@
{{ return("") }}
{% endif %}
{%- endmacro %}

{# Postgres implementation of get_exposures_dml_sql.

   Renders one parenthesised VALUES row per exposure in `exposures`, for the
   dbt_artifacts exposures upload. Free-text / JSON fields (name, owner, path,
   description, depends_on_nodes, tags, all_results) are dollar-quoted with
   $$...$$ so embedded single quotes do not break the generated SQL; simple
   identifier-like fields use plain single quotes. When the
   `dbt_artifacts_exclude_all_results` var is true, the all_results column is
   written as SQL null instead of the serialized exposure. Returns an empty
   string when `exposures` is empty.

   NOTE(review): values containing a literal `$$` would still break the
   dollar-quoting — assumed not to occur in practice; confirm upstream. #}
{% macro postgres__get_exposures_dml_sql(exposures) -%}
{% if exposures != [] %}

{% set exposure_values %}
{% for exposure in exposures -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
$${{ exposure.unique_id }}$$, {# node_id #}
'{{ run_started_at }}', {# run_started_at #}
$${{ exposure.name }}$$, {# name #}
'{{ exposure.type }}', {# type #}
$${{ tojson(exposure.owner) }}$$, {# owner #}
'{{ exposure.maturity }}', {# maturity #}
$${{ exposure.original_file_path }}$$, {# path #}
$${{ exposure.description }}$$, {# description #}
'{{ exposure.url }}', {# url #}
'{{ exposure.package_name }}', {# package_name #}
$${{ tojson(exposure.depends_on.nodes) }}$$, {# depends_on_nodes #}
$${{ tojson(exposure.tags) }}$$, {# tags #}
{% if var('dbt_artifacts_exclude_all_results', false) %}
null
{% else %}
$${{ tojson(exposure) }}$$ {# all_results #}
{% endif %}
)
{%- if not loop.last %},{%- endif %}
{%- endfor %}
{% endset %}
{{ exposure_values }}
{% else %}
{{ return("") }}
{% endif %}
{%- endmacro %}
62 changes: 62 additions & 0 deletions macros/upload_individual_datasets/upload_invocations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,65 @@
{{ invocation_values }}

{% endmacro -%}

{# Postgres implementation of get_invocations_dml_sql.

   Renders a single parenthesised VALUES row describing the current dbt
   invocation: invocation/run identifiers, target settings, dbt Cloud
   environment variables, optional user-selected env vars and dbt vars
   (serialized as JSON, or SQL null when the corresponding var is unset),
   the invocation arguments, and any dbt_metadata_envs. Free-text / JSON
   fields are dollar-quoted with $$...$$; booleans and integers
   (full_refresh_flag, target_threads) are emitted unquoted. #}
{% macro postgres__get_invocations_dml_sql() -%}
{% set invocation_values %}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ dbt_version }}', {# dbt_version #}
'{{ project_name }}', {# project_name #}
'{{ run_started_at }}', {# run_started_at #}
'{{ flags.WHICH }}', {# dbt_command #}
{{ flags.FULL_REFRESH }}, {# full_refresh_flag #}
'{{ target.profile_name }}', {# target_profile_name #}
'{{ target.name }}', {# target_name #}
'{{ target.schema }}', {# target_schema #}
{{ target.threads }}, {# target_threads #}

{# dbt Cloud metadata; each env var defaults to '' outside dbt Cloud. #}
'{{ env_var("DBT_CLOUD_PROJECT_ID", "") }}', {# dbt_cloud_project_id #}
'{{ env_var("DBT_CLOUD_JOB_ID", "") }}', {# dbt_cloud_job_id #}
'{{ env_var("DBT_CLOUD_RUN_ID", "") }}', {# dbt_cloud_run_id #}
'{{ env_var("DBT_CLOUD_RUN_REASON_CATEGORY", "") }}', {# dbt_cloud_run_reason_category #}
$${{ env_var('DBT_CLOUD_RUN_REASON', '') }}$$, {# dbt_cloud_run_reason #}

{# Capture only the env vars the user listed in the 'env_vars' var. #}
{% if var('env_vars', none) %}
{% set env_vars_dict = {} %}
{% for env_variable in var('env_vars') %}
{% do env_vars_dict.update({env_variable: (env_var(env_variable, ''))}) %}
{% endfor %}
$${{ tojson(env_vars_dict) }}$$, {# env_vars #}
{% else %}
null, {# env_vars #}
{% endif %}

{# Capture only the dbt vars the user listed in the 'dbt_vars' var. #}
{% if var('dbt_vars', none) %}
{% set dbt_vars_dict = {} %}
{% for dbt_var in var('dbt_vars') %}
{% do dbt_vars_dict.update({dbt_var: (var(dbt_var, ''))}) %}
{% endfor %}
$${{ tojson(dbt_vars_dict) }}$$, {# dbt_vars #}
{% else %}
null, {# dbt_vars #}
{% endif %}

{% if invocation_args_dict.vars %}
{# vars - different format for pre v1.5 (yaml vs list) #}
{% if invocation_args_dict.vars is string %}
{# BigQuery does not handle the yaml-string from "--vars" well, when passed to "parse_json". Workaround is to parse the string, and then "tojson" will properly format the dict as a json-object. #}
{% set parsed_inv_args_vars = fromyaml(invocation_args_dict.vars) %}
{% do invocation_args_dict.update({'vars': parsed_inv_args_vars}) %}
{% endif %}
{% endif %}

$${{ tojson(invocation_args_dict) }}$$, {# invocation_args #}

{# Copy dbt_metadata_envs into a plain dict before serializing. #}
{% set metadata_env = {} %}
{% for key, value in dbt_metadata_envs.items() %}
{% do metadata_env.update({key: value}) %}
{% endfor %}
$${{ tojson(metadata_env) }}$$ {# dbt_custom_envs #}
)
{% endset %}
{{ invocation_values }}

{% endmacro -%}
41 changes: 41 additions & 0 deletions macros/upload_individual_datasets/upload_model_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,44 @@
{{ return("") }}
{% endif %}
{% endmacro -%}

{# Postgres implementation of get_model_executions_dml_sql.

   Renders one parenthesised VALUES row per model run result in `models`.
   was_full_refresh falls back to the global --full-refresh flag when the
   node config does not set full_refresh. compile_started_at /
   query_completed_at are pulled from the result's timing list via
   selectattr, emitting SQL null when the stage is absent. rows_affected is
   always null here (not taken from the adapter response). message and
   adapter_response are dollar-quoted so embedded quotes are safe. Returns
   an empty string when `models` is empty. #}
{% macro postgres__get_model_executions_dml_sql(models) -%}
{% if models != [] %}
{% set model_execution_values %}
{% for model in models -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ model.node.unique_id }}', {# node_id #}
'{{ run_started_at }}', {# run_started_at #}

{% set config_full_refresh = model.node.config.full_refresh %}
{% if config_full_refresh is none %}
{% set config_full_refresh = flags.FULL_REFRESH %}
{% endif %}
{{ config_full_refresh }}, {# was_full_refresh #}

'{{ model.thread_id }}', {# thread_id #}
'{{ model.status }}', {# status #}

{# Missing timing stages (e.g. skipped nodes) fall through to null. #}
{% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
{% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #}
{% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
{% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #}

{{ model.execution_time }}, {# total_node_runtime #}
null, {# rows_affected #}
'{{ model.node.config.materialized }}', {# materialization #}
'{{ model.node.schema }}', {# schema #}
'{{ model.node.name }}', {# name #}
'{{ model.node.alias }}', {# alias #}
$${{ model.message }}$$, {# message #}
$${{ tojson(model.adapter_response) }}$$ {# adapter_response #}
)
{%- if not loop.last %},{%- endif %}
{%- endfor %}
{% endset %}
{{ model_execution_values }}
{% else %}
{{ return("") }}
{% endif %}
{%- endmacro %}
35 changes: 35 additions & 0 deletions macros/upload_individual_datasets/upload_models.sql
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,38 @@
{{ return("") }}
{% endif %}
{%- endmacro %}

{# Postgres implementation of get_models_dml_sql.

   Renders one parenthesised VALUES row per model node in `models`. raw_code
   is popped from each node first so the (potentially large) model SQL is not
   serialized into all_results. The file path has backslashes doubled —
   presumably to keep Windows path separators intact through SQL string
   processing; confirm against the other adapters. meta is rendered with
   plain interpolation (its native repr), not tojson — NOTE(review): this
   differs from the tojson'd fields; verify that is intentional. all_results
   is SQL null when the `dbt_artifacts_exclude_all_results` var is true.
   Returns an empty string when `models` is empty. #}
{% macro postgres__get_models_dml_sql(models) -%}
{% if models != [] %}
{% set model_values %}
{% for model in models -%}
{% do model.pop('raw_code', None) %}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ model.unique_id }}', {# node_id #}
'{{ run_started_at }}', {# run_started_at #}
'{{ model.database }}', {# database #}
'{{ model.schema }}', {# schema #}
'{{ model.name }}', {# name #}
'{{ tojson(model.depends_on.nodes) }}', {# depends_on_nodes #}
'{{ model.package_name }}', {# package_name #}
$${{ model.original_file_path | replace('\\', '\\\\') }}$$, {# path #}
'{{ model.checksum.checksum }}', {# checksum #}
'{{ model.config.materialized }}', {# materialization #}
'{{ tojson(model.tags) }}', {# tags #}
$${{ model.config.meta }}$$, {# meta #}
'{{ model.alias }}', {# alias #}
{% if var('dbt_artifacts_exclude_all_results', false) %}
null
{% else %}
$${{ tojson(model) }}$$ {# all_results #}
{% endif %}
)
{%- if not loop.last %},{%- endif %}
{%- endfor %}
{% endset %}
{{ model_values }}
{% else %}
{{ return("") }}
{% endif %}
{%- endmacro %}
57 changes: 57 additions & 0 deletions macros/upload_individual_datasets/upload_seed_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,60 @@
{{ return("") }}
{% endif %}
{% endmacro -%}

{# Postgres implementation of get_seed_executions_dml_sql.

   Renders one parenthesised VALUES row per seed run result in `seeds`.
   was_full_refresh falls back to the global --full-refresh flag when the
   node config does not set full_refresh. message and adapter_response are
   dollar-quoted so embedded quotes are safe. Returns an empty string when
   `seeds` is empty.

   Fix: the previous version extracted timing stages with filtered for-loops
   ({% raw %}{% for stage in model.timing if ... %}{% endraw %}) guarded by
   `loop.length == 0`. When no timing entry matched, the loop body never ran
   at all, so neither the value nor the `null,` fallback was emitted and the
   row came out one column short (malformed INSERT). This now uses the same
   selectattr pattern as postgres__get_model_executions_dml_sql, which
   always emits exactly one value per column. #}
{% macro postgres__get_seed_executions_dml_sql(seeds) -%}
    {% if seeds != [] %}
        {% set seed_execution_values %}
        {% for model in seeds -%}
            (
                '{{ invocation_id }}', {# command_invocation_id #}
                '{{ model.node.unique_id }}', {# node_id #}
                '{{ run_started_at }}', {# run_started_at #}

                {% set config_full_refresh = model.node.config.full_refresh %}
                {% if config_full_refresh is none %}
                    {% set config_full_refresh = flags.FULL_REFRESH %}
                {% endif %}
                {{ config_full_refresh }}, {# was_full_refresh #}

                '{{ model.thread_id }}', {# thread_id #}
                '{{ model.status }}', {# status #}

                {# Missing timing stages fall through to null — a filtered
                   for-loop cannot emit a fallback when nothing matches. #}
                {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
                {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #}
                {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
                {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #}

                {{ model.execution_time }}, {# total_node_runtime #}
                null, {# rows_affected - not available #}
                '{{ model.node.config.materialized }}', {# materialization #}
                '{{ model.node.schema }}', {# schema #}
                '{{ model.node.name }}', {# name #}
                '{{ model.node.alias }}', {# alias #}
                $${{ model.message }}$$, {# message #}
                $${{ tojson(model.adapter_response) }}$$ {# adapter_response #}
            )
            {%- if not loop.last %},{%- endif %}
        {%- endfor %}
        {% endset %}
        {{ seed_execution_values }}
    {% else %}
        {{ return("") }}
    {% endif %}
{% endmacro -%}
Loading

0 comments on commit 069d71c

Please sign in to comment.