feat(aggregation): create python aggregation and DAG #12

Open · wants to merge 5 commits into main
46 changes: 46 additions & 0 deletions dags/aggregation.py
@@ -0,0 +1,46 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

from data_utils.metabase_aggregation.aggregation import (
    perform_and_insert_aggregated_data,
    aggregate_by_date_task,
    aggregate_by_participation_type_task,
)

# Default arguments for the DAG tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'retries': 0,
}

# Define the DAG
with DAG(
    'daily_aggregated_client_data_dag',
    default_args=default_args,
    description='A DAG to aggregate data from multiple clients and insert it into a database',
    schedule='@daily',
    start_date=datetime(2024, 10, 21),  # DAG start date
    catchup=False,
) as dag:

    # Tasks that run the aggregation and insertion functions. The operator
    # variables are named differently from the imported callables so the
    # PythonOperator instances do not shadow the functions they wrap.
    aggregate_and_insert = PythonOperator(
        task_id='perform_and_insert_aggregated_data_task',
        python_callable=perform_and_insert_aggregated_data,
    )
    aggregate_by_date = PythonOperator(
        task_id='aggregate_by_date_task',
        python_callable=aggregate_by_date_task,
    )
    aggregate_by_participation_type = PythonOperator(
        task_id='aggregate_by_participation_type_task',
        python_callable=aggregate_by_participation_type_task,
    )

    # The three tasks are independent, so no dependencies are declared;
    # they run in parallel within each DAG run.
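A quick way to check that this file parses and registers the DAG, a minimal sketch assuming it runs from the repository root with Airflow installed:

from airflow.models import DagBag

dag_bag = DagBag(dag_folder="dags/", include_examples=False)
assert not dag_bag.import_errors, dag_bag.import_errors
dag = dag_bag.get_dag("daily_aggregated_client_data_dag")
assert dag is not None and len(dag.tasks) == 3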

141 changes: 141 additions & 0 deletions dags/data_utils/metabase_aggregation/aggregation.py
@@ -0,0 +1,141 @@
from sqlalchemy import text

from ..postgres.get_postgres_connection import get_postgres_connection
import pandas as pd

db_cluster_preprod = 'db_cluster_name_data'
db_cluster_prod = 'db_cluster_name_data_prod'

# Client databases to aggregate; shared by every aggregation task below.
CLIENT_DATABASES = [
    "angers", "cdc", "cea", "cese", "grand_nancy", "lyon", "marseille",
    "meyzieu", "sytral", "thionville", "toulouse", "tours", "valbonne",
]

def execute_queries(connection, queries):
    """
    Executes each named query on the given connection and returns a dict
    mapping query name to the first value of the first row.
    """
    results = {}
    for query_name, query in queries.items():
        try:
            result = connection.execute(text(query))
            count = result.fetchone()[0]
            results[query_name] = count
        except Exception as e:
            print(f"Failed to execute query {query_name}: {e}")
            results[query_name] = None  # Return None if the query fails
    return results
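# Example usage (illustrative; the production queries live in
# perform_and_insert_aggregated_data below):
#
#   queries = {'user_count': "SELECT COUNT(*) FROM prod.users;"}
#   results = execute_queries(connection, queries)
#   # -> {'user_count': 1234} on success, {'user_count': None} on failure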


def aggregate_data_for_clients(db_cluster, client_databases, queries):
    """
    Runs the scalar aggregation queries against each client database and
    returns a DataFrame with one row per client.
    """
    data = []

    for client_db in client_databases:
        connection = get_postgres_connection(db_cluster, client_db)
        client_results = execute_queries(connection, queries)
        client_results['client'] = client_db  # Add client name to results
        data.append(client_results)
        connection.close()  # Close the connection after query execution

    # Convert results to DataFrame
    df = pd.DataFrame(data)
    return df

def aggregate_data_for_clients_for_unique_query(db_cluster, client_databases, query):
    """
    Runs a single multi-row query against each client database and
    concatenates the per-client results into one DataFrame.
    """
    data = []

    for client_db in client_databases:
        connection = get_postgres_connection(db_cluster, client_db)
        result = connection.execute(text(query))
        # Build the frame explicitly from rows and column names so the
        # DataFrame columns match the query's, whatever the SQLAlchemy version.
        df = pd.DataFrame(result.fetchall(), columns=result.keys())
        df['client'] = client_db  # Add client name to results
        data.append(df)
        connection.close()  # Close the connection after query execution

    # Concatenate the per-client frames into a single DataFrame
    df = pd.concat(data, ignore_index=True)
    return df
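# Example (illustrative): the 'client' column added above lets downstream
# code split the combined frame back out per client:
#
#   df = aggregate_data_for_clients_for_unique_query(
#       db_cluster_prod, CLIENT_DATABASES, "SELECT COUNT(*) AS n FROM prod.users"
#   )
#   df.groupby('client')['n'].sum()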

def clean_data_in_postgres(connection, table):
    """Deletes all rows from the table; on the first run, before the table
    exists, this fails harmlessly and is logged."""
    try:
        delete_query = text(f"DELETE FROM {table};")

        # Run the delete in an explicit transaction so it is committed
        # regardless of the SQLAlchemy version's autocommit behaviour.
        with connection.begin():
            connection.execute(delete_query)
        print(f"Cleaned data in table {table}")
    except Exception as e:
        print(f"Failed to clean data in table {table}: {e}")

def insert_data_to_aggregated_db(db_cluster, dataframe, target_database, target_table):
    """
    Inserts the aggregated data into a target PostgreSQL table.
    """
    connection = get_postgres_connection(db_cluster, target_database)
    try:
        clean_data_in_postgres(connection, target_table)
        # Append after the explicit delete: the table is emptied then refilled,
        # and it is created automatically on the first run ('replace' would
        # drop and recreate the table, making the delete above redundant).
        dataframe.to_sql(target_table, con=connection, if_exists='append', index=False)
        print(f"Data successfully inserted into {target_table} in {target_database}.")
    except Exception as e:
        print(f"Failed to insert data into {target_table}: {e}")
        raise
    finally:
        connection.close()  # Always release the connection, even on failure


def perform_and_insert_aggregated_data():
    client_databases = CLIENT_DATABASES

    # Aggregation queries; each returns a single count per client
    queries = {
        'user_count': "SELECT COUNT(*) AS user_count FROM prod.users;",
        'participating_user_count': "SELECT COUNT(*) AS participating_user_count FROM prod.users WHERE has_answered_survey OR is_endorsing OR is_following OR has_authored_comment OR has_authored_proposal OR has_voted_on_project OR has_voted_on_proposal;",
        'participatory_process_count': "SELECT COUNT(*) AS participatory_process_count FROM prod.stg_decidim_participatory_processes;",
        'participations_count': "SELECT COUNT(*) AS participations_count FROM prod.participations WHERE participation_type IS NOT NULL;",
    }

    # Perform data aggregation for all clients
    aggregated_data = aggregate_data_for_clients(db_cluster_prod, client_databases, queries)

    # Display the aggregated data (optional)
    print(aggregated_data.head(5))

    # Insert the aggregated data into a new database and table
    target_database = "aggregated_client_data"
    target_table = "aggregated_data"

    insert_data_to_aggregated_db(db_cluster_preprod, aggregated_data, target_database, target_table)

def aggregate_by_date_task():
    client_databases = CLIENT_DATABASES

    query = """
        SELECT
            COUNT(*) AS users_count,
            DATE(created_at) AS date_of_creation
        FROM prod.users
        GROUP BY date_of_creation
    """

    target_database = "aggregated_client_data"
    target_table = "aggregate_by_date"
    df = aggregate_data_for_clients_for_unique_query(db_cluster_prod, client_databases, query)
    insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)

def aggregate_by_participation_type_task():
    client_databases = CLIENT_DATABASES

    query = """
        SELECT
            COUNT(*) AS participations_count,
            participation_type
        FROM prod.participations
        GROUP BY participation_type
    """

    target_database = "aggregated_client_data"
    target_table = "aggregate_by_participation_type"
    df = aggregate_data_for_clients_for_unique_query(db_cluster_prod, client_databases, query)
    insert_data_to_aggregated_db(db_cluster_preprod, df, target_database, target_table)
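For a local smoke test, the helpers can be exercised against a single client before wiring them into the DAG; a minimal sketch, assuming it runs from the dags/ directory with the Airflow connection behind db_cluster_prod configured:

from data_utils.metabase_aggregation.aggregation import (
    aggregate_data_for_clients, db_cluster_prod,
)

queries = {'user_count': "SELECT COUNT(*) AS user_count FROM prod.users;"}
df = aggregate_data_for_clients(db_cluster_prod, ["angers"], queries)
print(df)  # one row with 'user_count' and 'client' columns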
26 changes: 26 additions & 0 deletions dags/data_utils/postgres/get_postgres_connection.py
@@ -0,0 +1,26 @@
from airflow.hooks.base import BaseHook
from sqlalchemy import create_engine

def get_postgres_connection(db_cluster, database):
    """
    Extracts PostgreSQL connection details from Airflow and establishes a connection using SQLAlchemy.
    """
    try:
        # Retrieve the connection object from Airflow
        connection = BaseHook.get_connection(db_cluster)

        # Extract connection details
        user = connection.login
        password = connection.password
        host = connection.host
        port = connection.port

        # Create the SQLAlchemy engine. Note: the f-string assumes user and
        # password contain no characters that need URL-escaping (see the
        # URL.create sketch below for a quoting-safe alternative).
        engine = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}")
        conn = engine.connect()
        print("Successfully connected to the PostgreSQL database via Airflow.")
        return conn

    except Exception as e:
        print(f"Failed to connect to PostgreSQL via Airflow: {e}")
        raise  # Raise exception to ensure the DAG fails if the connection cannot be established
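If credentials may contain characters that need URL-escaping, building the URL with SQLAlchemy's URL.create (available from 1.4) sidesteps manual quoting; a sketch of the alternative, using the same connection details:

from sqlalchemy import create_engine
from sqlalchemy.engine import URL

url = URL.create(
    drivername="postgresql",
    username=user,
    password=password,  # escaped automatically by URL.create
    host=host,
    port=port,
    database=database,
)
engine = create_engine(url)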