From 0aec78463b2b3872090ec1f80c57c21fc06111db Mon Sep 17 00:00:00 2001
From: Jean-Louis Lamezec
Date: Tue, 22 Oct 2024 16:26:19 +0200
Subject: [PATCH 1/5] feat(aggregation): create python aggregation and DAG

---
 dags/aggregation.py                           | 32 +++++++
 .../metabase_aggregation/aggregation.py       | 87 +++++++++++++++++++
 .../postgres/get_postgres_connection.py       | 26 ++++++
 3 files changed, 145 insertions(+)
 create mode 100644 dags/aggregation.py
 create mode 100644 dags/data_utils/metabase_aggregation/aggregation.py
 create mode 100644 dags/data_utils/postgres/get_postgres_connection.py

diff --git a/dags/aggregation.py b/dags/aggregation.py
new file mode 100644
index 0000000..085b883
--- /dev/null
+++ b/dags/aggregation.py
@@ -0,0 +1,32 @@
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from datetime import datetime, timedelta
+
+from data_utils.metabase_aggregation.aggregation import perform_and_insert_aggregated_data
+
+# Default arguments for the DAG tasks
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'email_on_failure': False,
+    'retries': 0,
+}
+
+# Define the DAG
+with DAG(
+    'daily_aggregated_client_data_dag',
+    default_args=default_args,
+    description='A DAG to aggregate data from multiple clients and insert it into a database',
+    schedule='@daily',
+    start_date=datetime(2024, 10, 21),  # DAG start date
+    catchup=False,
+) as dag:
+
+    # Task to run the aggregation and insertion function
+    aggregate_and_insert_task = PythonOperator(
+        task_id='perform_and_insert_aggregated_data_task',
+        python_callable=perform_and_insert_aggregated_data,
+        op_kwargs={},
+    )
+
+    aggregate_and_insert_task
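A note on the DAG file above: the trailing bare `aggregate_and_insert_task` expression is a no-op, since instantiating an operator inside the `with DAG(...)` block is what registers it. Once the later patches add more operators, the tasks run in parallel unless ordered explicitly. A minimal sketch of explicit ordering with Airflow's dependency operator, using the task names introduced in patch 4:

```python
# Hypothetical ordering for the operators defined by the end of the series;
# the patches themselves leave the tasks unordered, so Airflow is free to
# run them in parallel.
aggregate_and_insert_task >> [aggregate_by_date_task, aggregate_by_participation_type_task]
```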
diff --git a/dags/data_utils/metabase_aggregation/aggregation.py b/dags/data_utils/metabase_aggregation/aggregation.py
new file mode 100644
index 0000000..adb81cd
--- /dev/null
+++ b/dags/data_utils/metabase_aggregation/aggregation.py
@@ -0,0 +1,87 @@
+from sqlalchemy import text
+
+from ..postgres.get_postgres_connection import get_postgres_connection
+import pandas as pd
+
+db_cluster = 'db_cluster_name_data'
+
+
+def execute_queries(connection, queries):
+    """
+    Executes a list of queries on the specified connection.
+    """
+    results = {}
+    for query_name, query in queries.items():
+        try:
+            result = connection.execute(text(query))
+            count = result.fetchone()[0]
+            results[query_name] = count
+        except Exception as e:
+            print(f"Failed to execute query {query_name}: {e}")
+            results[query_name] = None  # Return None if the query fails
+    return results
+
+
+def aggregate_data_for_clients(client_databases, queries):
+    """
+    Performs data aggregation for each client and returns a DataFrame.
+    """
+    data = []
+
+    for client_db in client_databases:
+        connection = get_postgres_connection(db_cluster, client_db)
+        client_results = execute_queries(connection, queries)
+        client_results['client'] = client_db  # Add client name to results
+        data.append(client_results)
+        connection.close()  # Close the connection after query execution
+
+    # Convert results to DataFrame
+    df = pd.DataFrame(data)
+    return df
+
+def clean_data_in_postgres(connection):
+    """Deletes rows in the table where the 'date' is between the start_date and end_date."""
+    try:
+        delete_query = text(
+            f"DELETE FROM aggregated_data;"
+        )
+
+        connection.execute(delete_query)
+        print(f"Cleaned data in aggregated_data")
+    except Exception as e:
+        print(f"Failed to clean data in aggregated_data: {e}")
+
+def insert_data_to_aggregated_db(dataframe, target_database, target_table):
+    """
+    Inserts the aggregated data into a target PostgreSQL table.
+    """
+    try:
+        connection = get_postgres_connection(db_cluster, target_database)
+        clean_data_in_postgres(connection)
+        dataframe.to_sql(target_table, con=connection, if_exists='append', index=False)
+        print(f"Data successfully inserted into {target_table} in {target_database}.")
+        connection.close()
+    except Exception as e:
+        print(f"Failed to insert data into {target_table}: {e}")
+        raise
+
+
+def perform_and_insert_aggregated_data():
+    client_databases = ["lyon", "marseille", "toulouse"]
+
+    # List of aggregation queries
+    queries = {
+        'user_count': "SELECT COUNT(*) AS user_count FROM public.decidim_users;",
+    }
+
+    # Perform data aggregation for all clients
+    aggregated_data = aggregate_data_for_clients(client_databases, queries)
+
+    # Display the aggregated data (optional)
+    print(aggregated_data.head(5))
+
+    # Insert the aggregated data into a new database and table
+    target_database = "aggregated_client_data"
+    target_table = "aggregated_data"
+
+    insert_data_to_aggregated_db(aggregated_data, target_database, target_table)
\ No newline at end of file
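For orientation, `aggregate_data_for_clients` above returns one row per client: each key of `queries` becomes a column, plus a `client` column. One subtlety is that a failed query stores `None`, which pandas promotes to `NaN`, silently turning the count column into floats. A small sketch with invented values:

```python
import pandas as pd

# One dict per client, as built by execute_queries + aggregate_data_for_clients.
rows = [
    {"user_count": 1234, "client": "lyon"},       # counts here are invented
    {"user_count": None, "client": "marseille"},  # a failed query stores None
]
df = pd.DataFrame(rows)
print(df.dtypes)  # user_count is float64 because of the NaN, not int64
```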
diff --git a/dags/data_utils/postgres/get_postgres_connection.py b/dags/data_utils/postgres/get_postgres_connection.py
new file mode 100644
index 0000000..2f0e451
--- /dev/null
+++ b/dags/data_utils/postgres/get_postgres_connection.py
@@ -0,0 +1,26 @@
+from airflow.hooks.base import BaseHook
+from sqlalchemy import create_engine
+
+def get_postgres_connection(db_cluster, database):
+    """
+    Extracts PostgreSQL connection details from Airflow and establishes a connection using SQLAlchemy.
+    """
+    try:
+        # Retrieve the connection object from Airflow
+        connection = BaseHook.get_connection(db_cluster)
+
+        # Extract connection details
+        user = connection.login
+        password = connection.password
+        host = connection.host
+        port = connection.port
+
+        # Create the SQLAlchemy engine
+        engine = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}")
+        conn = engine.connect()
+        print("Successfully connected to the PostgreSQL database via Airflow.")
+        return conn
+
+    except Exception as e:
+        print(f"Failed to connect to PostgreSQL via Airflow: {e}")
+        raise  # Raise exception to ensure the DAG fails if the connection cannot be established

From 32d661b68bebe7f265163fe98c7ae9eb71e5bd77 Mon Sep 17 00:00:00 2001
From: ailepet
Date: Wed, 23 Oct 2024 10:14:25 +0200
Subject: [PATCH 2/5] feat: add participatory process count

---
 dags/data_utils/metabase_aggregation/aggregation.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/dags/data_utils/metabase_aggregation/aggregation.py b/dags/data_utils/metabase_aggregation/aggregation.py
index adb81cd..ee13c7d 100644
--- a/dags/data_utils/metabase_aggregation/aggregation.py
+++ b/dags/data_utils/metabase_aggregation/aggregation.py
@@ -58,7 +58,7 @@ def insert_data_to_aggregated_db(dataframe, target_database, target_table):
     try:
         connection = get_postgres_connection(db_cluster, target_database)
         clean_data_in_postgres(connection)
-        dataframe.to_sql(target_table, con=connection, if_exists='append', index=False)
+        dataframe.to_sql(target_table, con=connection, if_exists='replace', index=False)
         print(f"Data successfully inserted into {target_table} in {target_database}.")
         connection.close()
     except Exception as e:
@@ -67,11 +67,12 @@ def insert_data_to_aggregated_db(dataframe, target_database, target_table):
 
 
 def perform_and_insert_aggregated_data():
-    client_databases = ["lyon", "marseille", "toulouse"]
+    client_databases = ["lyon", "marseille", "toulouse", "grand_nancy"]
 
     # List of aggregation queries
     queries = {
-        'user_count': "SELECT COUNT(*) AS user_count FROM public.decidim_users;",
+        'user_count': "SELECT COUNT(*) AS user_count FROM prod.users;",
+        'participatory_process_count': "SELECT COUNT(*) AS participatory_process_count FROM public.decidim_participatory_processes;",
     }
 
     # Perform data aggregation for all clients
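The switch to `if_exists='replace'` in the patch above changes more than it may appear: pandas now drops and recreates the target table on every run, so the `DELETE FROM` issued by `clean_data_in_postgres` just before it no longer has any effect (it only mattered with `'append'`). A runnable sketch of the two modes, using an in-memory SQLite engine as a stand-in for the Postgres connection:

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite://")  # stand-in for get_postgres_connection(...)
df = pd.DataFrame({"user_count": [1], "client": ["lyon"]})

# 'append' keeps the existing table and its schema; a prior DELETE is what
# prevents duplicate rows across daily runs.
df.to_sql("aggregated_data", con=engine, if_exists="append", index=False)

# 'replace' drops and recreates the table from the DataFrame's dtypes,
# so any manual cleanup beforehand is redundant.
df.to_sql("aggregated_data", con=engine, if_exists="replace", index=False)
```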
From daadae7db811ff3159a80a9b5550c211f43fdc15 Mon Sep 17 00:00:00 2001
From: ailepet
Date: Wed, 23 Oct 2024 11:11:52 +0200
Subject: [PATCH 3/5] feat: improve aggregated_data table

---
 dags/data_utils/metabase_aggregation/aggregation.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/dags/data_utils/metabase_aggregation/aggregation.py b/dags/data_utils/metabase_aggregation/aggregation.py
index ee13c7d..61ff4d5 100644
--- a/dags/data_utils/metabase_aggregation/aggregation.py
+++ b/dags/data_utils/metabase_aggregation/aggregation.py
@@ -67,12 +67,14 @@ def insert_data_to_aggregated_db(dataframe, target_database, target_table):
 
 
 def perform_and_insert_aggregated_data():
-    client_databases = ["lyon", "marseille", "toulouse", "grand_nancy"]
+    client_databases = ["lyon", "marseille", "toulouse", "grand_nancy", "tours"]
 
     # List of aggregation queries
     queries = {
         'user_count': "SELECT COUNT(*) AS user_count FROM prod.users;",
-        'participatory_process_count': "SELECT COUNT(*) AS participatory_process_count FROM public.decidim_participatory_processes;",
+        'participating_user_count': "SELECT COUNT(*) AS participating_user_count FROM prod.users WHERE has_answered_survey OR is_endorsing OR is_following OR has_authored_comment OR has_authored_proposal OR has_voted_on_project OR has_voted_on_proposal;",
+        'participatory_process_count': "SELECT COUNT(*) AS participatory_process_count FROM prod.stg_decidim_participatory_processes;",
+        'participations_count': "SELECT COUNT(*) AS participations_count FROM prod.participations WHERE participation_type IS NOT NULL;",
     }
 
     # Perform data aggregation for all clients
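The `participating_user_count` filter added above chains seven boolean flags in one long line; if more activity flags appear later, deriving the SQL from a list keeps the definition in one place. A sketch (the column names are taken from the patch and assumed to be boolean columns of `prod.users`):

```python
# Activity flags on prod.users, as enumerated in the query above.
PARTICIPATION_FLAGS = [
    "has_answered_survey", "is_endorsing", "is_following",
    "has_authored_comment", "has_authored_proposal",
    "has_voted_on_project", "has_voted_on_proposal",
]

# Builds the same WHERE clause as the hand-written query in the patch.
participating_user_count = (
    "SELECT COUNT(*) AS participating_user_count "
    "FROM prod.users WHERE " + " OR ".join(PARTICIPATION_FLAGS) + ";"
)
```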
""" try: connection = get_postgres_connection(db_cluster, target_database) - clean_data_in_postgres(connection) + clean_data_in_postgres(connection, target_table) dataframe.to_sql(target_table, con=connection, if_exists='replace', index=False) print(f"Data successfully inserted into {target_table} in {target_database}.") connection.close() @@ -67,7 +67,7 @@ def insert_data_to_aggregated_db(dataframe, target_database, target_table): def perform_and_insert_aggregated_data(): - client_databases = ["lyon", "marseille", "toulouse", "grand_nancy", "tours"] + client_databases = ["angers", "cdc", "cea", "cese", "grand_nancy", "lyon", "marseille", "meyzieu", "sytral", "thionville", "toulouse", "tours", "valbonne"] # List of aggregation queries queries = { @@ -78,7 +78,7 @@ def perform_and_insert_aggregated_data(): } # Perform data aggregation for all clients - aggregated_data = aggregate_data_for_clients(client_databases, queries) + aggregated_data = aggregate_data_for_clients(db_cluster_prod, client_databases, queries) # Display the aggregated data (optional) print(aggregated_data.head(5)) @@ -87,4 +87,44 @@ def perform_and_insert_aggregated_data(): target_database = "aggregated_client_data" target_table = "aggregated_data" - insert_data_to_aggregated_db(aggregated_data, target_database, target_table) \ No newline at end of file + insert_data_to_aggregated_db(db_cluster_preprod, aggregated_data, target_database, target_table) + +def aggregate_by_date_task(): + client_databases = ["angers", "cdc", "cea", "cese", "grand_nancy", "lyon", "marseille", "meyzieu", "sytral", "thionville", "toulouse", "tours", "valbonne"] + + query = """ + SELECT + COUNT(*) as users_count, + DATE(created_at) AS date_of_creation + FROM prod.users + GROUP BY date_of_creation + """ + + target_database = "aggregated_client_data" + + for client in client_databases: + target_table = f"aggregate_by_date_{client}" + connection = get_postgres_connection(db_cluster_prod, client) + result = connection.execute(text(query)) + df = pd.DataFrame(result) + insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table) + +def aggregate_by_participation_type_task(): + client_databases = ["angers", "cdc", "cea", "cese", "grand_nancy", "lyon", "marseille", "meyzieu", "sytral", "thionville", "toulouse", "tours", "valbonne"] + + query = """ + SELECT + COUNT (*), + participation_type + FROM prod.participations + GROUP BY participation_type + """ + + target_database = "aggregated_client_data" + + for client in client_databases: + target_table = f"aggregate_by_participation_type_{client}" + connection = get_postgres_connection(db_cluster_prod, client) + result = connection.execute(text(query)) + df = pd.DataFrame(result) + insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table) \ No newline at end of file From b29f0d9f683f227a938bbb18e6cc1044e582cd45 Mon Sep 17 00:00:00 2001 From: ailepet Date: Thu, 24 Oct 2024 14:18:05 +0200 Subject: [PATCH 5/5] feat: last edits --- .../metabase_aggregation/aggregation.py | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/dags/data_utils/metabase_aggregation/aggregation.py b/dags/data_utils/metabase_aggregation/aggregation.py index 90e390b..5e8b5c6 100644 --- a/dags/data_utils/metabase_aggregation/aggregation.py +++ b/dags/data_utils/metabase_aggregation/aggregation.py @@ -39,6 +39,24 @@ def aggregate_data_for_clients(db_cluster, client_databases, queries): df = pd.DataFrame(data) return df +def 
From b29f0d9f683f227a938bbb18e6cc1044e582cd45 Mon Sep 17 00:00:00 2001
From: ailepet
Date: Thu, 24 Oct 2024 14:18:05 +0200
Subject: [PATCH 5/5] feat: last edits

---
 .../metabase_aggregation/aggregation.py       | 37 ++++++++++++-------
 1 file changed, 24 insertions(+), 13 deletions(-)

diff --git a/dags/data_utils/metabase_aggregation/aggregation.py b/dags/data_utils/metabase_aggregation/aggregation.py
index 90e390b..5e8b5c6 100644
--- a/dags/data_utils/metabase_aggregation/aggregation.py
+++ b/dags/data_utils/metabase_aggregation/aggregation.py
@@ -39,6 +39,24 @@ def aggregate_data_for_clients(db_cluster, client_databases, queries):
     df = pd.DataFrame(data)
     return df
 
+def aggregate_data_for_clients_for_unique_query(db_cluster, client_databases, query):
+    """
+    Performs data aggregation for each client and returns a DataFrame.
+    """
+    data = []
+
+    for client_db in client_databases:
+        connection = get_postgres_connection(db_cluster, client_db)
+        result = connection.execute(text(query))
+        df = pd.DataFrame(result)
+        df['client'] = client_db  # Add client name to results
+        data.append(df)
+        connection.close()  # Close the connection after query execution
+
+    # Convert results to DataFrame
+    df = pd.concat(data)
+    return df
+
 def clean_data_in_postgres(connection, table):
     """Deletes rows in the table."""
     try:
@@ -102,12 +120,9 @@ def aggregate_by_date_task():
 
     target_database = "aggregated_client_data"
 
-    for client in client_databases:
-        target_table = f"aggregate_by_date_{client}"
-        connection = get_postgres_connection(db_cluster_prod, client)
-        result = connection.execute(text(query))
-        df = pd.DataFrame(result)
-        insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)
+    target_table = "aggregate_by_date"
+    df = aggregate_data_for_clients_for_unique_query(db_cluster_prod, client_databases, query)
+    insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)
 
 def aggregate_by_participation_type_task():
     client_databases = ["angers", "cdc", "cea", "cese", "grand_nancy", "lyon", "marseille", "meyzieu", "sytral", "thionville", "toulouse", "tours", "valbonne"]
@@ -121,10 +136,6 @@ def aggregate_by_participation_type_task():
     """
 
     target_database = "aggregated_client_data"
-
-    for client in client_databases:
-        target_table = f"aggregate_by_participation_type_{client}"
-        connection = get_postgres_connection(db_cluster_prod, client)
-        result = connection.execute(text(query))
-        df = pd.DataFrame(result)
-        insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)
\ No newline at end of file
+    target_table = "aggregate_by_participation_type"
+    df = aggregate_data_for_clients_for_unique_query(db_cluster_prod, client_databases, query)
+    insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)
\ No newline at end of file
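With the whole series applied, a quick sanity check is to read the three target tables back from the aggregation database. A sketch: the import assumes the `dags` folder is on `PYTHONPATH` (as it is for the DAG file itself), and the cluster and database names are the ones used above.

```python
import pandas as pd
from data_utils.postgres.get_postgres_connection import get_postgres_connection

conn = get_postgres_connection("db_cluster_name_data", "aggregated_client_data")
for table in ("aggregated_data", "aggregate_by_date", "aggregate_by_participation_type"):
    df = pd.read_sql_table(table, con=conn)
    print(table, df.shape)  # expect one row per client / (client, date) / (client, type)
conn.close()
```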