Skip to content

Commit

Permalink
new: improve loading time by combination of materialized and view dbt…
Browse files Browse the repository at this point in the history
… models (et/somenergia-jardiner!32)

* change: update dag with doc and use dbt-docs image instead of app

* new: add dbt datamart for counting gaps per signal from dset data

* new: add basic tests to raw_dset model

* change: remove duplicates only in dbt view using CTE

* chore: minor bump to format

* chore: update tags in dag to match existing

Co-authored-by: Pol Monsó Purtí <[email protected]>

* new: add first version of combination of materialized and view models
  • Loading branch information
diegoquintanav committed Nov 22, 2023
1 parent 72d49d7 commit 06924fd
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 10 deletions.
4 changes: 4 additions & 0 deletions .prettierrc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ overrides:
- "*.html"
options:
tabWidth: 2
- files:
- "*.yaml"
options:
tabWidth: 2
111 changes: 111 additions & 0 deletions dags/dset_materialize_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import random
import urllib.parse
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import DriverConfig, Mount

# Airflow Variables read when this module is imported: the address that
# receives failure/retry e-mails and the address of the NFS server that
# exports the repositories.
my_email = Variable.get("fail_email")
addr = Variable.get("repo_server_url")


# Default task arguments, handed to the DAG through ``default_args``:
# e-mail on failure and on retry, up to 3 retries spaced 5 minutes apart.
args = {
"email": my_email,
"email_on_failure": True,
"email_on_retry": True,
"retries": 3,
"retry_delay": timedelta(minutes=5),
}

# Options for the Docker volume driver: an NFSv4 export of
# ``/opt/airflow/repos`` served from ``addr``.
nfs_config = {
"type": "nfs",
"o": f"addr={addr},nfsvers=4",
"device": ":/opt/airflow/repos",
}

# Markdown text attached to the DAG through ``doc_md``.
__doc__ = """
# Executes dbt run on an incremental model, hourly
This dag will update an incremental model in the dbt jardiner project.
Warning: Note that views and other dbt dependencies need to exist beforehand
so that model can be succesfully executed.
"""


def get_random_moll():
    """Pick one entry at random from the ``available_molls`` Airflow Variable.

    The Variable is read as a whitespace-separated list; the chosen entry is
    what the DAG passes to the Docker operator as ``docker_url``.

    Returns:
        One of the whitespace-separated tokens stored in ``available_molls``.

    Raises:
        IndexError: If the Variable holds no tokens at all.
    """
    candidates = Variable.get("available_molls").split()
    # Plain load spreading, not a security decision, hence the lint waiver.
    # trunk-ignore(bandit/B311)
    chosen = random.choice(candidates)
    return chosen


# Docker volume backed by the "local" driver configured with the NFS options
# above; it is mounted at /repos inside the container.
driver_config = DriverConfig(name="local", options=nfs_config)
mount_nfs = Mount(
source="local", target="/repos", type="volume", driver_config=driver_config
)


def dbapi_to_dict(dbapi: str) -> dict:
    """Split a database connection URL into its components.

    Args:
        dbapi: Connection string such as
            ``postgresql://user:secret@host:5432/dbname``. Reserved characters
            in the user name or password are expected to be percent-encoded.

    Returns:
        A dict with the keys ``provider`` (the URL scheme), ``user``,
        ``password``, ``host``, ``port`` and ``database`` (the URL path
        without its leading slash). ``user`` and ``password`` are
        percent-decoded. ``user``, ``host`` and ``port`` are ``None`` when
        absent from the URL; ``password`` is ``None`` when absent or empty.
        ``port`` is an ``int`` when present.

    Raises:
        ValueError: If the URL carries a port that is not a valid port number.
    """
    parsed = urllib.parse.urlparse(dbapi)
    username = parsed.username
    password = parsed.password

    return {
        "provider": parsed.scheme,
        # urlparse leaves credentials percent-encoded; decode the user name
        # the same way as the password so both reach the consumer verbatim.
        "user": urllib.parse.unquote(username) if username is not None else None,
        "password": urllib.parse.unquote(password) if password else None,
        "host": parsed.hostname,
        "port": parsed.port,
        "database": parsed.path[1:],
    }


