Enable Feature Factory to support LLMs RAG #20

Open
wants to merge 27 commits into base: master
27 commits
67339ee
add support for catalog
lyliyu Apr 25, 2023
0a21113
add catalog base
lyliyu May 16, 2023
4821dc5
agg granularity to restrict group by
lyliyu Jun 15, 2023
a21e3aa
add llm tools
lyliyu Nov 6, 2023
22370b3
add mapparitions for generating chunks col
lyliyu Nov 9, 2023
7df4bc4
check the resource and create if not created before running apply logic
lyliyu Nov 10, 2023
9e961fb
add tokenizer to the text splitter
lyliyu Nov 10, 2023
38abaca
add llm notebooks
lyliyu Nov 10, 2023
e78ac55
add dependencies for langchain and llamaindex
lyliyu Nov 10, 2023
67f33eb
add sample file for testing
lyliyu Nov 10, 2023
06f3367
changed to relative path
lyliyu Nov 10, 2023
e7eae70
fix build issue
lyliyu Nov 10, 2023
09d88b8
utils funcs to convert between string, document and lcdocument
lyliyu Nov 11, 2023
3332711
add tokenizer splitter
lyliyu Nov 11, 2023
294d4f9
Merge pull request #18 from lyliyu/llms
lyliyu Nov 11, 2023
cff8eda
updated readme; requirements; add catalog for llm features.
lyliyu Nov 13, 2023
0ed788b
add comments to llms api and classes
lyliyu Nov 13, 2023
aa41e99
remove unnecessary imports
lyliyu Nov 13, 2023
7b7ecb2
fixed langchain version
lyliyu Nov 13, 2023
f80c25c
fix openai at 0.27.8
lyliyu Nov 13, 2023
05817bf
add specific version of openai to setup.py
lyliyu Nov 14, 2023
f5f2e80
give a range for openai
lyliyu Nov 14, 2023
92ca58e
install openai
lyliyu Nov 15, 2023
872b900
implemented extract_directory_metadata to extract metadata from file …
lyliyu Nov 17, 2023
b079aa2
Merge remote-tracking branch 'ff_llms/llms' into llms
lyliyu Nov 17, 2023
6a1272c
Impl extractor subclassing llmtool; provide extractors as input to th…
lyliyu Dec 2, 2023
c3216a2
customize prompt template
lyliyu Dec 10, 2023
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
@@ -11,7 +11,9 @@ jobs:
with:
python-version: 3.8
- name: Install dependencies
run: pip install -r requirements.txt
run: |
pip install openai==0.28.1
pip install -r requirements.txt
- name: Run tests and collect coverage
run: |
coverage run -m unittest discover
18 changes: 18 additions & 0 deletions README.md
@@ -568,6 +568,24 @@ class StoreSales(CommonFeatures, Filters):
return self._dct["total_trans"]
```

## 6. LLM Support

Feature Factory supports Retrieval Augmented Generation (RAG) by creating chunks of text from documents. Vector store indices can be populated from these chunks and used to augment prompts before they are fed into LLMs.

An LLM feature is a dataframe column that contains the chunks generated from the input documents. This is an example of how the Feature Factory API can be invoked to create an LLM feature:

```python
df = ff.assemble_llm_feature(spark, srcDirectory="a directory containing documents", llmFeature=llm_feature, partitionNum=partition_num)
```
In this example, `srcDirectory` is the directory containing all input documents, and `partitionNum` is the number of Spark partitions used during computation. For example, if you have two worker nodes running GPU instances, you can set `partitionNum` to 2 to distribute the documents across the two workers.

`llm_feature` is an instance of the `LLMFeature` class, which consists of a doc reader and a doc splitter. The doc readers currently provided are LlamaIndex's `SimpleDirectoryReader` and `UnstructuredDocReader`, which uses the Unstructured API. Customized readers can be implemented by subclassing `DocReader` and overriding the `create` and `apply` methods: `create` is called to set up the resources needed for the computation, and `apply` runs inference on each file/row, as in the sketch below.
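
A minimal sketch of a custom reader, assuming `DocReader` is importable from `framework.feature_factory.llm_tools`; the exact base-class API may differ, so treat this as illustrative rather than the shipped interface:

```python
# Hypothetical custom reader; the import path and method signatures are
# assumptions, not the library's documented API.
from framework.feature_factory.llm_tools import DocReader

class PlainTextReader(DocReader):
    def create(self):
        # Set up any expensive resources once before processing
        # (e.g. API clients or models); plain text needs nothing here.
        pass

    def apply(self, filename: str) -> str:
        # Read a single file and return its raw text for the splitter.
        with open(filename, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()
```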

The doc splitters currently supported are LlamaIndex's `SimpleNodeParser`, LangChain's `RecursiveCharacterTextSplitter`, and a custom tokenizer-based splitter (`TokenizerTextSpliter`). Like doc readers, the splitter classes can be extended by subclassing `DocSplitter`. Please note that metadata extraction is only supported for `SimpleNodeParser`, and an LLM instance needs to be created for the metadata extraction: the LLM definition must subclass `LLMDef` and override the `create` method. An example of an LLM definition can be found in the [LLM notebook](./notebooks/feature_factory_llms.py); a minimal sketch follows.
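
The sketch below assembles an `LLMFeature` end to end. The constructor arguments (`name`, `reader`, `splitter`), the `TokenizerTextSpliter` parameters, and the idea that `LLMDef.create` returns the LLM instance are all assumptions for illustration; the notebook above is the authoritative example:

```python
# Hypothetical end-to-end assembly; class and constructor details are
# assumptions based on the description above, not verified signatures.
from framework.feature_factory import Feature_Factory
from framework.feature_factory.llm_tools import LLMFeature, LLMDef, TokenizerTextSpliter

class MyLLM(LLMDef):
    def create(self):
        # Return the LLM used for metadata extraction; it would be supplied
        # to a SimpleNodeParser-based splitter (see the notebook).
        from langchain.llms import OpenAI  # provider choice is an assumption
        return OpenAI(temperature=0)

llm_feature = LLMFeature(
    name="chunks",                         # becomes the output column name
    reader=PlainTextReader(),              # custom reader from the sketch above
    splitter=TokenizerTextSpliter(chunk_size=1024, chunk_overlap=32),  # args assumed
)

ff = Feature_Factory()
df = ff.assemble_llm_feature(
    spark,
    srcDirectory="/tmp/docs",              # hypothetical document directory
    llmFeature=llm_feature,
    partitionNum=2,
)
```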

Document metadata can be extracted using LlamaIndex's metadata extractors. Feature Factory also provides a method to extract metadata from file paths. For example, if your documents are stored in per-year directories, you can extract the year as metadata by naming the directories `year=[actual year]`. Concretely, if a document has the path `/tmp/year_of_publication=2023/doc1`, then after splitting, each chunk from that document will carry `year of publication: 2023` as part of its header. The snippet below illustrates the convention.
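
Illustrative only — a standalone sketch of that path convention, not the library's actual `extract_directory_metadata` implementation:

```python
# Standalone illustration of the directory-name convention described above;
# not the Feature Factory helper itself.
def metadata_from_path(path: str) -> dict:
    meta = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, value = segment.split("=", 1)
            meta[key.replace("_", " ")] = value
    return meta

print(metadata_from_path("/tmp/year_of_publication=2023/doc1"))
# {'year of publication': '2023'}
```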


## Project Support
Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects.

2 changes: 1 addition & 1 deletion channelDemoCatalog/__init__.py
@@ -1,5 +1,5 @@
from framework.channel import Channel
from channelDemoStore.sales import Sales
from .sales import Sales
from featurefamily_common.trends import TrendsCommon
from framework.feature_factory.dtm import DateTimeManager
from framework.feature_factory import Helpers
2 changes: 2 additions & 0 deletions channelDemoCatalog/sales.py
@@ -5,12 +5,14 @@
from framework.feature_factory.feature_family import FeatureFamily
from framework.feature_factory import Helpers
from framework.configobj import ConfigObj
from framework.spark_singleton import SparkSingleton
import inspect

# joiner_func = Helpers()._register_joiner_func()
multipliable = Helpers()._register_feature_func()
base_feat = Helpers()._register_feature_func()

spark = SparkSingleton.get_instance()

# Extend FeatureFamily
class Sales(FeatureFamily):
2 changes: 1 addition & 1 deletion channelDemoStore/__init__.py
@@ -1,5 +1,5 @@
from framework.channel import Channel
from channelDemoStore.sales import Sales
from .sales import Sales
from featurefamily_common.trends import TrendsCommon
from framework.feature_factory.dtm import DateTimeManager
from framework.feature_factory import Helpers
2 changes: 2 additions & 0 deletions channelDemoStore/sales.py
@@ -4,12 +4,14 @@
from framework.feature_factory.feature_family import FeatureFamily
from framework.feature_factory import Helpers
from framework.configobj import ConfigObj
from framework.spark_singleton import SparkSingleton
import inspect

# joiner_func = Helpers()._register_joiner_func()
multipliable = Helpers()._register_feature_func()
base_feat = Helpers()._register_feature_func()

spark = SparkSingleton.get_instance()

# Extend FeatureFamily
class Sales(FeatureFamily):
2 changes: 1 addition & 1 deletion channelDemoWeb/__init__.py
@@ -1,5 +1,5 @@
from framework.channel import Channel
from channelDemoStore.sales import Sales
from .sales import Sales
from featurefamily_common.trends import TrendsCommon
from framework.feature_factory.dtm import DateTimeManager
from framework.feature_factory import Helpers
2 changes: 2 additions & 0 deletions channelDemoWeb/sales.py
@@ -5,12 +5,14 @@
from framework.feature_factory.feature_family import FeatureFamily
from framework.feature_factory import Helpers
from framework.configobj import ConfigObj
from framework.spark_singleton import SparkSingleton
import inspect

# joiner_func = Helpers()._register_joiner_func()
multipliable = Helpers()._register_feature_func()
base_feat = Helpers()._register_feature_func()

spark = SparkSingleton.get_instance()

# Extend FeatureFamily
class Sales(FeatureFamily):
57 changes: 42 additions & 15 deletions framework/feature_factory/__init__.py
@@ -1,15 +1,19 @@
from pyspark.sql.functions import col, lit, when, struct
from pyspark.sql.column import Column
from pyspark.sql import functions as F
from pyspark.sql import functions as F, SparkSession
from pyspark.sql.dataframe import DataFrame
from framework.feature_factory.feature import Feature, FeatureSet, Multiplier
from framework.configobj import ConfigObj
from framework.feature_factory.helpers import Helpers
from framework.feature_factory.agg_granularity import AggregationGranularity
from framework.feature_factory.llm_tools import LLMFeature, LLMUtils
import re
import logging
import datetime
import inspect
from collections import OrderedDict
from typing import List
from enum import Enum

logger = logging.getLogger(__name__)

@@ -18,7 +22,7 @@ class Feature_Factory():
def __init__(self):
self.helpers = Helpers()

def append_features(self, df: DataFrame, groupBy_cols, feature_sets: [FeatureSet], withTrendsForFeatures: [FeatureSet] = None):
def append_features(self, df: DataFrame, groupBy_cols, feature_sets: List[FeatureSet], withTrendsForFeatures: List[FeatureSet] = None, granularityEnum: Enum = None):
"""
Appends features to incoming df. The features columns and groupby cols will be deduped and validated.
If there's a group by, the groupby cols will be applied before appending features.
@@ -30,29 +34,22 @@ def append_features(self, df: DataFrame, groupBy_cols, feature_sets: [FeatureSet
"""
# If groupBy Column is past in as something other than list, convert to list
# Validation - If features, passed in is dict, convert to list of vals, etc.
# groupBy_cols = self.helpers._to_list(groupBy_cols)
# groupBy_cols, groupBy_joiners = self.helpers._extract_groupby_joiner(groupBy_cols)
groupBy_cols = [gc.assembled_column if isinstance(gc, Feature) else gc for gc in groupBy_cols]
features, dups = self.helpers._dedup_fast(df, [feature for feature_set in feature_sets for feature in feature_set.features.values()])
# df = self.helpers._resolve_feature_joiners(df, features, groupBy_joiners).repartition(*groupBy_cols)
df = df.repartition(*groupBy_cols)

# feature_cols = []
agg_cols = []
non_agg_cols = {}
features_to_drop = []
# base_cols = [f.base_col for f in features]

# column validation
# valid_result, undef_cols = self.helpers.validate_col(df, *base_cols)
# assert valid_result, "base cols {} are not defined in df columns {}".format(undef_cols, df.columns)

# valid_result, undef_cols = self.helpers._validate_col(df, *groupBy_cols)
# assert valid_result, "groupby cols {} are not defined in df columns {}".format(undef_cols, df.columns)

granularity_validator = AggregationGranularity(granularityEnum) if granularityEnum else None
for feature in features:
assert True if ((len(feature.aggs) > 0) and (len(
groupBy_cols) > 0) or feature.agg_func is None) else False, "{} has either aggs or groupBys " \
"but not both, ensure both are present".format(feature.name)
if granularity_validator:
granularity_validator.validate(feature, groupBy_cols)
# feature_cols.append(feature.assembled_column)
# feature_cols.append(F.col(feature.output_alias))
agg_cols += [agg_col for agg_col in feature.aggs]
@@ -71,8 +68,38 @@ def append_features(self, df: DataFrame, groupBy_cols, feature_sets: [FeatureSet
df = df.withColumn(fn, col)

final_df = df.drop(*features_to_drop)
# else:
# new_df = df.select(*df.columns + feature_cols)

return final_df

def append_catalog(self, df: DataFrame, groupBy_cols, catalog_cls, feature_names = [], withTrendsForFeatures: List[FeatureSet] = None, granularityEnum: Enum = None):
"""
Appends features to incoming df. The features columns and groupby cols will be deduped and validated.
If there's a group by, the groupby cols will be applied before appending features.
If there's not a group by and no agg features then the features will be appended to df.
:param df:
:param groupBy_cols:
:param feature_sets: input of FeatureSet
:return:
"""
# dct = self._get_all_features(catalog_cls)
dct = catalog_cls.get_all_features()
fs = FeatureSet(dct)
return self.append_features(df, groupBy_cols, [fs], withTrendsForFeatures, granularityEnum)

def assemble_llm_feature(self, spark: SparkSession, srcDirectory: str, llmFeature: LLMFeature, partitionNum: int):
"""
Creates a dataframe which contains only one column named as llmFeature.name.
The method will distribute the files under srcDirectory to the partitions determined by the partitionNum.
Each file will be parsed and chunked using the reader and splitter in the llmFeature object.
:param spark: a spark session instance
:param srcDirectory: the directory containing documents to parse
:llmFeature: the LLM feature instance
:partitionNum: the number of partitions the src documents will be distributed onto.
"""
all_files = self.helpers.list_files_recursively(srcDirectory)
src_rdd = spark.sparkContext.parallelize(all_files, partitionNum)

rdd = src_rdd.mapPartitions(lambda partition_data: LLMUtils.process_docs(partitionData=partition_data, llmFeat=llmFeature)).repartition(partitionNum)
rdd.cache()
df = rdd.toDF([llmFeature.name])
return df.select(F.explode(llmFeature.name).alias(llmFeature.name))
26 changes: 26 additions & 0 deletions framework/feature_factory/agg_granularity.py
@@ -0,0 +1,26 @@
from enum import IntEnum, EnumMeta
from pyspark.sql.column import Column

class AggregationGranularity:
def __init__(self, granularity: EnumMeta) -> None:
assert isinstance(granularity, EnumMeta), "Granularity should be of type Enum."
self.granularity = granularity


def validate(self, feat, groupby_list):
if not feat.agg_granularity:
return None
min_granularity_level = float("inf")
for level in groupby_list:
if isinstance(level, str):
try:
level = self.granularity[level]
except:
print(f"{level} is not part of {self.granularity}")
continue
if isinstance(level, Column):
continue
min_granularity_level = min(min_granularity_level, level.value)
assert min_granularity_level <= feat.agg_granularity.value, f"Required granularity for {feat.name} is {feat.agg_granularity}"


47 changes: 47 additions & 0 deletions framework/feature_factory/catalog.py
@@ -0,0 +1,47 @@
from .feature import Feature
from .llm_tools import LLMFeature

class CatalogBase:
@classmethod
def get_all_features(cls):
"""
Returns a dict of features defined in a catalog class.
Note that the variables include those inherited from the parent classes, and are
updated based on the reversed method resolution order.

In the example below, while `class C` inherits props defined in `class A` and `class B`,
props defined in `class C` will overwrite props defined in `class A`, which will in turn
overwrite those defined in `class B`:
>>> class A:
>>> pass
>>> class B:
>>> pass
>>> class C(A, B):
>>> pass
"""

members = dict()
for aclass in reversed(cls.__mro__):
vars_dct = vars(aclass)
for nm, variable in vars_dct.items():
if not isinstance(variable, Feature): continue
if not callable(getattr(aclass, nm)) and not nm.startswith("__"):
members[nm] = variable
variable.set_feature_name(nm)
return members

class LLMCatalogBase:
@classmethod
def get_all_features(cls) -> LLMFeature:
"""
Returns a LLMFeature which contains a DocReader and DocSplitter instance.
"""
llm_feat = None
for aclass in reversed(cls.__mro__):
vars_dct = vars(aclass)
for nm, variable in vars_dct.items():
if not callable(getattr(aclass, nm)) and not nm.startswith("__"):
if isinstance(variable, LLMFeature):
llm_feat = variable
llm_feat.name = nm
return llm_feat
28 changes: 27 additions & 1 deletion framework/feature_factory/feature.py
@@ -6,6 +6,7 @@
from framework.feature_factory.dtm import DateTimeManager
import datetime
import copy
from typing import List


class Feature:
@@ -20,7 +21,8 @@ def __init__(self,
_agg_func=None,
_agg_alias:str=None,
_kind="multipliable",
_is_temporary=False):
_is_temporary=False,
_agg_granularity=None):
"""

:param _name: name of the feature
Expand All @@ -46,6 +48,13 @@ def __init__(self,
self.kind = _kind
self.is_temporary = _is_temporary
self.names = None
self.agg_granularity = _agg_granularity

def set_feature_name(self, name: str):
self._name = name
self.output_alias = name
self.aggs = []
self._assemble_column()

def _clone(self, _alias: str=None):
alias = _alias if _alias is not None else self.output_alias
@@ -122,6 +131,23 @@ def __add__(self, other):

def __sub__(self, other):
return CompositeFeature.from_feature(self, "-", other)

@classmethod
def create(cls, base_col: Column,
filter: List[Column] = [],
negative_value=None,
agg_func=None,
agg_alias: str = None,
agg_granularity: str = None):

return Feature(
_name = "",
_base_col = base_col,
_filter = filter,
_negative_value = negative_value,
_agg_func = agg_func,
_agg_alias = agg_alias,
_agg_granularity = agg_granularity)


class FeatureSet:
13 changes: 10 additions & 3 deletions framework/feature_factory/helpers.py
@@ -3,17 +3,16 @@
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame
from framework.feature_factory.feature import Feature
from framework.spark_singleton import SparkSingleton
from framework import feature_factory

from datetime import datetime
from datetime import timedelta
import inspect
import os
from collections import OrderedDict
import logging

logger = logging.getLogger(__name__)
spark = SparkSingleton.get_instance()
# spark = SparkSingleton.get_instance()


class Helpers:
@@ -325,6 +324,14 @@ def get_months_range(self, dt: datetime.date, months: int):
dt2 = self.subtract_months(dt, i)
months_range.append(dt2)
return months_range

def list_files_recursively(self, directory: str):
file_list = []
for root, _, files in os.walk(directory):
for file in files:
file_path = os.path.join(root, file)
file_list.append(file_path)
return file_list

class Converter:
def __init__(self, items, target_type):