Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable feature factory with data preparation for LLMs #18

Merged
merged 11 commits into from
Nov 11, 2023
2 changes: 1 addition & 1 deletion channelDemoCatalog/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from framework.channel import Channel
from channelDemoStore.sales import Sales
from .sales import Sales
from featurefamily_common.trends import TrendsCommon
from framework.feature_factory.dtm import DateTimeManager
from framework.feature_factory import Helpers
Expand Down
2 changes: 2 additions & 0 deletions channelDemoCatalog/sales.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
from framework.feature_factory.feature_family import FeatureFamily
from framework.feature_factory import Helpers
from framework.configobj import ConfigObj
from framework.spark_singleton import SparkSingleton
import inspect

# joiner_func = Helpers()._register_joiner_func()
multipliable = Helpers()._register_feature_func()
base_feat = Helpers()._register_feature_func()

spark = SparkSingleton.get_instance()

# Extend FeatureFamily
class Sales(FeatureFamily):
Expand Down
2 changes: 1 addition & 1 deletion channelDemoStore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from framework.channel import Channel
from channelDemoStore.sales import Sales
from .sales import Sales
from featurefamily_common.trends import TrendsCommon
from framework.feature_factory.dtm import DateTimeManager
from framework.feature_factory import Helpers
Expand Down
2 changes: 2 additions & 0 deletions channelDemoStore/sales.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
from framework.feature_factory.feature_family import FeatureFamily
from framework.feature_factory import Helpers
from framework.configobj import ConfigObj
from framework.spark_singleton import SparkSingleton
import inspect

# joiner_func = Helpers()._register_joiner_func()
multipliable = Helpers()._register_feature_func()
base_feat = Helpers()._register_feature_func()

spark = SparkSingleton.get_instance()

# Extend FeatureFamily
class Sales(FeatureFamily):
Expand Down
2 changes: 1 addition & 1 deletion channelDemoWeb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from framework.channel import Channel
from channelDemoStore.sales import Sales
from .sales import Sales
from featurefamily_common.trends import TrendsCommon
from framework.feature_factory.dtm import DateTimeManager
from framework.feature_factory import Helpers
Expand Down
2 changes: 2 additions & 0 deletions channelDemoWeb/sales.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
from framework.feature_factory.feature_family import FeatureFamily
from framework.feature_factory import Helpers
from framework.configobj import ConfigObj
from framework.spark_singleton import SparkSingleton
import inspect

# joiner_func = Helpers()._register_joiner_func()
multipliable = Helpers()._register_feature_func()
base_feat = Helpers()._register_feature_func()

spark = SparkSingleton.get_instance()