# Hourly DAG with a single task: run dbt inside a Docker container to refresh
# the incremental model int_dset_responses__materialized_one_hour_late.
with DAG(
    dag_id="dset_materialize_dag",
    doc_md=__doc__,
    default_args=args,
    tags=["scope:Plantmonitor", "scope:Jardiner", "DBT", "source:DSET"],
    start_date=datetime(2023, 1, 10),
    schedule_interval="@hourly",
    catchup=False,
    max_active_runs=1,
) as dag:
    repo_name = "somenergia-jardiner"

    # NOTE(review): the two lookups below run every time this file is parsed,
    # not when the task executes.
    sampled_moll = get_random_moll()

    dbapi = Variable.get("plantmonitor_db")
    dbapi_dict = dbapi_to_dict(dbapi)

    # Connection settings exposed to the container as environment variables.
    environment = dict(
        DBUSER=dbapi_dict["user"],
        DBPASSWORD=dbapi_dict["password"],
        DBHOST=dbapi_dict["host"],
        DBPORT=dbapi_dict["port"],
        DBNAME=dbapi_dict["database"],
    )

    dbt_image = f"harbor.somenergia.coop/dades/{repo_name}-dbt-docs:latest"
    # The dbt project is reached through the NFS volume mounted at /repos.
    dbt_project_dir = f"/repos/{repo_name}/dbt_jardiner"
    dbt_command = " ".join(
        [
            "dbt run",
            "--profiles-dir config",
            "--target prod",
            "--models int_dset_responses__materialized_one_hour_late",
        ]
    )

    dbt_transformation_task = DockerOperator(
        task_id="dbt_transformation_task__dset_materialization",
        api_version="auto",
        docker_conn_id="somenergia_harbor_dades_registry",
        docker_url=sampled_moll,
        image=dbt_image,
        force_pull=True,
        working_dir=dbt_project_dir,
        command=dbt_command,
        environment=environment,
        mounts=[mount_nfs],
        mount_tmp_dir=False,
        auto_remove=True,
        retrieve_output=True,
        trigger_rule="none_failed",
    )
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ tests:
pot_instantantanea_planta_kw.
- name: pot_instantantanea_planta_kw
description:
Suma de la potència activ a de tots els inversors de cada planta.
Suma de la potència activa de tots els inversors de cada planta.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

with
ordered as (
select row_number() over (partition by rdrar.ts, rdrar.signal_id order by rdrar.queried_at desc) as row_order, *
from {{ ref("raw_dset_responses__api_response") }} rdrar
select *, row_number() over (partition by ts, signal_id order by queried_at desc) as row_order
from {{ ref("raw_dset_responses__api_response") }}
)
select {{ dbt_utils.star(from=ref("raw_dset_responses__api_response")) }}
from ordered o
where o.row_order = 1
from ordered
where row_order = 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{{ config(materialized="incremental") }}

-- Incremental table built from the deduplicated DSET responses.
-- Every run keeps only rows whose ts is more than one hour old.
-- Incremental runs additionally keep only rows whose ts is newer than the
-- newest ts already stored in this table and that were queried within the
-- last two hours; when this table is empty the lower bound falls back to
-- 1900-01-01.
-- NOTE(review): on incremental runs a row with ts not greater than the stored
-- maximum is skipped even if it was never loaded; confirm that late-arriving
-- readings are acceptable to lose.
with
normalized_jsonb as (

-- explicit column list taken from the deduplicated intermediate model
select
group_name,
queried_at,
ts,
signal_code,
signal_device_type,
signal_device_uuid,
signal_frequency,
signal_id,
signal_is_virtual,
signal_last_ts,
signal_last_value,
signal_type,
signal_tz,
signal_unit,
signal_uuid,
signal_uuid_raw,
signal_value
from {{ ref("int_dset_responses__deduplicated") }}
)

