Skip to content

Commit

Permalink
Overview dashboard datamarts + Table and incremental models DAG (et/s…
Browse files Browse the repository at this point in the history
…omenergia-jardiner!16)

* dag to re-run all tables and incremental models and subsequent models

* fix spine incremental + change dm name + dm produccio cartera

* add potencia_instantanea_planta to dm_overview_instant

* added production target monthly, finish dm_dashboard_overview_monthly,added documentation

* production target snapshot, wip overview dashboard

* wip dashboard overview

* add documentation of drive airbyte connection to energy production prediccion on AG

* feature: wip dashboard overview

* fix: correct a typo

* fix: to limit the length of signal_uuid to 36 characters

* dev: add command to run dbt re-data models to makefile

* wip overview dashboard

* fix renameing dbt project, split seeds on jardiner and legacy, move legacy models to jardiner

* wip overview dashboard

* wip dm_overview

* fix typo in signals seed

* wip dm for overview dashboard

* deletion of meter losses coefficient

* add dm_plants per dades fixes

* fix warning on test_to_debug_device_uid_errors

* rename to nom_plants, add dades fixes dm_plants
  • Loading branch information
Lugadur committed Oct 17, 2023
1 parent 586bb5e commit cb1dc2b
Show file tree
Hide file tree
Showing 41 changed files with 549 additions and 78 deletions.
11 changes: 10 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ app.build: ## build image using docker build
@docker compose -f $(app_compose_file) --env-file $(app_compose_env_file) build app --progress=plain

app.push: ## push image using docker push
@docker compose -f $(app_compose_file) --env-file $(app_compose_env_file) push app
@docker compose -f $(app_compose_file) --env-file $(app_compose_env_file) push app

app_dev.build: ## build image for development using docker build
@docker compose -f $(app_compose_file) --env-file $(app_compose_env_file) build app-dev --progress=plain
Expand Down Expand Up @@ -78,3 +78,12 @@ dbt_docs.build_docs: ## build the dbt-docs documentation

dbt_docs.logs: ## show the logs of the dbt-docs container
@docker compose -f $(app_compose_file) --env-file $(app_compose_env_file) logs -ft dbt-docs

# ---------------------------------------------------------------------------- #
# local commands #
# ---------------------------------------------------------------------------- #

local.re_data_models.dev: ## Run re_data models
@(cd dbt_jardiner && dbt run --target dev --models package:re_data)


101 changes: 101 additions & 0 deletions dags/plant_production_datasets_jardiner_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import random
import urllib.parse
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import DriverConfig, Mount

my_email = Variable.get("fail_email")
addr = Variable.get("repo_server_url")


args = {
"email": my_email,
"email_on_failure": True,
"email_on_retry": True,
"retries": 3,
"retry_delay": timedelta(minutes=30),
}

nfs_config = {
"type": "nfs",
"o": f"addr={addr},nfsvers=4",
"device": ":/opt/airflow/repos",
}


def get_random_moll():
available_molls = Variable.get("available_molls").split()
return random.choice(available_molls)


driver_config = DriverConfig(name="local", options=nfs_config)
mount_nfs = Mount(
source="local", target="/repos", type="volume", driver_config=driver_config
)


def dbapi_to_dict(dbapi: str):
parsed_string = urllib.parse.urlparse(dbapi)

return {
"provider": parsed_string.scheme,
"user": parsed_string.username,
"password": urllib.parse.unquote(parsed_string.password)
if parsed_string.password
else None,
"host": parsed_string.hostname,
"port": parsed_string.port,
"database": parsed_string.path[1:],
}


with DAG(
dag_id="plant_production_datasets__jardiner__v1",
start_date=datetime(2023, 1, 10),
schedule_interval="@daily",
catchup=False,
tags=["Plantmonitor", "Jardiner", "Transform", "DBT"],
max_active_runs=1,
default_args=args,
) as dag:
repo_name = "somenergia-jardiner"

sampled_moll = get_random_moll()

dbapi = Variable.get("plantmonitor_db")

dbapi_dict = dbapi_to_dict(dbapi)

environment = {
"DBUSER": dbapi_dict["user"],
"DBPASSWORD": dbapi_dict["password"],
"DBHOST": dbapi_dict["host"],
"DBPORT": dbapi_dict["port"],
"DBNAME": dbapi_dict["database"],
}

