From e0f56593e7b68307bf88f4002fec10fce99de159 Mon Sep 17 00:00:00 2001 From: Murtaza Icecreamwala Date: Sat, 14 Dec 2024 16:58:29 +0000 Subject: [PATCH] fix: resolve duplicate task ID in crypto_realtime_dag - Remove TaskGroup wrapper to prevent task ID conflicts - Simplify DAG structure while maintaining functionality - Fix task dependencies and references - Create clear pipeline flow: start -> collect -> process -> end --- .gitignore | 2 + weather_etl/.env.docker | 5 + weather_etl/README.md | 68 +++++ weather_etl/dags/crypto_processing_subdag.py | 155 +++++++++++ weather_etl/dags/crypto_realtime_dag.py | 214 ++++++++++++++ weather_etl/dags/stock_etl_dag.py | 212 ++++++++++++++ weather_etl/dags/weather_etl_dag.py | 279 +++++++++++++++++++ weather_etl/docker-compose.yaml | 149 ++++++++++ weather_etl/requirements.txt | 5 + 9 files changed, 1089 insertions(+) create mode 100644 weather_etl/.env.docker create mode 100644 weather_etl/README.md create mode 100644 weather_etl/dags/crypto_processing_subdag.py create mode 100644 weather_etl/dags/crypto_realtime_dag.py create mode 100644 weather_etl/dags/stock_etl_dag.py create mode 100644 weather_etl/dags/weather_etl_dag.py create mode 100644 weather_etl/docker-compose.yaml create mode 100644 weather_etl/requirements.txt diff --git a/.gitignore b/.gitignore index f8168b0..6faa251 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,5 @@ batch_data_ingestion/lib/* batch_data_ingestion/jars/* *.pyc +weather_etl/data/* +weather_etl/logs/* diff --git a/weather_etl/.env.docker b/weather_etl/.env.docker new file mode 100644 index 0000000..47e1f44 --- /dev/null +++ b/weather_etl/.env.docker @@ -0,0 +1,5 @@ +AIRFLOW_UID=50000 +AIRFLOW_GID=0 +AIRFLOW_IMAGE_NAME=apache/airflow:2.7.1 +_AIRFLOW_WWW_USER_USERNAME=airflow +_AIRFLOW_WWW_USER_PASSWORD=airflow diff --git a/weather_etl/README.md b/weather_etl/README.md new file mode 100644 index 0000000..4b1d2be --- /dev/null +++ b/weather_etl/README.md @@ -0,0 +1,68 @@ +# Weather Data ETL Pipeline + +This project implements a data transformation and cleaning pipeline for real-time weather data using Apache Airflow. The pipeline fetches weather data from OpenWeatherMap API, transforms it, and prepares it for further analysis. + +## Features + +- Hourly weather data collection from multiple cities +- Data cleaning and transformation +- Temperature conversion (Celsius to Fahrenheit) +- Weather categorization +- Data quality checks +- Wind speed conversion +- Automated pipeline using Apache Airflow + +## Project Structure + +``` +weather_etl/ +├── dags/ +│ └── weather_etl_dag.py +├── requirements.txt +└── README.md +``` + +## Setup + +1. Install dependencies: +```bash +pip install -r requirements.txt +``` + +2. Set up environment variables: +Create a `.env` file in the project root with: +``` +WEATHER_API_KEY=your_openweathermap_api_key +``` + +3. Configure Airflow: +- Set AIRFLOW_HOME to your project directory +- Initialize the Airflow database +- Start the Airflow webserver and scheduler + +## Pipeline Steps + +1. **Extract**: Fetches weather data from OpenWeatherMap API for multiple cities +2. **Transform**: + - Converts temperature to Fahrenheit + - Categorizes temperature + - Cleans weather descriptions + - Converts wind speed to MPH + - Adds data quality flags +3. **Load**: Saves the transformed data (can be modified to load into a database) + +## Schedule + +The pipeline runs every hour to collect and process the latest weather data. 
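For reference, the transform step boils down to a few pandas operations. A condensed, illustrative sketch (file paths here are placeholders; the full implementation, including the data quality flags, lives in `dags/weather_etl_dag.py`):

```python
import pandas as pd

# Illustrative sketch of the transform step (paths are placeholders).
df = pd.read_csv("raw_weather_data.csv")  # output of the extract step

df["temperature_f"] = df["temperature"] * 9 / 5 + 32  # Celsius -> Fahrenheit
df["temp_category"] = pd.cut(                          # bucket temperatures
    df["temperature"],
    bins=[-float("inf"), 0, 15, 25, float("inf")],
    labels=["Cold", "Mild", "Warm", "Hot"],
)
df["weather_desc"] = df["weather_desc"].str.title()    # tidy descriptions
df["wind_speed_mph"] = df["wind_speed"] * 2.237        # m/s -> mph

df.to_csv("transformed_weather_data.csv", index=False)
```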
+ +## Data Quality Checks + +- Humidity validation (should not exceed 100%) +- Pressure validation (should be within reasonable range) +- Data completeness checks + +## Notes + +- The pipeline is configured to process data for London, New York, Tokyo, Sydney, and Mumbai +- All timestamps are in UTC +- Intermediate data files are stored in the /opt/airflow/data directory diff --git a/weather_etl/dags/crypto_processing_subdag.py b/weather_etl/dags/crypto_processing_subdag.py new file mode 100644 index 0000000..0829bf0 --- /dev/null +++ b/weather_etl/dags/crypto_processing_subdag.py @@ -0,0 +1,155 @@ +from airflow.models import DAG +from airflow.operators.python import PythonOperator +from airflow.exceptions import AirflowException +import pandas as pd +import numpy as np +import logging +import os +from datetime import datetime, timedelta + +logger = logging.getLogger(__name__) + +def process_market_data(parent_dag_id, child_dag_id, crypto_symbol, **context): + """Process market data for a specific cryptocurrency""" + try: + # Get raw data from the parent DAG's collect task via XCom + task_instance = context['task_instance'] + raw_data = task_instance.xcom_pull( + dag_id=parent_dag_id, + task_ids=f'collect_market_data_{crypto_symbol}', + key=f'market_data_{crypto_symbol}' + ) + + if not raw_data: + raise AirflowException(f"No raw data found for {crypto_symbol}") + + df = pd.DataFrame(raw_data) + + # Calculate technical indicators + df['price'] = df['price'].astype(float) + df['volume'] = df['volume'].astype(float) + + # VWAP (Volume Weighted Average Price) + df['vwap'] = (df['price'] * df['volume']).cumsum() / df['volume'].cumsum() + + # Price momentum + df['price_momentum'] = df['price'].pct_change() + + # Volatility (Rolling standard deviation) + df['volatility'] = df['price_momentum'].rolling(window=10).std() + + # Volume trend + df['volume_ma'] = df['volume'].rolling(window=5).mean() + df['volume_trend'] = df['volume'] / df['volume_ma'] + + # Push processed data back to XCom + processed_data = df.to_dict('records') + task_instance.xcom_push( + key=f'processed_data_{crypto_symbol}', + value=processed_data + ) + + logger.info(f"Processed {len(processed_data)} records for {crypto_symbol}") + return processed_data + + except Exception as e: + logger.error(f"Error processing data for {crypto_symbol}: {str(e)}") + raise AirflowException(f"Failed to process market data for {crypto_symbol}: {str(e)}") + +def analyze_market_signals(parent_dag_id, child_dag_id, crypto_symbol, **context): + """Analyze market signals from processed data""" + try: + # Get processed data from XCom + task_instance = context['task_instance'] + processed_data = task_instance.xcom_pull( + task_ids=f'process_market_data_{crypto_symbol}', + key=f'processed_data_{crypto_symbol}' + ) + + if not processed_data: + raise AirflowException(f"No processed data found for {crypto_symbol}") + + df = pd.DataFrame(processed_data) + + # Generate trading signals + signals = { + 'symbol': crypto_symbol, + 'timestamp': datetime.utcnow().isoformat(), + 'signals': [] + } + + # Volume spike signal + if df['volume_trend'].iloc[-1] > 2.0: + signals['signals'].append({ + 'type': 'VOLUME_SPIKE', + 'strength': 'HIGH', + 'value': float(df['volume_trend'].iloc[-1]) + }) + + # Volatility signal + if df['volatility'].iloc[-1] > df['volatility'].mean() + df['volatility'].std(): + signals['signals'].append({ + 'type': 'HIGH_VOLATILITY', + 'strength': 'MEDIUM', + 'value': float(df['volatility'].iloc[-1]) + }) + + # Price momentum signal + if abs(df['price_momentum'].iloc[-1]) > 0.02: # 2% price
movement + signal_type = 'BULLISH' if df['price_momentum'].iloc[-1] > 0 else 'BEARISH' + signals['signals'].append({ + 'type': f'MOMENTUM_{signal_type}', + 'strength': 'HIGH', + 'value': float(df['price_momentum'].iloc[-1]) + }) + + # Push signals to XCom + task_instance.xcom_push( + key=f'market_signals_{crypto_symbol}', + value=signals + ) + + logger.info(f"Generated {len(signals['signals'])} signals for {crypto_symbol}") + return signals + + except Exception as e: + logger.error(f"Error analyzing signals for {crypto_symbol}: {str(e)}") + raise AirflowException(f"Failed to analyze market signals for {crypto_symbol}: {str(e)}") + +def create_processing_subdag(parent_dag_id, child_dag_id, crypto_symbol, args): + """Create a SubDAG for processing market data""" + dag = DAG( + dag_id=f'{parent_dag_id}.{child_dag_id}', + default_args=args, + schedule_interval=None, + ) + + # Process market data + process_task = PythonOperator( + task_id=f'process_market_data_{crypto_symbol}', + python_callable=process_market_data, + op_kwargs={ + 'parent_dag_id': parent_dag_id, + 'child_dag_id': child_dag_id, + 'crypto_symbol': crypto_symbol + }, + provide_context=True, + dag=dag, + ) + + # Analyze market signals + analyze_task = PythonOperator( + task_id=f'analyze_market_signals_{crypto_symbol}', + python_callable=analyze_market_signals, + op_kwargs={ + 'parent_dag_id': parent_dag_id, + 'child_dag_id': child_dag_id, + 'crypto_symbol': crypto_symbol + }, + provide_context=True, + dag=dag, + ) + + process_task >> analyze_task + + return dag diff --git a/weather_etl/dags/crypto_realtime_dag.py b/weather_etl/dags/crypto_realtime_dag.py new file mode 100644 index 0000000..151a30b --- /dev/null +++ b/weather_etl/dags/crypto_realtime_dag.py @@ -0,0 +1,214 @@ +from datetime import datetime, timedelta +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.operators.dummy import DummyOperator +from airflow.operators.subdag import SubDagOperator +from airflow.exceptions import AirflowException +from airflow.utils.task_group import TaskGroup +from crypto_processing_subdag import create_processing_subdag +import websocket +import json +import logging +import threading +import queue +import time +from collections import defaultdict + +# Configure logging +logger = logging.getLogger(__name__) + +# Constants +CRYPTO_SYMBOLS = ['btcusdt', 'ethusdt', 'bnbusdt'] +BINANCE_WS_URL = "wss://stream.binance.com:9443/ws" +DATA_COLLECTION_TIME = 60 # Collect data for 60 seconds +MIN_DATA_POINTS = 5 # Minimum number of data points required + +class CryptoDataCollector: + def __init__(self, symbol): + self.symbol = symbol + self.data_queue = queue.Queue() + self.ws = None + self.should_stop = False + self.connected = threading.Event() + + def on_message(self, ws, message): + """Handle incoming WebSocket messages""" + try: + data = json.loads(message) + if data.get('e') == 'trade': + symbol = data['s'].lower() + if symbol == self.symbol: + trade_data = { + 'symbol': symbol, + 'price': float(data['p']), + 'volume': float(data['q']), + 'timestamp': datetime.fromtimestamp(data['T']/1000.0).isoformat() + } + self.data_queue.put(trade_data) + logger.debug(f"Received trade data for {symbol}") + except Exception as e: + logger.error(f"Error processing message: {str(e)}") + + def on_error(self, ws, error): + """Handle WebSocket errors""" + logger.error(f"WebSocket error: {str(error)}") + self.connected.clear() + + def on_close(self, ws, close_status_code, close_msg): + """Handle WebSocket connection close""" 
+ logger.info(f"WebSocket connection closed: {close_status_code} - {close_msg}") + self.connected.clear() + + def on_open(self, ws): + """Handle WebSocket connection open""" + logger.info(f"WebSocket connection opened for {self.symbol}") + # Subscribe to trade stream + subscribe_message = { + "method": "SUBSCRIBE", + "params": [f"{self.symbol}@trade"], + "id": 1 + } + ws.send(json.dumps(subscribe_message)) + self.connected.set() + + def connect(self): + """Establish WebSocket connection""" + websocket.enableTrace(True) + self.ws = websocket.WebSocketApp( + BINANCE_WS_URL, + on_message=self.on_message, + on_error=self.on_error, + on_close=self.on_close, + on_open=self.on_open + ) + + def collect_data(self, duration=DATA_COLLECTION_TIME): + """Collect data for specified duration""" + try: + logger.info(f"Starting data collection for {self.symbol}") + + # Start WebSocket connection + self.connect() + + # Run WebSocket in a separate thread + ws_thread = threading.Thread(target=self.ws.run_forever) + ws_thread.daemon = True + ws_thread.start() + + # Wait for connection to establish + if not self.connected.wait(timeout=10): + raise AirflowException(f"Timeout waiting for WebSocket connection for {self.symbol}") + + # Collect data + collected_data = [] + start_time = time.time() + last_data_time = start_time + + while time.time() - start_time < duration: + try: + data = self.data_queue.get(timeout=1) + collected_data.append(data) + last_data_time = time.time() + + # Check for data timeout + if time.time() - last_data_time > 10: + logger.warning(f"No data received for {self.symbol} in the last 10 seconds") + except queue.Empty: + continue + + # Clean up + self.should_stop = True + if self.ws: + self.ws.close() + + # Verify data + if len(collected_data) < MIN_DATA_POINTS: + raise AirflowException( + f"Insufficient data points for {self.symbol}: {len(collected_data)} < {MIN_DATA_POINTS}" + ) + + logger.info(f"Successfully collected {len(collected_data)} data points for {self.symbol}") + return collected_data + + except Exception as e: + logger.error(f"Error collecting data for {self.symbol}: {str(e)}") + if self.ws: + self.ws.close() + raise AirflowException(f"Failed to collect data for {self.symbol}: {str(e)}") + +def collect_crypto_data(symbol, **context): + """Task function to collect crypto data""" + try: + collector = CryptoDataCollector(symbol) + data = collector.collect_data() + + # Store data in XCom + context['task_instance'].xcom_push( + key=f'market_data_{symbol}', + value=data + ) + + return f"Collected {len(data)} data points for {symbol}" + + except Exception as e: + logger.error(f"Error in collect_crypto_data for {symbol}: {str(e)}") + raise AirflowException(f"Failed to collect data for {symbol}: {str(e)}") + +# Default arguments for the DAG +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2024, 12, 14), + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=1), +} + +# Create the DAG +dag = DAG( + 'crypto_realtime_pipeline', + default_args=default_args, + description='A DAG for real-time cryptocurrency data processing with SubDAGs', + schedule_interval=timedelta(minutes=5), + catchup=False +) + +# Start task +start_task = DummyOperator( + task_id='start_pipeline', + dag=dag +) + +# Create tasks for each symbol +for symbol in CRYPTO_SYMBOLS: + collect_task = PythonOperator( + task_id=f'collect_market_data_{symbol}', + python_callable=collect_crypto_data, + op_kwargs={'symbol': symbol}, + 
dag=dag + ) + + process_task = SubDagOperator( + task_id=f'process_market_data_{symbol}', + subdag=create_processing_subdag( + parent_dag_id='crypto_realtime_pipeline', + child_dag_id=f'process_market_data_{symbol}', + crypto_symbol=symbol, + args=default_args + ), + dag=dag + ) + + # Set task dependencies + start_task >> collect_task >> process_task + +# End task +end_task = DummyOperator( + task_id='end_pipeline', + dag=dag +) + +# Connect all symbol groups to end task +for symbol in CRYPTO_SYMBOLS: + dag.get_task(f'process_market_data_{symbol}') >> end_task diff --git a/weather_etl/dags/stock_etl_dag.py b/weather_etl/dags/stock_etl_dag.py new file mode 100644 index 0000000..e8ec19c --- /dev/null +++ b/weather_etl/dags/stock_etl_dag.py @@ -0,0 +1,212 @@ +from datetime import datetime, timedelta +from airflow import DAG +from airflow.operators.python import PythonOperator, BranchPythonOperator +from airflow.operators.dummy import DummyOperator +from airflow.exceptions import AirflowException +import pandas as pd +import yfinance as yf +import logging +import os + +# Configure logging +logger = logging.getLogger(__name__) + +# Constants +DATA_DIR = '/opt/airflow/data' +COMPANIES = { + 'AAPL': 'Apple', + 'GOOGL': 'Google', + 'MSFT': 'Microsoft', + 'AMZN': 'Amazon', + 'META': 'Meta' +} +RAW_DATA_PATTERN = 'raw_stock_data_{}.csv' +PROCESSED_DATA_PATTERN = 'processed_stock_data_{}.csv' +QUALITY_THRESHOLD = 0.9 # 90% data quality threshold + +# Default arguments for the DAG +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2024, 12, 14), + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +def get_file_path(filename): + """Helper function to get full file path""" + return os.path.join(DATA_DIR, filename) + +def extract_stock_data(company_code, **context): + """Extract stock data for a specific company""" + try: + logger.info(f"Starting data extraction for {COMPANIES[company_code]} ({company_code})") + + # Create data directory if it doesn't exist + os.makedirs(DATA_DIR, exist_ok=True) + + # Download stock data + stock = yf.Ticker(company_code) + data = stock.history(period="1mo") + + if data.empty: + raise AirflowException(f"No data retrieved for {company_code}") + + # Save raw data + output_path = get_file_path(RAW_DATA_PATTERN.format(company_code)) + data.to_csv(output_path) + + logger.info(f"Stock data saved to {output_path}") + logger.info(f"Data shape: {data.shape}") + + # Push metrics to XCom + metrics = { + 'rows': len(data), + 'missing_values': data.isnull().sum().sum(), + 'trading_days': len(data[data['Volume'] > 0]) + } + context['task_instance'].xcom_push( + key=f'metrics_{company_code}', + value=metrics + ) + + return output_path + + except Exception as e: + logger.error(f"Error extracting data for {company_code}: {str(e)}") + raise AirflowException(f"Failed to extract stock data for {company_code}: {str(e)}") + +def check_data_quality(company_code, **context): + """Check data quality and decide processing path""" + try: + logger.info(f"Checking data quality for {company_code}") + + # Get metrics from XCom + task_instance = context['task_instance'] + metrics = task_instance.xcom_pull( + key=f'metrics_{company_code}' + ) + + if not metrics: + raise AirflowException(f"No metrics found for {company_code}") + + # Calculate quality score + total_expected_values = metrics['rows'] * 6 # 6 columns in typical stock data + missing_values = metrics['missing_values'] + quality_score = 1 - 
(missing_values / total_expected_values) + + logger.info(f"Quality score for {company_code}: {quality_score:.2f}") + + # Decide processing path based on quality score + if quality_score >= QUALITY_THRESHOLD: + return f'process_stock_data_{company_code}' + else: + return f'flag_low_quality_{company_code}' + + except Exception as e: + logger.error(f"Error checking data quality for {company_code}: {str(e)}") + raise AirflowException(f"Failed to check data quality for {company_code}: {str(e)}") + +def process_stock_data(company_code, **context): + """Process stock data for a specific company""" + try: + logger.info(f"Processing data for {company_code}") + + # Read raw data + input_path = get_file_path(RAW_DATA_PATTERN.format(company_code)) + df = pd.read_csv(input_path) + + # Calculate technical indicators + df['SMA_20'] = df['Close'].rolling(window=20).mean() + df['EMA_20'] = df['Close'].ewm(span=20, adjust=False).mean() + df['Daily_Return'] = df['Close'].pct_change() + df['Volatility'] = df['Daily_Return'].rolling(window=20).std() + + # Save processed data + output_path = get_file_path(PROCESSED_DATA_PATTERN.format(company_code)) + df.to_csv(output_path, index=False) + + logger.info(f"Processed data saved to {output_path}") + return output_path + + except Exception as e: + logger.error(f"Error processing data for {company_code}: {str(e)}") + raise AirflowException(f"Failed to process stock data for {company_code}: {str(e)}") + +def flag_low_quality(company_code, **context): + """Handle low quality data""" + logger.warning(f"Low quality data detected for {company_code}") + # In a real scenario, you might want to: + # - Send notifications + # - Log to monitoring system + # - Trigger data cleanup workflows + return f"Low quality data flagged for {company_code}" + +# Create the DAG +dag = DAG( + 'stock_etl_pipeline', + default_args=default_args, + description='A DAG for stock data ETL with dynamic tasks and branching', + schedule_interval=timedelta(days=1), + catchup=False +) + +# Create start and end tasks +start_task = DummyOperator( + task_id='start_pipeline', + dag=dag +) + +end_task = DummyOperator( + task_id='end_pipeline', + dag=dag, + trigger_rule='none_failed' +) + +# Dynamically create tasks for each company +for company_code in COMPANIES: + # Extract task + extract_task = PythonOperator( + task_id=f'extract_stock_data_{company_code}', + python_callable=extract_stock_data, + op_kwargs={'company_code': company_code}, + dag=dag, + ) + + # Quality check (branching) task + quality_check_task = BranchPythonOperator( + task_id=f'check_data_quality_{company_code}', + python_callable=check_data_quality, + op_kwargs={'company_code': company_code}, + dag=dag, + ) + + # Process task (good quality path) + process_task = PythonOperator( + task_id=f'process_stock_data_{company_code}', + python_callable=process_stock_data, + op_kwargs={'company_code': company_code}, + dag=dag, + ) + + # Flag task (low quality path) + flag_task = PythonOperator( + task_id=f'flag_low_quality_{company_code}', + python_callable=flag_low_quality, + op_kwargs={'company_code': company_code}, + dag=dag, + ) + + # Join paths with dummy operator + join_task = DummyOperator( + task_id=f'join_paths_{company_code}', + dag=dag, + trigger_rule='none_failed' + ) + + # Set task dependencies + start_task >> extract_task >> quality_check_task + quality_check_task >> [process_task, flag_task] + [process_task, flag_task] >> join_task >> end_task diff --git a/weather_etl/dags/weather_etl_dag.py b/weather_etl/dags/weather_etl_dag.py new 
file mode 100644 index 0000000..46910a9 --- /dev/null +++ b/weather_etl/dags/weather_etl_dag.py @@ -0,0 +1,279 @@ +from datetime import datetime, timedelta +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.exceptions import AirflowException +import pandas as pd +import requests +import os +from dotenv import load_dotenv +import logging + +# Configure logging +logger = logging.getLogger(__name__) + +# Constants +DATA_DIR = '/opt/airflow/data' +RAW_DATA_FILE = 'raw_weather_data.csv' +TRANSFORMED_DATA_FILE = 'transformed_weather_data.csv' +FINAL_DATA_FILE = 'final_weather_data.csv' + +# Default arguments for the DAG +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2024, 12, 14), + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +def get_file_path(filename): + """Helper function to get full file path""" + return os.path.join(DATA_DIR, filename) + +def extract_weather_data(**context): + """Extract weather data from API""" + try: + logger.info("Starting extract_weather_data task") + + # Create data directory if it doesn't exist + logger.info(f"Creating directory if not exists: {DATA_DIR}") + os.makedirs(DATA_DIR, exist_ok=True) + + # Verify directory exists and has write permissions + if not os.path.exists(DATA_DIR): + raise AirflowException(f"Failed to create directory: {DATA_DIR}") + if not os.access(DATA_DIR, os.W_OK): + raise AirflowException(f"No write permission for directory: {DATA_DIR}") + + # Load environment variables from mounted .env file + env_path = '/opt/airflow/.env' + if not os.path.exists(env_path): + raise AirflowException(f"Environment file not found at {env_path}") + load_dotenv(env_path) + + api_key = os.getenv('WEATHER_API_KEY') + if not api_key: + raise AirflowException("Weather API key not found in environment variables") + + logger.info(f"Using API key: {api_key[:4]}...") + + # Example cities + cities = ['London', 'New York', 'Tokyo', 'Sydney', 'Mumbai'] + weather_data = [] + + for city in cities: + try: + url = f'http://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}&units=metric' + logger.info(f"Fetching weather data for {city}") + response = requests.get(url) + response.raise_for_status() + + data = response.json() + weather_data.append({ + 'city': city, + 'temperature': data['main']['temp'], + 'humidity': data['main']['humidity'], + 'pressure': data['main']['pressure'], + 'weather_desc': data['weather'][0]['description'], + 'wind_speed': data['wind']['speed'], + 'timestamp': datetime.utcnow() + }) + logger.info(f"Successfully fetched data for {city}") + + except requests.RequestException as e: + logger.error(f"Error fetching data for {city}: {str(e)}") + continue + + if not weather_data: + raise AirflowException("No weather data was collected for any city") + + # Save data to CSV + output_path = get_file_path(RAW_DATA_FILE) + logger.info(f"Saving raw data to: {output_path}") + df = pd.DataFrame(weather_data) + df.to_csv(output_path, index=False) + + logger.info(f"Weather data saved to {output_path}") + logger.info(f"Data preview:\n{df.head()}") + logger.info(f"Data shape: {df.shape}") + + # Verify file was created + if not os.path.exists(output_path): + raise AirflowException(f"Failed to create output file: {output_path}") + + # Push the file path to XCom + logger.info(f"Pushing file path to XCom: {output_path}") + context['task_instance'].xcom_push(key='raw_data_path', value=output_path) + + # 
Verify XCom push + pushed_value = context['task_instance'].xcom_pull(key='raw_data_path') + logger.info(f"Verified XCom value: {pushed_value}") + + return output_path + + except Exception as e: + logger.error(f"Error in extract_weather_data: {str(e)}") + raise AirflowException(f"Failed to extract weather data: {str(e)}") + +def transform_weather_data(**context): + """Transform and clean weather data""" + try: + logger.info("Starting transform_weather_data task") + + # Get input file path from XCom + task_instance = context['task_instance'] + logger.info("Attempting to pull raw_data_path from XCom") + input_path = task_instance.xcom_pull(task_ids='extract_weather_data', key='raw_data_path') + logger.info(f"XCom pull result: {input_path}") + + if not input_path: + logger.error("No input file path received from extract task") + # Try to list available XCom values + try: + all_xcoms = task_instance.xcom_pull(task_ids='extract_weather_data') + logger.info(f"Available XComs from extract task: {all_xcoms}") + except Exception as xe: + logger.error(f"Error checking XComs: {str(xe)}") + raise AirflowException("No input file path received from extract task") + + logger.info(f"Looking for input file at: {input_path}") + + if not os.path.exists(input_path): + raise AirflowException(f"Input file not found at {input_path}") + + logger.info(f"Reading data from {input_path}") + df = pd.read_csv(input_path) + + if df.empty: + raise AirflowException("Input data is empty") + + logger.info(f"Input data shape: {df.shape}") + + # Convert temperature to Fahrenheit + df['temperature_f'] = df['temperature'] * 9/5 + 32 + + # Categorize temperature + df['temp_category'] = pd.cut(df['temperature'], + bins=[-float('inf'), 0, 15, 25, float('inf')], + labels=['Cold', 'Mild', 'Warm', 'Hot']) + + # Clean weather description + df['weather_desc'] = df['weather_desc'].str.title() + + # Calculate wind speed in mph + df['wind_speed_mph'] = df['wind_speed'] * 2.237 + + # Add data quality flags + df['data_quality'] = 'Good' + df.loc[df['humidity'] > 100, 'data_quality'] = 'Check humidity' + df.loc[df['pressure'] < 870, 'data_quality'] = 'Check pressure' + + # Save transformed data + output_path = get_file_path(TRANSFORMED_DATA_FILE) + logger.info(f"Saving transformed data to: {output_path}") + df.to_csv(output_path, index=False) + + logger.info(f"Transformed data saved to {output_path}") + logger.info(f"Transformed data preview:\n{df.head()}") + logger.info(f"Transformed data shape: {df.shape}") + + # Verify file was created + if not os.path.exists(output_path): + raise AirflowException(f"Failed to create output file: {output_path}") + + # Push the transformed file path to XCom + logger.info(f"Pushing transformed file path to XCom: {output_path}") + task_instance.xcom_push(key='transformed_data_path', value=output_path) + + # Verify XCom push + pushed_value = task_instance.xcom_pull(key='transformed_data_path') + logger.info(f"Verified XCom value: {pushed_value}") + + return output_path + + except Exception as e: + logger.error(f"Error in transform_weather_data: {str(e)}") + raise AirflowException(f"Failed to transform weather data: {str(e)}") + +def load_weather_data(**context): + """Load transformed data""" + try: + logger.info("Starting load_weather_data task") + + # Get transformed file path from XCom + task_instance = context['task_instance'] + logger.info("Attempting to pull transformed_data_path from XCom") + input_path = task_instance.xcom_pull(task_ids='transform_weather_data', key='transformed_data_path') + 
logger.info(f"XCom pull result: {input_path}") + + if not input_path: + logger.error("No input file path received from transform task") + # Try to list available XCom values + try: + all_xcoms = task_instance.xcom_pull(task_ids='transform_weather_data') + logger.info(f"Available XComs from transform task: {all_xcoms}") + except Exception as xe: + logger.error(f"Error checking XComs: {str(xe)}") + raise AirflowException("No input file path received from transform task") + + logger.info(f"Looking for input file at: {input_path}") + + if not os.path.exists(input_path): + raise AirflowException(f"Input file not found at {input_path}") + + logger.info(f"Reading transformed data from {input_path}") + df = pd.read_csv(input_path) + + if df.empty: + raise AirflowException("Transformed data is empty") + + # Save final data + output_path = get_file_path(FINAL_DATA_FILE) + logger.info(f"Saving final data to: {output_path}") + df.to_csv(output_path, index=False) + + logger.info(f"Final data saved to {output_path}") + logger.info(f"Final data shape: {df.shape}") + + # Verify file was created + if not os.path.exists(output_path): + raise AirflowException(f"Failed to create output file: {output_path}") + + return output_path + + except Exception as e: + logger.error(f"Error in load_weather_data: {str(e)}") + raise AirflowException(f"Failed to load weather data: {str(e)}") + +# Create the DAG +dag = DAG( + 'weather_etl_pipeline', + default_args=default_args, + description='A DAG for weather data ETL pipeline', + schedule_interval=timedelta(hours=1), + catchup=False +) + +# Define the tasks +extract_task = PythonOperator( + task_id='extract_weather_data', + python_callable=extract_weather_data, + dag=dag, +) + +transform_task = PythonOperator( + task_id='transform_weather_data', + python_callable=transform_weather_data, + dag=dag, +) + +load_task = PythonOperator( + task_id='load_weather_data', + python_callable=load_weather_data, + dag=dag, +) + +# Set task dependencies +extract_task >> transform_task >> load_task diff --git a/weather_etl/docker-compose.yaml b/weather_etl/docker-compose.yaml new file mode 100644 index 0000000..eddc4da --- /dev/null +++ b/weather_etl/docker-compose.yaml @@ -0,0 +1,149 @@ +--- +version: '3' +x-airflow-common: + &airflow-common + image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.7.1} + environment: + &airflow-common-env + AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 + AIRFLOW__CORE__FERNET_KEY: '' + AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' + AIRFLOW__CORE__LOAD_EXAMPLES: 'false' + AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' + AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' + _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- pandas requests python-dotenv numpy yfinance websocket-client} + volumes: + - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags + - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs + - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config + - ${AIRFLOW_PROJ_DIR:-.}/.env:/opt/airflow/.env + - ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data + user: "${AIRFLOW_UID:-50000}:0" + depends_on: + &airflow-common-depends-on + redis: + condition: service_healthy + postgres: + condition: service_healthy + +services: + postgres: + image: postgres:13 + environment: + POSTGRES_USER: 
airflow + POSTGRES_PASSWORD: airflow + POSTGRES_DB: airflow + volumes: + - postgres-db-volume:/var/lib/postgresql/data + healthcheck: + test: ["CMD", "pg_isready", "-U", "airflow"] + interval: 10s + retries: 5 + start_period: 5s + restart: always + + redis: + image: redis:latest + expose: + - 6379 + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 30s + retries: 50 + start_period: 30s + restart: always + + airflow-webserver: + <<: *airflow-common + command: webserver + ports: + - "8080:8080" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-scheduler: + <<: *airflow-common + command: scheduler + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-worker: + <<: *airflow-common + command: celery worker + healthcheck: + test: + - "CMD-SHELL" + - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + environment: + <<: *airflow-common-env + DUMB_INIT_SETSID: "0" + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-triggerer: + <<: *airflow-common + command: triggerer + healthcheck: + test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-init: + <<: *airflow-common + entrypoint: /bin/bash + command: + - -c + - | + mkdir -p /sources/logs /sources/dags /sources/plugins /opt/airflow/data + chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins} /opt/airflow/data + exec /entrypoint airflow version + environment: + <<: *airflow-common-env + _AIRFLOW_DB_MIGRATE: 'true' + _AIRFLOW_WWW_USER_CREATE: 'true' + _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} + _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} + _PIP_ADDITIONAL_REQUIREMENTS: '' + user: "0:0" + volumes: + - ${AIRFLOW_PROJ_DIR:-.}:/sources + +volumes: + postgres-db-volume: + weather-data: diff --git a/weather_etl/requirements.txt b/weather_etl/requirements.txt new file mode 100644 index 0000000..8a0817a --- /dev/null +++ b/weather_etl/requirements.txt @@ -0,0 +1,5 @@ +apache-airflow==2.7.1 +pandas==2.1.1 +requests==2.31.0 +python-dotenv==1.0.0 +numpy==1.24.3
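Once the stack is up, a quick way to confirm the three new DAGs load cleanly is to build a `DagBag` from the `dags/` folder. A minimal sketch, assuming it runs inside one of the Airflow containers (or any environment with these requirements plus the extra packages from `_PIP_ADDITIONAL_REQUIREMENTS` installed) and with the DAG files on the configured DAGs path:

```python
# Minimal sanity check: confirm the new DAG files parse without import errors.
# Assumes an Airflow environment with the dags/ folder on the DAGs path.
from airflow.models import DagBag

dag_bag = DagBag(dag_folder="dags", include_examples=False)

print("Import errors:", dag_bag.import_errors or "none")
for dag_id in ("weather_etl_pipeline", "stock_etl_pipeline", "crypto_realtime_pipeline"):
    assert dag_id in dag_bag.dags, f"{dag_id} did not load"
    print(f"{dag_id}: {len(dag_bag.dags[dag_id].tasks)} tasks")
```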