select *
from normalized_jsonb
-- the one-hour cutoff applies to every run; the block after it only on incremental runs
where
ts < now() - interval '1 hour' {#- select only freshly ingested rows #}

{% if is_incremental() -%}
and ts > coalesce((select max(ts) from {{ this }}), '1900-01-01') and queried_at > now() - interval '2 hour'
{%- endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{{ config(materialized="view") }}

-- View that stitches together the incrementally materialized history and the
-- live view of recent responses with a plain union all (no de-duplication
-- between the two branches).
-- Both branches list the same columns in the same order so that the
-- positional union lines up; the live view exposes its columns in a different
-- order plus an extra row_order column.
-- NOTE(review): the materialized branch holds rows up to its newest ts and the
-- live view selects rows with ts at or after that same value, so rows at
-- exactly that ts may appear in both branches; confirm whether consumers
-- tolerate that.
with
dset_materialized as (

-- history: rows already persisted by the incremental model
select
group_name,
queried_at,
ts,
signal_code,
signal_device_type,
signal_device_uuid,
signal_frequency,
signal_id,
signal_is_virtual,
signal_last_ts,
signal_last_value,
signal_type,
signal_tz,
signal_unit,
signal_uuid,
signal_uuid_raw,
signal_value
from {{ ref("int_dset_responses__materialized_one_hour_late") }}
),

-- recent rows: despite the CTE name, the source is the current-hour view
dset_current_day_view as (
select
group_name,
queried_at,
ts,
signal_code,
signal_device_type,
signal_device_uuid,
signal_frequency,
signal_id,
signal_is_virtual,
signal_last_ts,
signal_last_value,
signal_type,
signal_tz,
signal_unit,
signal_uuid,
signal_uuid_raw,
signal_value
from {{ ref("int_dset_responses__view_current_hour") }}
)

select *
from dset_materialized
union all
select *
from dset_current_day_view
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{{ config(materialized="view") }}

-- View over the raw API responses that are not older than the newest ts held
-- by the incremental model, keeping one row per (ts, signal_id): the one with
-- the most recent queried_at.
-- The output carries the helper column row_order (always 1) because the final
-- select uses a star over the ordered CTE.
-- NOTE(review): when the incremental table is empty max_ts is null and both
-- comparisons below evaluate to null, so this view would return no rows;
-- confirm that is the intended behaviour.

with
-- newest ts already persisted by the incremental model
latest as (select max(ts) as max_ts from {{ ref("int_dset_responses__materialized_one_hour_late") }}),

-- raw rows at or after that ts, both by reading time and by query time
normalized_jsonb as (
select
group_name,
queried_at,
ts,
signal_code,
signal_device_type,
signal_device_uuid,
signal_frequency,
signal_id,
signal_is_virtual,
signal_last_ts,
signal_last_value,
signal_type,
signal_tz,
signal_uuid_raw,
signal_unit,
signal_value,
signal_uuid
from {{ ref("raw_dset_responses__api_response") }}
where ts >= (select max_ts from latest) and queried_at >= (select max_ts from latest)
),

-- rank duplicates of the same reading, newest query first
ordered as (
select *, row_number() over (partition by ts, signal_id order by queried_at desc) as row_order
from normalized_jsonb
)

select *
from ordered
where row_order = 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{{ config(materialized="view") }}

-- Counts gaps in the readings of each signal.
-- A gap is a pair of consecutive readings of the same signal_uuid whose
-- distance in ts is larger than the signal's declared frequency. Only gaps
-- whose later reading falls after the start of yesterday are counted, and the
-- counts are grouped by plant group, signal, gap length, year and month.
with
-- previous reading time of the same signal, with the frequency cast to an interval
window_ as (
select
group_name,
signal_uuid,
ts,
signal_frequency::interval as signal_frequency,
lag(ts) over (partition by signal_uuid order by ts asc) as lag_
from {{ ref("int_dset_responses__union_view_and_materialized") }}
),

-- keep only recent readings that arrived later than the expected frequency;
-- the first reading of a signal has a null lag_ and is dropped by the comparison
delta_ as (
select *, ts - lag_ as delta
from window_
where ts - lag_ > signal_frequency and current_date - interval '1 day' < ts
),

-- NOTE(review): the order by sits inside the CTE; the outer select does not
-- repeat it, so the order of the final result is presumably not guaranteed.
summarized as (
select
group_name,
signal_uuid,
delta,
extract('year' from ts) as "year",
extract('month' from ts) as "month",
count(signal_uuid) as count_signal_uuid
from delta_
group by signal_uuid, delta, extract('year' from ts), extract('month' from ts), group_name
order by group_name, extract('year' from ts) desc, extract('month' from ts) asc
)

select *
from summarized
33 changes: 28 additions & 5 deletions dbt_jardiner/models/jardiner/raw/dset/_raw_dset__models.yaml
Original file line number Diff line number Diff line change
@@ -1,22 +1,45 @@
version: 2

models:

- name: raw_dset_responses__api_response
description: Conté les lectures crues que venen de la ingesta. Consulta la doc del model dset_responses (el source) per més info.
description: >
Conté les lectures crues que venen de la ingesta.
Consulta la doc del model dset_responses (el source) per més info.
columns:
- name: group_name
- name: response
description: Equivalent a planta. Camp definit per partner DSET
tests:
- not_null:
where: "CURRENT_DATE - interval '3 days' < queried_at"
- name: signal_id
description: id intern de DSET per identificar un senyal
- name: signal_tz
description: Timezone. Camp definit per partner DSET.
- name: signal_code
description: codi intern de senyal. Camp definit per partner DSET
- name: signal_type
description: Tipus de senyal.
- name: signal_unit
- name: zone
- name: signal_last_ts
description: Unitats del senyal. Camp definit per partner DSET.
- name: signal_frequency
description: Freqüencia del senyal. Camp definit per partner DSET
- name: signal_is_virtual
description: >
Indica si es un senyal derivat o calculat.
Camp definit per partner DSET
- name: signal_last_value
description: >
Últim valor del senyal observat. Camp definit per partner DSET
- name: signal_uuid
description: >
Identificador únic (uuid) del senyal a la base de dades de SomEnergia
tests:
- not_null:
where: "CURRENT_DATE - interval '3 days' < queried_at"
- name: ts
description: Camp definit per partner DSET
tests:
- not_null:
where: "CURRENT_DATE - interval '3 days' < queried_at"
- name: signal_value
description: Valor del senyal observat. Camp definit per partner DSET

0 comments on commit 06924fd

Please sign in to comment.