diff --git a/.prettierrc.yaml b/.prettierrc.yaml index e59acf52..4f431fb4 100644 --- a/.prettierrc.yaml +++ b/.prettierrc.yaml @@ -8,3 +8,7 @@ overrides: - "*.html" options: tabWidth: 2 + - files: + - "*.yaml" + options: + tabWidth: 2 diff --git a/dags/dset_materialize_dag.py b/dags/dset_materialize_dag.py new file mode 100644 index 00000000..ddeefae1 --- /dev/null +++ b/dags/dset_materialize_dag.py @@ -0,0 +1,111 @@ +import random +import urllib.parse +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.models import Variable +from airflow.providers.docker.operators.docker import DockerOperator +from docker.types import DriverConfig, Mount + +my_email = Variable.get("fail_email") +addr = Variable.get("repo_server_url") + + +args = { + "email": my_email, + "email_on_failure": True, + "email_on_retry": True, + "retries": 3, + "retry_delay": timedelta(minutes=5), +} + +nfs_config = { + "type": "nfs", + "o": f"addr={addr},nfsvers=4", + "device": ":/opt/airflow/repos", +} + +__doc__ = """ +# Executes dbt run on an incremental model, hourly + +This dag will update an incremental model in the dbt jardiner project. + +Warning: Note that views and other dbt dependencies need to exist beforehand +so that model can be succesfully executed. +""" + + +def get_random_moll(): + available_molls = Variable.get("available_molls").split() + # trunk-ignore(bandit/B311) + return random.choice(available_molls) + + +driver_config = DriverConfig(name="local", options=nfs_config) +mount_nfs = Mount( + source="local", target="/repos", type="volume", driver_config=driver_config +) + + +def dbapi_to_dict(dbapi: str): + parsed_string = urllib.parse.urlparse(dbapi) + + return { + "provider": parsed_string.scheme, + "user": parsed_string.username, + "password": urllib.parse.unquote(parsed_string.password) + if parsed_string.password + else None, + "host": parsed_string.hostname, + "port": parsed_string.port, + "database": parsed_string.path[1:], + } + + +with DAG( + dag_id="dset_materialize_dag", + start_date=datetime(2023, 1, 10), + schedule_interval="@hourly", + catchup=False, + tags=["scope:Plantmonitor", "scope:Jardiner", "DBT", "source:DSET"], + max_active_runs=1, + default_args=args, + doc_md=__doc__, +) as dag: + repo_name = "somenergia-jardiner" + + sampled_moll = get_random_moll() + + dbapi = Variable.get("plantmonitor_db") + + dbapi_dict = dbapi_to_dict(dbapi) + + environment = { + "DBUSER": dbapi_dict["user"], + "DBPASSWORD": dbapi_dict["password"], + "DBHOST": dbapi_dict["host"], + "DBPORT": dbapi_dict["port"], + "DBNAME": dbapi_dict["database"], + } + + dbt_transformation_task = DockerOperator( + api_version="auto", + task_id="dbt_transformation_task__dset_materialization", + environment=environment, + docker_conn_id="somenergia_harbor_dades_registry", + image=f"harbor.somenergia.coop/dades/{repo_name}-dbt-docs:latest", + working_dir=f"/repos/{repo_name}/dbt_jardiner", + command=( + "dbt run" + " --profiles-dir config" + " --target prod" + " --models int_dset_responses__materialized_one_hour_late" + ), + docker_url=sampled_moll, + mounts=[mount_nfs], + mount_tmp_dir=False, + auto_remove=True, + retrieve_output=True, + trigger_rule="none_failed", + force_pull=True, + ) diff --git a/dbt_jardiner/models/jardiner/intermediate/dset/_int_dset__models.yaml b/dbt_jardiner/models/jardiner/intermediate/dset/_int_dset__models.yaml index dd7db666..781ad489 100644 --- a/dbt_jardiner/models/jardiner/intermediate/dset/_int_dset__models.yaml +++ b/dbt_jardiner/models/jardiner/intermediate/dset/_int_dset__models.yaml @@ -37,4 +37,4 @@ tests: pot_instantantanea_planta_kw. - name: pot_instantantanea_planta_kw description: - Suma de la potència activ a de tots els inversors de cada planta. \ No newline at end of file + Suma de la potència activ a de tots els inversors de cada planta. diff --git a/dbt_jardiner/models/jardiner/intermediate/dset/int_dset_responses__deduplicated.sql b/dbt_jardiner/models/jardiner/intermediate/dset/int_dset_responses__deduplicated.sql index abd13bb4..84bc2d56 100644 --- a/dbt_jardiner/models/jardiner/intermediate/dset/int_dset_responses__deduplicated.sql +++ b/dbt_jardiner/models/jardiner/intermediate/dset/int_dset_responses__deduplicated.sql @@ -2,9 +2,9 @@ with ordered as ( - select row_number() over (partition by rdrar.ts, rdrar.signal_id order by rdrar.queried_at desc) as row_order, * - from {{ ref("raw_dset_responses__api_response") }} rdrar + select *, row_number() over (partition by ts, signal_id order by queried_at desc) as row_order + from {{ ref("raw_dset_responses__api_response") }} ) select {{ dbt_utils.star(from=ref("raw_dset_responses__api_response")) }} -from ordered o -where o.row_order = 1 +from ordered +where row_order = 1 diff --git a/dbt_jardiner/models/jardiner/intermediate/dset/int_dset_responses__materialized_one_hour_late.sql b/dbt_jardiner/models/jardiner/intermediate/dset/int_dset_responses__materialized_one_hour_late.sql new file mode 100644 index 00000000..f5632101 --- /dev/null +++ b/dbt_jardiner/models/jardiner/intermediate/dset/int_dset_responses__materialized_one_hour_late.sql @@ -0,0 +1,34 @@ +{{ config(materialized="incremental") }} + +with + normalized_jsonb as ( + + select + group_name, + queried_at, + ts, + signal_code, + signal_device_type, + signal_device_uuid, + signal_frequency, + signal_id, + signal_is_virtual, + signal_last_ts, + signal_last_value, + signal_type, + signal_tz, + signal_unit, + signal_uuid, + signal_uuid_raw, + signal_value + from {{ ref("int_dset_responses__deduplicated") }} + ) + +select * +from normalized_jsonb +where + ts < now() - interval '1 hour' {#- select only freshly ingested rows #} + + {% if is_incremental() -%} + and ts > coalesce((select max(ts) from {{ this }}), '1900-01-01') and queried_at > now() - interval '2 hour' + {%- endif %} diff --git a/dbt_jardiner/models/jardiner/intermediate/dset/int_dset_responses__union_view_and_materialized.sql b/dbt_jardiner/models/jardiner/intermediate/dset/int_dset_responses__union_view_and_materialized.sql new file mode 100644 index 00000000..7657b455 --- /dev/null +++ b/dbt_jardiner/models/jardiner/intermediate/dset/int_dset_responses__union_view_and_materialized.sql @@ -0,0 +1,53 @@ +{{ config(materialized="view") }} + +with + dset_materialized as ( + + select + group_name, + queried_at, + ts, + signal_code, + signal_device_type, + signal_device_uuid, + signal_frequency, + signal_id, + signal_is_virtual, + signal_last_ts, + signal_last_value, + signal_type, + signal_tz, + signal_unit, + signal_uuid, + signal_uuid_raw, + signal_value + from {{ ref("int_dset_responses__materialized_one_hour_late") }} + ), + +dset_current_day_view as ( + select + group_name, + queried_at, + ts, + signal_code, + signal_device_type, + signal_device_uuid, + signal_frequency, + signal_id, + signal_is_virtual, + signal_last_ts, + signal_last_value, + signal_type, + signal_tz, + signal_unit, + signal_uuid, + signal_uuid_raw, + signal_value + from {{ ref("int_dset_responses__view_current_hour") }} + ) + +select * +from dset_materialized +union all +select * +from dset_current_day_view diff --git a/dbt_jardiner/models/jardiner/intermediate/dset/int_dset_responses__view_current_hour.sql b/dbt_jardiner/models/jardiner/intermediate/dset/int_dset_responses__view_current_hour.sql new file mode 100644 index 00000000..e9ef367b --- /dev/null +++ b/dbt_jardiner/models/jardiner/intermediate/dset/int_dset_responses__view_current_hour.sql @@ -0,0 +1,37 @@ +{{ config(materialized="view") }} + + +with + latest as (select max(ts) as max_ts from {{ ref("int_dset_responses__materialized_one_hour_late") }}), + + normalized_jsonb as ( + select + group_name, + queried_at, + ts, + signal_code, + signal_device_type, + signal_device_uuid, + signal_frequency, + signal_id, + signal_is_virtual, + signal_last_ts, + signal_last_value, + signal_type, + signal_tz, + signal_uuid_raw, + signal_unit, + signal_value, + signal_uuid + from {{ ref("raw_dset_responses__api_response") }} + where ts >= (select max_ts from latest) and queried_at >= (select max_ts from latest) + ), + +ordered as ( + select *, row_number() over (partition by ts, signal_id order by queried_at desc) as row_order + from normalized_jsonb + ) + +select * +from ordered +where row_order = 1 diff --git a/dbt_jardiner/models/jardiner/marts/dm_dset_responses_observability__frequencies.sql b/dbt_jardiner/models/jardiner/marts/dm_dset_responses_observability__frequencies.sql new file mode 100644 index 00000000..d76de775 --- /dev/null +++ b/dbt_jardiner/models/jardiner/marts/dm_dset_responses_observability__frequencies.sql @@ -0,0 +1,34 @@ +{{ config(materialized="view") }} + +with + window_ as ( + select + group_name, + signal_uuid, + ts, + signal_frequency::interval as signal_frequency, + lag(ts) over (partition by signal_uuid order by ts asc) as lag_ + from {{ ref("int_dset_responses__union_view_and_materialized") }} + ), + + delta_ as ( + select *, ts - lag_ as delta + from window_ + where ts - lag_ > signal_frequency and current_date - interval '1 day' < ts + ), + + summarized as ( + select + group_name, + signal_uuid, + delta, + extract('year' from ts) as "year", + extract('month' from ts) as "month", + count(signal_uuid) as count_signal_uuid + from delta_ + group by signal_uuid, delta, extract('year' from ts), extract('month' from ts), group_name + order by group_name, extract('year' from ts) desc, extract('month' from ts) asc + ) + +select * +from summarized diff --git a/dbt_jardiner/models/jardiner/raw/dset/_raw_dset__models.yaml b/dbt_jardiner/models/jardiner/raw/dset/_raw_dset__models.yaml index 4ceb8179..d471a146 100644 --- a/dbt_jardiner/models/jardiner/raw/dset/_raw_dset__models.yaml +++ b/dbt_jardiner/models/jardiner/raw/dset/_raw_dset__models.yaml @@ -1,22 +1,45 @@ version: 2 models: - - name: raw_dset_responses__api_response - description: Conté les lectures crues que venen de la ingesta. Consulta la doc del model dset_responses (el source) per més info. + description: > + Conté les lectures crues que venen de la ingesta. + Consulta la doc del model dset_responses (el source) per més info. columns: - name: group_name - - name: response + description: Equivalent a planta. Camp definit per partner DSET + tests: + - not_null: + where: "CURRENT_DATE - interval '3 days' < queried_at" - name: signal_id + description: id intern de DSET per identificar un senyal - name: signal_tz + description: Timezone. Camp definit per partner DSET. - name: signal_code + description: codi intern de senyal. Camp definit per partner DSET - name: signal_type + description: Tipus de senyal. - name: signal_unit - - name: zone - - name: signal_last_ts + description: Unitats del senyal. Camp definit per partner DSET. - name: signal_frequency + description: Freqüencia del senyal. Camp definit per partner DSET - name: signal_is_virtual + description: > + Indica si es un senyal derivat o calculat. + Camp definit per partner DSET - name: signal_last_value + description: > + Últim valor del senyal observat. Camp definit per partner DSET - name: signal_uuid + description: > + Identificador únic (uuid) del senyal a la base de dades de SomEnergia + tests: + - not_null: + where: "CURRENT_DATE - interval '3 days' < queried_at" - name: ts + description: Camp definit per partner DSET + tests: + - not_null: + where: "CURRENT_DATE - interval '3 days' < queried_at" - name: signal_value + description: Valor del senyal observat. Camp definit per partner DSET