dbt_transformation_task = DockerOperator(
api_version="auto",
task_id="dbt_transformation_task__jardiner",
environment=environment,
docker_conn_id="somenergia_harbor_dades_registry",
image="{}/{}-app:latest".format(
"{{ conn.somenergia_harbor_dades_registry.host }}",
repo_name,
),
working_dir=f"/repos/{repo_name}/dbt_jardiner",
command=(
"dbt run --profiles-dir config --target prod "
"-s config.materialized:table+ config.materialized:incremental+"
),
docker_url=sampled_moll,
mounts=[mount_nfs],
mount_tmp_dir=False,
auto_remove=True,
retrieve_output=True,
trigger_rule="none_failed",
force_pull=True,
)
2 changes: 0 additions & 2 deletions dbt_jardiner/models/jardiner/_jardiner__sources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,3 @@ sources:
schema: public
tables:
- name: plantmoduleparameters
- name: meter
- name: meterregistry
11 changes: 11 additions & 0 deletions dbt_jardiner/models/jardiner/intermediate/_int__models.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: 2

tests:

- name: int_produccio_objectiu__long
description: >
Passem les dades de producció mensual objectiu a dades llargues, i transformem la col·lumna de 'mes' a data amb any, en funció de la data d'actualització
config:
tags: int


Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{{ config(materialized='view') }}


with ts_last_registry as (
SELECT signal_uuid, plant, max(ts) as ultim_registre
from {{ ref('int_dset_responses__last_month')}}
group by plant, signal_uuid
)
SELECT m.*
from ts_last_registry tslr
left join {{ ref('int_dset_responses__last_month') }} m
on m.signal_uuid = tslr.signal_uuid and m.ts = tslr.ultim_registre
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{{ config(materialized='table') }}


