commit ca601f2 (1 parent: edf9c74)
Showing 10 changed files with 481 additions and 1 deletion.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,4 +13,6 @@ weather_etl/logs/* | |
*/**/venv/* | ||
*.terraform | ||
*.tfstate | ||
*/**/.DS_Store | ||
*/**/.DS_Store | ||
*/**/*.db | ||
*/**/*.parquet |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
**/feature_store/ | ||
**/features/ | ||
definitions.py |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
# Feature Store Demo | ||
|
||
This project demonstrates a feature store implementation using Feast, showcasing how to manage and serve features for machine learning applications. The demo includes both batch and online feature serving capabilities. | ||
|
||
## Project Structure | ||
|
||
``` | ||
feature_store_demo/ | ||
├── data/                       # Data storage directory | ||
│   ├── generate_data.py        # Sample data generator (used in Setup step 2) | ||
│   ├── customer_interactions.parquet | ||
│   ├── customer_profiles.parquet | ||
│   ├── registry.db             # Feast registry | ||
│   └── online_store.db         # SQLite online store | ||
├── feature_repo/               # Feature definitions | ||
│   ├── __init__.py | ||
│   └── feature_store.py        # Feature views and services | ||
├── demo.py                     # Demo script | ||
├── feature_store.yaml          # Feast configuration | ||
├── environment.yml             # Conda environment specification | ||
└── README.md                   # This file | ||
``` | ||
|
||
## Feature Sets | ||
|
||
### 1. Customer Interactions Features | ||
Real-time customer behavior and interaction data: | ||
- `session_duration` (Int64): Duration of user sessions in seconds | ||
- `pages_viewed` (Int64): Number of pages viewed in a session | ||
- `purchase_amount` (Float32): Amount spent in the session | ||
- `is_weekend` (Int64): Binary flag indicating weekend activity | ||
- `device_type` (String): Device used (mobile, desktop, tablet) | ||
- `browser` (String): Browser used (chrome, firefox, safari) | ||
- `interaction_type` (String): Type of interaction (view, cart, purchase) | ||
|
||
### 2. Customer Profile Features | ||
Static customer attributes and aggregated metrics: | ||
- `age` (Int64): Customer's age | ||
- `subscription_type` (String): Subscription level (free, basic, premium) | ||
- `country` (String): Customer's country | ||
- `customer_segment` (String): Customer segment (new, regular, vip) | ||
- `email_subscribed` (Int64): Email subscription status | ||
- `lifetime_value` (Float32): Customer's lifetime value | ||
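
The declared types above can be checked against a row before it is written to parquet. The following is a minimal sketch in plain Python, not Feast's own validation: `INTERACTION_SCHEMA`, `PROFILE_SCHEMA`, and `schema_errors` are hypothetical names, and the Feast types are mapped to built-in Python types purely for illustration.

```python
# Hypothetical schema tables: feature names and types copied from the lists
# above, mapped to built-in Python types (Int64 -> int, Float32 -> float,
# String -> str). These are not Feast type objects.
INTERACTION_SCHEMA = {
    "session_duration": int,
    "pages_viewed": int,
    "purchase_amount": float,
    "is_weekend": int,
    "device_type": str,
    "browser": str,
    "interaction_type": str,
}
PROFILE_SCHEMA = {
    "age": int,
    "subscription_type": str,
    "country": str,
    "customer_segment": str,
    "email_subscribed": int,
    "lifetime_value": float,
}


def schema_errors(row, schema):
    """Return a list of human-readable problems found in one feature row."""
    errors = []
    for name, expected in schema.items():
        if name not in row:
            errors.append(f"missing feature: {name}")
            continue
        value = row[name]
        # bool is a subclass of int in Python, so reject it explicitly:
        # no feature in either schema is declared as a boolean.
        if isinstance(value, bool) or not isinstance(value, expected):
            errors.append(
                f"{name}: expected {expected.__name__}, got {type(value).__name__}"
            )
    return errors
```

A check like this would flag a profile row whose `email_subscribed` is `True`/`False` rather than `0`/`1`, which is an easy mismatch to introduce when generating synthetic data.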
|
||
## Feature Services | ||
|
||
### 1. Recommendation Features | ||
Optimized for product recommendations: | ||
```python | ||
recommendation_fs = FeatureService( | ||
    name="recommendation_features", | ||
    features=[ | ||
        customer_interactions_fv[["session_duration", "pages_viewed", "purchase_amount", "interaction_type"]], | ||
        customer_profile_fv[["age", "subscription_type", "customer_segment", "lifetime_value"]], | ||
    ], | ||
) | ||
``` | ||
|
||
### 2. Personalization Features | ||
Designed for UI/UX personalization: | ||
```python | ||
personalization_fs = FeatureService( | ||
    name="personalization_features", | ||
    features=[ | ||
        customer_interactions_fv[["device_type", "browser", "is_weekend"]], | ||
        customer_profile_fv[["country", "email_subscribed", "customer_segment"]], | ||
    ], | ||
) | ||
``` | ||
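
Each service above is a named selection of features from the two feature views. As a rough illustration, the sketch below mirrors the two services as plain dictionaries and expands them into `view:feature` reference strings. `SERVICES` and `feature_refs` are hypothetical names for this example only, and the view names `customer_interactions` and `customer_profile` are assumed from the feature references used elsewhere in this README.

```python
# Plain-dict mirror of the two feature services; not Feast objects.
SERVICES = {
    "recommendation_features": {
        "customer_interactions": [
            "session_duration", "pages_viewed", "purchase_amount", "interaction_type",
        ],
        "customer_profile": [
            "age", "subscription_type", "customer_segment", "lifetime_value",
        ],
    },
    "personalization_features": {
        "customer_interactions": ["device_type", "browser", "is_weekend"],
        "customer_profile": ["country", "email_subscribed", "customer_segment"],
    },
}


def feature_refs(service_name):
    """Expand a service into a flat list of 'view:feature' reference strings."""
    views = SERVICES[service_name]
    return [
        f"{view}:{feature}"
        for view, features in views.items()
        for feature in features
    ]


# Features requested by both services.
shared = set(feature_refs("recommendation_features")) & set(
    feature_refs("personalization_features")
)
```

Listing the overlap this way makes it easy to see which features are shared between services and which are specific to one use case.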
|
||
## Setup Instructions | ||
|
||
1. Create and activate the conda environment: | ||
```bash | ||
conda env create -f environment.yml | ||
conda activate feature-store-demo | ||
``` | ||
|
||
2. Generate sample data: | ||
```bash | ||
python data/generate_data.py | ||
``` | ||
|
||
3. Initialize the feature store: | ||
```bash | ||
feast apply | ||
``` | ||
|
||
4. Run the demo: | ||
```bash | ||
python demo.py | ||
``` | ||
|
||
## Demo Walkthrough | ||
|
||
The demo script (`demo.py`) demonstrates: | ||
|
||
1. Data Generation: | ||
- Creates synthetic customer interaction data | ||
- Generates customer profile information | ||
- Saves data in parquet format | ||
|
||
2. Feature Store Operations: | ||
- Materializes features to the online store | ||
- Retrieves historical features for batch processing | ||
- Demonstrates real-time feature serving | ||
|
||
3. Feature Retrieval Examples: | ||
```python | ||
# Batch feature retrieval | ||
features = store.get_historical_features( | ||
    entity_df=entity_df, | ||
    features=[ | ||
        "customer_interactions:session_duration", | ||
        "customer_profile:customer_segment", | ||
    ], | ||
) | ||
|
||
# Online feature retrieval | ||
online_features = store.get_online_features( | ||
    features=[ | ||
        "customer_interactions:session_duration", | ||
        "customer_profile:customer_segment", | ||
    ], | ||
    entity_rows=[{"customer_id": 1}], | ||
) | ||
``` | ||
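
Conceptually, historical retrieval is a point-in-time lookup: for each row of the entity dataframe, the store returns the most recent feature value whose event timestamp is not later than that row's timestamp. The sketch below illustrates the idea with the standard library only. It is not how Feast implements the join, and `point_in_time_value` is a hypothetical helper.

```python
from bisect import bisect_right
from datetime import datetime


def point_in_time_value(history, as_of):
    """Return the latest value recorded at or before `as_of`.

    `history` is a list of (event_timestamp, value) pairs sorted by
    timestamp. Returns None when no value existed yet at `as_of`.
    """
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, as_of)  # number of entries with ts <= as_of
    return history[idx - 1][1] if idx else None


# Made-up session_duration history for one customer.
history = [
    (datetime(2024, 1, 1), 120),
    (datetime(2024, 1, 10), 300),
    (datetime(2024, 1, 20), 45),
]
value = point_in_time_value(history, datetime(2024, 1, 15))
```

Looking up by timestamp rather than taking the newest row is what keeps training data free of values that were not yet known at the time of the event.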
|
||
## Data Freshness | ||
|
||
- Customer Interactions: Features have a TTL of 90 days | ||
- Customer Profiles: Features have a TTL of 90 days | ||
- Online Store: Updated through feature materialization | ||
- Offline Store: Uses parquet files with event timestamps | ||
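
A TTL bounds how old a feature row may be relative to the time of the request. The sketch below shows one way to express the 90-day rule in plain Python. The exact boundary handling inside Feast is not shown in this README, so the inclusive comparison here is an assumption, and `is_fresh` is a hypothetical helper.

```python
from datetime import datetime, timedelta

TTL = timedelta(days=90)  # value taken from the list above


def is_fresh(feature_ts, request_ts, ttl=TTL):
    """True if the feature row is not from the future and is at most `ttl` old."""
    age = request_ts - feature_ts
    return timedelta(0) <= age <= ttl
```

For example, a row stamped 2024-01-01 would still count as fresh for a request on 2024-03-31 (exactly 90 days later) but not for one on 2024-04-01.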
|
||
## Dependencies | ||
|
||
- feast==0.42.0 | ||
- pandas>=2.0.1 | ||
- numpy>=1.24.4 | ||
- scikit-learn>=1.3.2 | ||
- python-dotenv>=1.0.0 | ||
|
||
## Best Practices | ||
|
||
1. Feature Definition: | ||
- Use meaningful feature names | ||
- Include proper data types | ||
- Set appropriate TTL values | ||
|
||
2. Data Management: | ||
- Maintain timestamp fields (event_timestamp, created) | ||
- Use appropriate file formats (parquet) | ||
- Materialize features regularly so the online store stays current | ||
|
||
3. Feature Services: | ||
- Group related features | ||
- Create purpose-specific services | ||
- Document feature usage | ||
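
To illustrate why the Data Management item recommends keeping both `event_timestamp` and `created`: when two rows for the same customer share an event timestamp, the creation time can serve as a tie-breaker for deciding which row is the latest. The following is a plain-Python sketch of that selection rule, not Feast's materialization code, and `latest_rows` is a hypothetical helper.

```python
from datetime import datetime


def latest_rows(rows):
    """Keep one row per customer_id, ranked by (event_timestamp, created)."""
    latest = {}
    for row in rows:
        key = row["customer_id"]
        rank = (row["event_timestamp"], row["created"])
        current = latest.get(key)
        if current is None or rank > (current["event_timestamp"], current["created"]):
            latest[key] = row
    return latest


# Made-up rows: customer 1 has two rows with the same event timestamp.
rows = [
    {"customer_id": 1, "event_timestamp": datetime(2024, 1, 1),
     "created": datetime(2024, 1, 1, 8), "lifetime_value": 10.0},
    {"customer_id": 1, "event_timestamp": datetime(2024, 1, 1),
     "created": datetime(2024, 1, 2, 8), "lifetime_value": 25.0},
    {"customer_id": 2, "event_timestamp": datetime(2024, 1, 3),
     "created": datetime(2024, 1, 3), "lifetime_value": 7.5},
]
latest = latest_rows(rows)
```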
|
||
## Contributing | ||
|
||
Feel free to contribute by: | ||
1. Opening issues for bugs or feature requests | ||
2. Submitting pull requests with improvements | ||
3. Adding new feature services or views |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
import pandas as pd | ||
import numpy as np | ||
from datetime import datetime, timedelta | ||
|
||
# Set random seed for reproducibility | ||
np.random.seed(42) | ||
|
||
def generate_customer_data(n_customers=1000): | ||
    # Generate customer IDs | ||
    customer_ids = range(1, n_customers + 1) | ||
|
||
    # Generate timestamps for the last 30 days | ||
    end_date = datetime.now() | ||
    start_date = end_date - timedelta(days=30) | ||
|
||
    data = [] | ||
    for customer_id in customer_ids: | ||
        # Generate multiple interactions per customer | ||
        n_interactions = np.random.randint(1, 10) | ||
        for _ in range(n_interactions): | ||
            timestamp = start_date + timedelta( | ||
                days=np.random.randint(0, 30), | ||
                hours=np.random.randint(0, 24), | ||
                minutes=np.random.randint(0, 60) | ||
            ) | ||
|
||
            interaction = { | ||
                'customer_id': customer_id, | ||
                'event_timestamp': timestamp, | ||
                'session_duration': np.random.randint(1, 120), # minutes | ||
                'pages_viewed': np.random.randint(1, 20), | ||
                'purchase_amount': np.random.choice([0] * 7 + [round(np.random.uniform(10, 200), 2)] * 3), | ||
                'is_weekend': int(timestamp.weekday() >= 5), | ||
                'device_type': np.random.choice(['mobile', 'desktop', 'tablet']), | ||
                'browser': np.random.choice(['chrome', 'firefox', 'safari', 'edge']), | ||
                'interaction_type': np.random.choice(['view', 'click', 'purchase', 'cart_add']) | ||
            } | ||
            data.append(interaction) | ||
|
||
    # Convert to DataFrame and sort by timestamp | ||
    df = pd.DataFrame(data) | ||
    df = df.sort_values('event_timestamp') | ||
    return df | ||
|
||
def generate_customer_profile(n_customers=1000): | ||
    customer_ids = range(1, n_customers + 1) | ||
|
||
    profiles = [] | ||
    for customer_id in customer_ids: | ||
        profile = { | ||
            'customer_id': customer_id, | ||
            'age': np.random.randint(18, 70), | ||
            'subscription_type': np.random.choice(['basic', 'premium', 'pro']), | ||
            'country': np.random.choice(['US', 'UK', 'CA', 'AU', 'DE']), | ||
            'signup_date': datetime.now() - timedelta(days=np.random.randint(1, 365)), | ||
            'customer_segment': np.random.choice(['new', 'regular', 'vip']), | ||
            'email_subscribed': np.random.choice([True, False]), | ||
            'lifetime_value': round(np.random.uniform(0, 1000), 2) | ||
        } | ||
        profiles.append(profile) | ||
|
||
    return pd.DataFrame(profiles) | ||
|
||
if __name__ == "__main__": | ||
    # Generate datasets | ||
    interactions_df = generate_customer_data() | ||
    profiles_df = generate_customer_profile() | ||
|
||
    # Save datasets in parquet format | ||
    interactions_df.to_parquet("data/customer_interactions.parquet", index=False) | ||
    profiles_df.to_parquet("data/customer_profiles.parquet", index=False) | ||
|
||
    print(f"Generated {len(interactions_df)} interaction records") | ||
    print(f"Generated {len(profiles_df)} customer profiles") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
from datetime import datetime, timedelta | ||
import pandas as pd | ||
import numpy as np | ||
from feast import FeatureStore | ||
|
||
def generate_sample_data(): | ||
    """Generate sample data for demonstration""" | ||
    # Set random seed for reproducibility | ||
    np.random.seed(42) | ||
|
||
    # Generate customer interactions data | ||
    n_interactions = 1000 | ||
    customer_ids = np.random.randint(1, 6, n_interactions) | ||
|
||
    # Generate timestamps for the last 30 days | ||
    end_date = datetime.now() | ||
    start_date = end_date - timedelta(days=30) | ||
    timestamps = pd.date_range(start=start_date, end=end_date, periods=n_interactions) | ||
|
||
    interactions_df = pd.DataFrame({ | ||
        "event_timestamp": timestamps, | ||
        "created": timestamps, # Additional timestamp field | ||
        "customer_id": customer_ids, | ||
        "session_duration": np.random.randint(1, 3600, n_interactions), | ||
        "pages_viewed": np.random.randint(1, 50, n_interactions), | ||
        "purchase_amount": np.random.uniform(0, 1000, n_interactions), | ||
        "is_weekend": np.random.randint(0, 2, n_interactions), | ||
        "device_type": np.random.choice(["mobile", "desktop", "tablet"], n_interactions), | ||
        "browser": np.random.choice(["chrome", "firefox", "safari"], n_interactions), | ||
        "interaction_type": np.random.choice(["view", "cart", "purchase"], n_interactions), | ||
    }) | ||
|
||
    # Generate customer profile data | ||
    n_customers = 5 | ||
    profile_timestamps = [datetime.now()] * n_customers | ||
|
||
    profile_df = pd.DataFrame({ | ||
        "event_timestamp": profile_timestamps, | ||
        "created": profile_timestamps, # Additional timestamp field | ||
        "customer_id": range(1, n_customers + 1), | ||
        "age": np.random.randint(18, 80, n_customers), | ||
        "subscription_type": np.random.choice(["free", "basic", "premium"], n_customers), | ||
        "country": np.random.choice(["US", "UK", "CA"], n_customers), | ||
        "customer_segment": np.random.choice(["new", "regular", "vip"], n_customers), | ||
        "email_subscribed": np.random.randint(0, 2, n_customers), | ||
        "lifetime_value": np.random.uniform(0, 5000, n_customers), | ||
    }) | ||
|
||
    # Convert timestamps to pandas datetime | ||
    interactions_df['event_timestamp'] = pd.to_datetime(interactions_df['event_timestamp']) | ||
    interactions_df['created'] = pd.to_datetime(interactions_df['created']) | ||
    profile_df['event_timestamp'] = pd.to_datetime(profile_df['event_timestamp']) | ||
    profile_df['created'] = pd.to_datetime(profile_df['created']) | ||
|
||
    # Save the dataframes | ||
    interactions_df.to_parquet("data/customer_interactions.parquet") | ||
    profile_df.to_parquet("data/customer_profiles.parquet") | ||
|
||
    return interactions_df, profile_df | ||
|
||
def main(): | ||
    # Initialize the feature store | ||
    store = FeatureStore(repo_path=".") | ||
|
||
    print("\n=== Demonstration of Feature Store Usage ===\n") | ||
|
||
    # 1. Generate and save sample data | ||
    print("1. Generating sample data...") | ||
    interactions_df, profile_df = generate_sample_data() | ||
    print("Sample data generated and saved to parquet files.") | ||
|
||
    # 2. Materialize features to online store | ||
    print("\n2. Materializing features to online store...") | ||
    store.materialize_incremental(end_date=datetime.now()) | ||
    print("Features materialized successfully.") | ||
|
||
    # 3. Generate sample entity dataframe for feature retrieval | ||
    entity_df = pd.DataFrame( | ||
        { | ||
            "customer_id": [1, 2, 3, 4, 5], | ||
            "event_timestamp": [datetime.now() for _ in range(5)] | ||
        } | ||
    ) | ||
|
||
    print("\n3. Fetching historical features for recommendation system:") | ||
    recommendation_features = store.get_historical_features( | ||
        entity_df=entity_df, | ||
        features=[ | ||
            "customer_interactions:session_duration", | ||
            "customer_interactions:purchase_amount", | ||
            "customer_profile:customer_segment", | ||
            "customer_profile:lifetime_value", | ||
        ], | ||
    ).to_df() | ||
    print("\nRecommendation Features Sample:") | ||
    print(recommendation_features.head()) | ||
|
||
    # 4. Online feature retrieval | ||
    print("\n4. Online Feature Retrieval Example:") | ||
    online_features = store.get_online_features( | ||
        features=[ | ||
            "customer_interactions:session_duration", | ||
            "customer_interactions:purchase_amount", | ||
            "customer_profile:customer_segment", | ||
        ], | ||
        entity_rows=[{"customer_id": 1}], | ||
    ).to_dict() | ||
    print("\nOnline Features for Customer 1:") | ||
    print(online_features) | ||
|
||
if __name__ == "__main__": | ||
    main() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
name: feature-store-demo | ||
channels: | ||
  - conda-forge | ||
  - defaults | ||
dependencies: | ||
  - python=3.10 | ||
  - pandas>=2.0.1 # Compatible with feast and great-expectations | ||
  - numpy>=1.24.4 | ||
  - scikit-learn>=1.3.2 | ||
  - python-dotenv>=1.0.0 | ||
  - sqlite>=3.42.0 | ||
  - jupyter>=1.0.0 | ||
  - ipykernel>=6.0.0 | ||
  - pip>=23.0.0 | ||
  - pip: | ||
      - "dask[dataframe]>=2024.12.1" | ||
      - great-expectations>=0.17.23 | ||
      - feast==0.42.0 | ||
      - dvc==3.30.3 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
|