# Extend FeatureFamily
class Sales(FeatureFamily):
Expand Down
27 changes: 13 additions & 14 deletions framework/feature_factory/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from pyspark.sql.functions import col, lit, when, struct
from pyspark.sql.column import Column
from pyspark.sql import functions as F
from pyspark.sql import functions as F, SparkSession
from pyspark.sql.dataframe import DataFrame
from framework.feature_factory.feature import Feature, FeatureSet, Multiplier
from framework.configobj import ConfigObj
from framework.feature_factory.helpers import Helpers
from framework.feature_factory.agg_granularity import AggregationGranularity
from framework.feature_factory.llm_tools import LLMFeature, LLMUtils
import re
import logging
import datetime
Expand Down Expand Up @@ -33,25 +34,15 @@
"""
# If groupBy Column is past in as something other than list, convert to list
# Validation - If features, passed in is dict, convert to list of vals, etc.
# groupBy_cols = self.helpers._to_list(groupBy_cols)
# groupBy_cols, groupBy_joiners = self.helpers._extract_groupby_joiner(groupBy_cols)
groupBy_cols = [gc.assembled_column if isinstance(gc, Feature) else gc for gc in groupBy_cols]
features, dups = self.helpers._dedup_fast(df, [feature for feature_set in feature_sets for feature in feature_set.features.values()])
# df = self.helpers._resolve_feature_joiners(df, features, groupBy_joiners).repartition(*groupBy_cols)
df = df.repartition(*groupBy_cols)

# feature_cols = []
agg_cols = []
non_agg_cols = {}
features_to_drop = []
# base_cols = [f.base_col for f in features]

# column validation
# valid_result, undef_cols = self.helpers.validate_col(df, *base_cols)
# assert valid_result, "base cols {} are not defined in df columns {}".format(undef_cols, df.columns)

# valid_result, undef_cols = self.helpers._validate_col(df, *groupBy_cols)
# assert valid_result, "groupby cols {} are not defined in df columns {}".format(undef_cols, df.columns)

granularity_validator = AggregationGranularity(granularityEnum) if granularityEnum else None
for feature in features:
assert True if ((len(feature.aggs) > 0) and (len(
Expand All @@ -77,8 +68,7 @@
df = df.withColumn(fn, col)

final_df = df.drop(*features_to_drop)
# else:
# new_df = df.select(*df.columns + feature_cols)

return final_df

def append_catalog(self, df: DataFrame, groupBy_cols, catalog_cls, feature_names = [], withTrendsForFeatures: List[FeatureSet] = None, granularityEnum: Enum = None):
Expand All @@ -96,3 +86,12 @@
fs = FeatureSet(dct)
return self.append_features(df, groupBy_cols, [fs], withTrendsForFeatures, granularityEnum)

def assemble_llm_feature(self, spark: SparkSession, srcDirectory: str, llmFeature: LLMFeature, partitionNum: int):
    """Build a single-column DataFrame of LLM-processed document content.

    Recursively collects every file path under ``srcDirectory``, distributes
    the paths across ``partitionNum`` partitions, applies
    ``LLMUtils.process_docs`` with ``llmFeature`` to each partition, and
    returns a DataFrame whose one column is named ``llmFeature.name`` with
    one row per element produced (the per-partition results are exploded).

    :param spark: active SparkSession used to parallelize the file list.
    :param srcDirectory: root directory scanned recursively for input files.
    :param llmFeature: feature definition handed to ``LLMUtils.process_docs``.
    :param partitionNum: number of partitions for both the initial
        parallelize and the post-processing repartition.
    :return: DataFrame with a single column named ``llmFeature.name``.
    """
    doc_paths = self.helpers.list_files_recursively(srcDirectory)
    paths_rdd = spark.sparkContext.parallelize(doc_paths, partitionNum)
    processed_rdd = paths_rdd.mapPartitions(
        lambda partition_data: LLMUtils.process_docs(
            partitionData=partition_data, llmFeat=llmFeature
        )
    ).repartition(partitionNum)
    processed_rdd.cache()
    result_df = processed_rdd.toDF([llmFeature.name])
    return result_df.select(F.explode(llmFeature.name).alias(llmFeature.name))

Check warning on line 97 in framework/feature_factory/__init__.py

View check run for this annotation

Codecov / codecov/patch

framework/feature_factory/__init__.py#L95-L97

Added lines #L95 - L97 were not covered by tests
13 changes: 10 additions & 3 deletions framework/feature_factory/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame
from framework.feature_factory.feature import Feature
from framework.spark_singleton import SparkSingleton
from framework import feature_factory

from datetime import datetime
from datetime import timedelta
import inspect
import os
from collections import OrderedDict
import logging

logger = logging.getLogger(__name__)
spark = SparkSingleton.get_instance()
# spark = SparkSingleton.get_instance()


class Helpers:
Expand Down Expand Up @@ -325,6 +324,14 @@
dt2 = self.subtract_months(dt, i)
months_range.append(dt2)
return months_range

def list_files_recursively(self, directory: str):
    """Return the full paths of every file found under *directory*.

    Walks the tree rooted at ``directory`` (top-down, via ``os.walk``) and
    collects each file's path joined onto its parent directory. Directories
    themselves are not included in the result.

    :param directory: root directory to scan.
    :return: list of absolute-or-relative file paths (matching how
        ``directory`` itself was given).
    """
    return [
        os.path.join(parent, fname)
        for parent, _, fnames in os.walk(directory)
        for fname in fnames
    ]

Check warning on line 334 in framework/feature_factory/helpers.py

View check run for this annotation

Codecov / codecov/patch

framework/feature_factory/helpers.py#L332-L334

Added lines #L332 - L334 were not covered by tests

class Converter:
def __init__(self, items, target_type):
Expand Down
Loading
Loading