{#
Cada planta té un o diversos tipus de senyals d'irradancia. Així, per cada planta, triem una senyal
d'irradancia. l'Order by permet fer una jerarquia quan a una planta hi ha més d'un tipus de irradància.
#}
select
distinct on (plant, ts)
plant,
signal,
metric,
device,
device_type,
device_uuid,
device_parent,
signal_uuid,
group_name,
signal_id,
signal_tz,
signal_code,
signal_type,
signal_unit,
signal_last_ts,
signal_frequency,
signal_is_virtual,
signal_last_value,
ts,
signal_value
from {{ref('int_dset__last_registries')}}
where signal = 'irradiacio'
or signal = 'irradiacio_sonda_bruta'
or signal = 'irradiacio_sonda_neta'
or signal = 'irradiacio_sonda'
order by plant, ts,
case
when signal = 'irradiacio' then 1
when signal = 'irradiacio_sonda_bruta' then 2
when signal = 'irradiacio_sonda_neta' then 3
when signal = 'irradiacio_sonda' then 4
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{{ config(materialized='view') }}


with meter_registry_hourly_raw as (
select
date_trunc('hour', time_start_hour) AS time_start_hour,
meter_id,
meter_name,
plant_id,
plant_name,
plant_code,
round(avg(export_energy_wh),2) as export_energy_wh,
round(avg(import_energy_wh),2) as import_energy_wh
from {{ ref('raw_meterregistry') }} as meter_registry
--WHERE time >= '' and time < max(time)
group by date_trunc('hour', time_start_hour), plant_id, plant_name, plant_code, meter_id, meter_name
)

select
*,
CASE
WHEN export_energy_wh > 0 THEN 1
WHEN export_energy_wh = 0 THEN 0
ELSE NULL
END as has_energy
FROM meter_registry_hourly_raw
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{{ config(materialized='view') }}


with pot_instantanea_planta as (
select
plant,
ts as ultim_registre_pot_instantanea,
round(sum(signal_value),1) as pot_instantantanea_planta_kw
from {{ref('int_dset__last_registries')}}
group by
plant,
ts,
device_type,
signal
having signal = 'potencia_activa'
), plant_production_daily_previous_day as(
SELECT
nom_planta,
dia,
energia_exportada_comptador_kwh,
energia_esperada_solargis_kwh
FROM {{ ref('dm_plant_production_daily') }} as ppd
where dia = current_date - interval '1 day'
)
select
p.plant_name as nom_planta,
--municipality as municipi,
province as provincia,
technology as tecnologia,
peak_power_kw as potencia_pic_kw,
nominal_power_kw as potencia_nominal_kw,
i.ultim_registre_pot_instantanea,
i.pot_instantantanea_planta_kw,
ir.ts as ultim_registre_irradiacio,
ir.signal_value as irradiacio,
ppd.dia,
ppd.energia_exportada_comptador_kwh,
ppd.energia_esperada_solargis_kwh
from {{ ref('seed_plants__parameters') }} p
left join pot_instantanea_planta i
on i.plant = p.plant_name
left join plant_production_daily_previous_day ppd on ppd.nom_planta = p.plant_name
left join {{ref('int_dset_last_registries_irradiation')}} ir
on ir.plant =p.plant_name
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{{ config(materialized='view') }}


with objectius_unpivoted as (
{# Pivot values from columns to rows. Similar to pandas DataFrame melt() function.#}
{{ dbt_utils.unpivot(relation=ref('raw_gestio_actius_production_target'), cast_to='numeric', exclude=['plant','gestio_actius_updated_at','dbt_updated_at','dbt_valid_from','dbt_valid_to'], field_name='month', value_name='energy_production_target_mwh') }}
)
select
plant,
month,
energy_production_target_mwh,
dbt_valid_from,
dbt_valid_to
from objectius_unpivoted
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{{ config(materialized='view') }}


select
s.month,
pt.plant,
pt.energy_production_target_mwh
from {{ref('spine_monthly__full_year')}} s
left join {{ref('int_production_target__long')}} pt
on pt.dbt_valid_from <= s.month and s.month < pt.dbt_valid_to
and to_char(s.month,'FMmonth') ilike pt.month



Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ left join {{ ref('raw_plantlake_omie_historical_price__with_row_number_per_date'
left join {{ ref('raw_plantmonitordb_solarevent__generous') }} as solar_events
on solar_events.plant_id = plant_metadata.plant_id and solar_events.day = spine.start_hour::date
{# temporarely use plantmonitors' meterregistry until dset provides it #}
left join {{ref('meter_registry_hourly')}} as meter_registry
left join {{ref('int_erp_meter_registry__hourly')}} as meter_registry
on meter_registry.plant_id = plant_metadata.plant_id and meter_registry.time_start_hour = spine.start_hour
order by start_hour desc, plant
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{{ config(materialized='view') }}

with spine as (
select generate_series(current_date::timestamptz, current_date + interval '1 day - 1 hour', '1 hour') as start_hour
), today_hourly as (
select
hora_inici,
tecnologia,
sum(energia_instantania_inversor_kwh) as energia_instantania_inversor_kwh,
sum(energia_exportada_instantania_comptador_kwh) as energia_exportada_instantania_comptador_kwh,
max(preu_omie_e_kwh) as preu_omie_e_kwh
from {{ ref("dm_plant_production_hourly") }}
where current_date <= hora_inici and hora_inici < current_date + interval '1 day'
group by hora_inici, tecnologia
)
select
*
from spine
left join today_hourly
on today_hourly.hora_inici = spine.start_hour
order by start_hour desc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{{ config(materialized='view') }}

select
nom_planta,
--municipality as municipi,
provincia,
tecnologia,
potencia_pic_kw,
ultim_registre_pot_instantanea,
round(pot_instantantanea_planta_kw,2) as pot_instantantanea_planta_kw,
round(pot_instantantanea_planta_kw/potencia_pic_kw,2) as instant_vs_pic,
ultim_registre_irradiacio,
irradiacio,
dia,
round((energia_exportada_comptador_kwh - energia_esperada_solargis_kwh)/1000,2) as energia_perduda_mw
from {{ ref('int_plants_overview_instant') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{{ config(materialized='view') }}


with production_target as (
select
month,
sum(energy_production_target_mwh) as total_energy_production_target_mwh
from {{ ref("int_production_target__monthly") }}
group by month
), production_target_w_cumsum as (
select
month,
total_energy_production_target_mwh,
sum(total_energy_production_target_mwh) over (order by month) as total_cumsum_energy_production_target_mwh
from production_target
), production_monthly as (
select
month,
sum(energia_instantania_inversor_kwh)/1000 as total_energia_instantania_inversores_mwh,
sum(energia_exportada_instantania_comptador_kwh)/1000 as total_energia_exportada_instantania_comptadores_mwh,
sum(energia_exportada_comptador_kwh)/1000 as total_energia_exportada_comptadores_mwh,
sum(energia_esperada_solargis_kwh)/1000 as total_energia_esperada_solargis_mwh,
sum(abs(energia_perduda_kwh))/1000 as total_energia_perduda_mwh,
max(preu_omie_e_kwh) as preu_omie_e_kwh
from {{ ref("dm_plant_production_monthly") }}
group by month
)
select
production_target_w_cumsum.month,
production_monthly.total_energia_instantania_inversores_mwh,
production_monthly.total_energia_exportada_instantania_comptadores_mwh,
production_monthly.total_energia_exportada_comptadores_mwh,
sum(production_monthly.total_energia_exportada_comptadores_mwh) over (partition by extract (year from production_monthly.month) order by production_monthly.month) as total_cumsum_energia_exportada_comptadores_mwh,
production_monthly.total_energia_esperada_solargis_mwh,
production_monthly.total_energia_perduda_mwh,
production_monthly.preu_omie_e_kwh,
production_target_w_cumsum.total_energy_production_target_mwh as total_energia_objetivo_mwh,
sum(production_target_w_cumsum.total_energy_production_target_mwh) over (partition by extract (year from production_target_w_cumsum.month) order by production_target_w_cumsum.month) as total_cumsum_energy_production_target_mwh
from production_target_w_cumsum
left join production_monthly on production_target_w_cumsum.month = production_monthly.month
Loading

0 comments on commit cb1dc2b

Please sign in to comment.