diff --git a/channelDemoCatalog/__init__.py b/channelDemoCatalog/__init__.py
index f6ed3c3..0bf3395 100644
--- a/channelDemoCatalog/__init__.py
+++ b/channelDemoCatalog/__init__.py
@@ -1,5 +1,5 @@
 from framework.channel import Channel
-from channelDemoStore.sales import Sales
+from .sales import Sales
 from featurefamily_common.trends import TrendsCommon
 from framework.feature_factory.dtm import DateTimeManager
 from framework.feature_factory import Helpers
diff --git a/channelDemoCatalog/sales.py b/channelDemoCatalog/sales.py
index 9bab1dd..3677360 100644
--- a/channelDemoCatalog/sales.py
+++ b/channelDemoCatalog/sales.py
@@ -5,12 +5,14 @@ from framework.feature_factory.feature_family import FeatureFamily
 from framework.feature_factory import Helpers
 from framework.configobj import ConfigObj
+from framework.spark_singleton import SparkSingleton
 import inspect
 
 
 # joiner_func = Helpers()._register_joiner_func()
 multipliable = Helpers()._register_feature_func()
 base_feat = Helpers()._register_feature_func()
+spark = SparkSingleton.get_instance()
 
 
 # Extend FeatureFamily
 class Sales(FeatureFamily):
diff --git a/channelDemoStore/__init__.py b/channelDemoStore/__init__.py
index e7d544b..cc79fff 100644
--- a/channelDemoStore/__init__.py
+++ b/channelDemoStore/__init__.py
@@ -1,5 +1,5 @@
 from framework.channel import Channel
-from channelDemoStore.sales import Sales
+from .sales import Sales
 from featurefamily_common.trends import TrendsCommon
 from framework.feature_factory.dtm import DateTimeManager
 from framework.feature_factory import Helpers
diff --git a/channelDemoStore/sales.py b/channelDemoStore/sales.py
index f9ae16a..30a6160 100644
--- a/channelDemoStore/sales.py
+++ b/channelDemoStore/sales.py
@@ -4,12 +4,14 @@ from framework.feature_factory.feature_family import FeatureFamily
 from framework.feature_factory import Helpers
 from framework.configobj import ConfigObj
+from framework.spark_singleton import SparkSingleton
 import inspect
 
 
 # joiner_func = Helpers()._register_joiner_func()
 multipliable = Helpers()._register_feature_func()
 base_feat = Helpers()._register_feature_func()
+spark = SparkSingleton.get_instance()
 
 
 # Extend FeatureFamily
 class Sales(FeatureFamily):
diff --git a/channelDemoWeb/__init__.py b/channelDemoWeb/__init__.py
index 4f7efde..2621fa6 100644
--- a/channelDemoWeb/__init__.py
+++ b/channelDemoWeb/__init__.py
@@ -1,5 +1,5 @@
 from framework.channel import Channel
-from channelDemoStore.sales import Sales
+from .sales import Sales
 from featurefamily_common.trends import TrendsCommon
 from framework.feature_factory.dtm import DateTimeManager
 from framework.feature_factory import Helpers
diff --git a/channelDemoWeb/sales.py b/channelDemoWeb/sales.py
index 69b0663..15ff996 100644
--- a/channelDemoWeb/sales.py
+++ b/channelDemoWeb/sales.py
@@ -5,12 +5,14 @@ from framework.feature_factory.feature_family import FeatureFamily
 from framework.feature_factory import Helpers
 from framework.configobj import ConfigObj
+from framework.spark_singleton import SparkSingleton
 import inspect
 
 
 # joiner_func = Helpers()._register_joiner_func()
 multipliable = Helpers()._register_feature_func()
 base_feat = Helpers()._register_feature_func()
+spark = SparkSingleton.get_instance()
 
 
 # Extend FeatureFamily
 class Sales(FeatureFamily):
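The three `__init__.py` hunks above swap the hard-coded `channelDemoStore.sales` import for a package-relative import, so each demo channel now re-exports its own `Sales` feature family instead of the store's. A minimal sketch of the intended effect (the alias names below are illustrative only, not part of this change):

```python
# After this change, each channel package resolves Sales from its own sales.py.
from channelDemoStore import Sales as StoreSales  # channelDemoStore/sales.py
from channelDemoWeb import Sales as WebSales      # channelDemoWeb/sales.py

assert StoreSales is not WebSales  # previously both resolved to the store implementation
```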
diff --git a/framework/feature_factory/__init__.py b/framework/feature_factory/__init__.py
index 753c373..b3359fd 100644
--- a/framework/feature_factory/__init__.py
+++ b/framework/feature_factory/__init__.py
@@ -1,11 +1,12 @@
 from pyspark.sql.functions import col, lit, when, struct
 from pyspark.sql.column import Column
-from pyspark.sql import functions as F
+from pyspark.sql import functions as F, SparkSession
 from pyspark.sql.dataframe import DataFrame
 from framework.feature_factory.feature import Feature, FeatureSet, Multiplier
 from framework.configobj import ConfigObj
 from framework.feature_factory.helpers import Helpers
 from framework.feature_factory.agg_granularity import AggregationGranularity
+from framework.feature_factory.llm_tools import LLMFeature, LLMUtils
 import re
 import logging
 import datetime
@@ -33,25 +34,15 @@ def append_features(self, df: DataFrame, groupBy_cols, feature_sets: List[Featur
         """
         # If groupBy Column is past in as something other than list, convert to list
         # Validation - If features, passed in is dict, convert to list of vals, etc.
-        # groupBy_cols = self.helpers._to_list(groupBy_cols)
-        # groupBy_cols, groupBy_joiners = self.helpers._extract_groupby_joiner(groupBy_cols)
         groupBy_cols = [gc.assembled_column if isinstance(gc, Feature) else gc for gc in groupBy_cols]
         features, dups = self.helpers._dedup_fast(df, [feature for feature_set in feature_sets for feature in feature_set.features.values()])
-        # df = self.helpers._resolve_feature_joiners(df, features, groupBy_joiners).repartition(*groupBy_cols)
         df = df.repartition(*groupBy_cols)
 
         # feature_cols = []
         agg_cols = []
         non_agg_cols = {}
         features_to_drop = []
 
-        # base_cols = [f.base_col for f in features]
-
-        # column validation
-        # valid_result, undef_cols = self.helpers.validate_col(df, *base_cols)
-        # assert valid_result, "base cols {} are not defined in df columns {}".format(undef_cols, df.columns)
-
-        # valid_result, undef_cols = self.helpers._validate_col(df, *groupBy_cols)
-        # assert valid_result, "groupby cols {} are not defined in df columns {}".format(undef_cols, df.columns)
+        granularity_validator = AggregationGranularity(granularityEnum) if granularityEnum else None
         for feature in features:
             assert True if ((len(feature.aggs) > 0) and (len(
@@ -77,8 +68,7 @@ def append_features(self, df: DataFrame, groupBy_cols, feature_sets: List[Featur
             df = df.withColumn(fn, col)
 
         final_df = df.drop(*features_to_drop)
-        # else:
-        #     new_df = df.select(*df.columns + feature_cols)
+
         return final_df
 
     def append_catalog(self, df: DataFrame, groupBy_cols, catalog_cls, feature_names = [], withTrendsForFeatures: List[FeatureSet] = None, granularityEnum: Enum = None):
@@ -96,3 +86,12 @@ def append_catalog(self, df: DataFrame, groupBy_cols, catalog_cls, feature_names
         fs = FeatureSet(dct)
         return self.append_features(df, groupBy_cols, [fs], withTrendsForFeatures, granularityEnum)
 
+    def assemble_llm_feature(self, spark: SparkSession, srcDirectory: str, llmFeature: LLMFeature, partitionNum: int):
+
+        all_files = self.helpers.list_files_recursively(srcDirectory)
+        src_rdd = spark.sparkContext.parallelize(all_files, partitionNum)
+
+        rdd = src_rdd.mapPartitions(lambda partition_data: LLMUtils.process_docs(partitionData=partition_data, llmFeat=llmFeature)).repartition(partitionNum)
+        rdd.cache()
+        df = rdd.toDF([llmFeature.name])
+        return df.select(F.explode(llmFeature.name).alias(llmFeature.name))
\ No newline at end of file
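`assemble_llm_feature` is the new entry point added above: it lists every file under `srcDirectory`, parallelizes the paths across `partitionNum` partitions, runs the `LLMFeature` reader/splitter over each partition via `LLMUtils.process_docs`, and returns a one-column DataFrame of exploded chunks. A minimal sketch of the intended call (the directory path and partition count are placeholders; the same pattern appears in `notebooks/feature_factory_llms.py` further down):

```python
from framework.feature_factory import Feature_Factory
from framework.feature_factory.llm_tools import (
    LLMFeature, LlamaIndexDocReader, LlamaIndexDocSplitter
)

ff = Feature_Factory()
llm_feature = LLMFeature(
    name="chunks",
    reader=LlamaIndexDocReader(),                                      # file -> llama_index Documents
    splitter=LlamaIndexDocSplitter(chunk_size=1024, chunk_overlap=32)  # Documents -> text chunks
)

# `spark` is the active SparkSession (e.g. the one Databricks provides).
df = ff.assemble_llm_feature(
    spark,
    srcDirectory="/dbfs/tmp/docs",   # placeholder directory of source documents
    llmFeature=llm_feature,
    partitionNum=2
)
# df has a single "chunks" column with one row per extracted chunk.
```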
diff --git a/framework/feature_factory/helpers.py b/framework/feature_factory/helpers.py
index 1f250eb..2f48844 100644
--- a/framework/feature_factory/helpers.py
+++ b/framework/feature_factory/helpers.py
@@ -3,17 +3,16 @@ from pyspark.sql import functions as F
 from pyspark.sql.dataframe import DataFrame
 from framework.feature_factory.feature import Feature
-from framework.spark_singleton import SparkSingleton
 from framework import feature_factory
 from datetime import datetime
 from datetime import timedelta
-import inspect
+import os
 from collections import OrderedDict
 import logging
 
 logger = logging.getLogger(__name__)
 
 
-spark = SparkSingleton.get_instance()
+# spark = SparkSingleton.get_instance()
 
 
 class Helpers:
@@ -325,6 +324,14 @@ def get_months_range(self, dt: datetime.date, months: int):
             dt2 = self.subtract_months(dt, i)
             months_range.append(dt2)
         return months_range
+
+    def list_files_recursively(self, directory: str):
+        file_list = []
+        for root, _, files in os.walk(directory):
+            for file in files:
+                file_path = os.path.join(root, file)
+                file_list.append(file_path)
+        return file_list
 
 class Converter:
     def __init__(self, items, target_type):
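`Helpers.list_files_recursively` is a small `os.walk` wrapper that `assemble_llm_feature` uses to enumerate the documents under `srcDirectory`. An illustrative call (the directory path is a placeholder):

```python
from framework.feature_factory.helpers import Helpers

files = Helpers().list_files_recursively("/dbfs/tmp/docs")  # placeholder path
# e.g. ["/dbfs/tmp/docs/a.pdf", "/dbfs/tmp/docs/reports/b.pdf", ...]
```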
diff --git a/framework/feature_factory/llm_tools.py b/framework/feature_factory/llm_tools.py
new file mode 100644
index 0000000..6e229c5
--- /dev/null
+++ b/framework/feature_factory/llm_tools.py
@@ -0,0 +1,281 @@
+from abc import ABC, abstractmethod
+from typing import List, Union, Tuple
+from llama_index import SimpleDirectoryReader
+from llama_index.node_parser import SimpleNodeParser
+from llama_index.node_parser.extractors import (
+    MetadataExtractor,
+    TitleExtractor
+)
+from llama_index.text_splitter import TokenTextSplitter
+from llama_index.schema import MetadataMode, Document as Document
+from langchain.docstore.document import Document as LCDocument
+from langchain.text_splitter import RecursiveCharacterTextSplitter
+from transformers import AutoTokenizer
+from langchain.document_loaders import UnstructuredPDFLoader
+import math
+
+
+class LLMTool(ABC):
+
+    def __init__(self) -> None:
+        self._initialized = False
+
+    def _require_init(self) -> bool:
+        if self._initialized:
+            return False
+        else:
+            self._initialized = True
+            return True
+
+    @abstractmethod
+    def apply(self):
+        ...
+
+    @abstractmethod
+    def create(self):
+        ...
+
+
+class DocReader(LLMTool):
+
+    def create(self):
+        ...
+
+    def apply(self, filename: str) -> Union[str, List[Document]]:
+        ...
+
+
+class DocSplitter(LLMTool):
+
+    def __init__(self) -> None:
+        super().__init__()
+
+    @classmethod
+    def _wrap_docs(cls, docs: Union[Document, LCDocument]):
+        if isinstance(docs, Document):
+            docs = [docs]
+        elif isinstance(docs, LCDocument):
+            docs = [docs]
+        return docs
+
+    @classmethod
+    def _to_text(cls, docs: Union[str, List[Document], List[LCDocument]]):
+        if not docs:
+            return ""
+        if isinstance(docs, str):
+            return docs
+        docs = cls._wrap_docs(docs)
+        if isinstance(docs[0], Document):
+            texts = [doc.get_content(metadata_mode=MetadataMode.LLM) for doc in docs]
+            return "\n\n".join(texts)
+        if isinstance(docs[0], LCDocument):
+            texts = [doc.page_content for doc in docs]
+            return "\n\n".join(texts)
+        raise ValueError("doc type is not defined.")
+
+    @classmethod
+    def _to_documents(cls, txt: Union[str, List[Document]]):
+        if not txt:
+            return None
+        txt = cls._wrap_docs(txt)
+        if isinstance(txt, str):
+            doc = Document(text=txt)
+            return [doc]
+        if isinstance(txt[0], Document):
+            return txt
+        if isinstance(txt[0], LCDocument):
+            new_docs = []
+            for doc in txt:
+                new_doc = Document(text=doc.page_content, metadata=doc.metadata)
+                new_docs.append(new_doc)
+            return new_docs
+
+    @classmethod
+    def _to_lcdocuments(cls, docs: Union[str, List[Document], List[LCDocument]]):
+        if not docs:
+            return None
+        docs = cls._wrap_docs(docs)
+        if isinstance(docs, str):
+            new_docs = LCDocument(page_content=docs)
+            return [new_docs]
+        if isinstance(docs[0], LCDocument):
+            return docs
+        if isinstance(docs[0], Document):
+            new_docs = []
+            for doc in docs:
+                metadata = doc.metadata or {}
+                new_doc = LCDocument(page_content=doc.text, metadata=metadata)
+                new_docs.append(new_doc)
+            return new_docs
+
+
+    def apply(self, docs: Union[str, List[Document]]) -> List[str]:
+        ...
+
+
+class LlamaIndexDocReader(DocReader):
+
+    def __init__(self) -> None:
+        super().__init__()
+
+    def apply(self, filename: str) -> List[Document]:
+        documents = SimpleDirectoryReader(input_files=[filename]).load_data()
+        return documents
+
+
+class UnstructuredDocReader(DocReader):
+
+    def __init__(self, allowedCategories: Tuple[str]=('NarrativeText', 'ListItem')) -> None:
+        super().__init__()
+        self.categories = allowedCategories
+
+    def apply(self, filename: str) -> str:
+        loader = UnstructuredPDFLoader(filename, mode="elements")
+        docs = loader.load()
+        filtered_docs = []
+        for d in docs:
+            if d.metadata.get('category') in self.categories:
+                filtered_docs.append(d.page_content)
+        combined_text = "\n\n".join(filtered_docs)
+
+        return combined_text
+
+
+
+class LLMDef(LLMTool):
+
+    def __init__(self) -> None:
+        self._instance = None
+
+    def get_instance(self):
+        return self._instance
+
+
+
+class LlamaIndexDocSplitter(DocSplitter):
+
+    def __init__(self, chunk_size:int=1024, chunk_overlap:int=64, llm:LLMDef=None) -> None:
+        super().__init__()
+        self.chunk_size = chunk_size
+        self.chunk_overlap = chunk_overlap
+        self.llm = llm
+
+    def create(self):
+        if super()._require_init():
+            text_splitter = TokenTextSplitter(
+                separator=" ", chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap
+            )
+            if self.llm:
+                self.llm.create()
+                metadata_extractor = MetadataExtractor(
+                    extractors=[
+                        TitleExtractor(nodes=5, llm=self.llm.get_instance())
+                    ],
+                    in_place=False,
+                )
+            else:
+                metadata_extractor = None
+
+            self.node_parser = SimpleNodeParser.from_defaults(
+                text_splitter=text_splitter,
+                metadata_extractor=metadata_extractor,
+            )
+        return None
+
+    def apply(self, docs: List[Document]):
+        docs = DocSplitter._to_documents(docs)
+        self.create()
+        doc_nodes = self.node_parser.get_nodes_from_documents(docs)
+        chunks = [node.get_content(metadata_mode=MetadataMode.LLM) for node in doc_nodes]
+        return chunks
+
+
+class LangChainRecursiveCharacterTextSplitter(DocSplitter):
+
+    def __init__(self, chunk_size=1024, chunk_overlap=64, pretrained_model_path: str=None) -> None:
+        super().__init__()
+        self.chunk_size = chunk_size
+        self.chunk_overlap = chunk_overlap
+        self.pretrained_model_path = pretrained_model_path
+
+    def create(self):
+        if super()._require_init():
+            if self.pretrained_model_path:
+                tokenizer = AutoTokenizer.from_pretrained(self.pretrained_model_path)
+                self.text_splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(tokenizer,
+                    chunk_size=self.chunk_size,
+                    chunk_overlap=self.chunk_overlap)
+            else:
+                self.text_splitter = RecursiveCharacterTextSplitter(
+                    chunk_size = self.chunk_size,
+                    chunk_overlap = self.chunk_overlap
+                )
+
+    def apply(self, docs):
+        self.create()
+        txt = DocSplitter._to_text(docs)
+        chunks = self.text_splitter.split_text(txt)
+        return chunks
+
+
+class TokenizerTextSpliter(DocSplitter):
+
+    def __init__(self, chunk_size=1024, chunk_overlap=64, pretrained_tokenizer_path: str=None) -> None:
+        super().__init__()
+        self.chunk_size = chunk_size
+        self.chunk_overlap = chunk_overlap
+        self.pretrained_tokenizer_path = pretrained_tokenizer_path
+        self.tokenizer = None
+
+    def create(self):
+        if not self.pretrained_tokenizer_path:
+            raise ValueError("Pretrained tokenizer is not defined in TokenizerTextSplitter.")
+        if super()._require_init():
+            self.tokenizer = AutoTokenizer.from_pretrained(self.pretrained_tokenizer_path)
+
+    def apply(self, text: Union[str, List[Document]]) -> List[str]:
+        self.create()
+        text = DocSplitter._to_text(text)
+        text_length = len(self.tokenizer.encode(text))
+        num_chunks = math.ceil(text_length / self.chunk_size)
+        chunk_length = math.ceil(len(text) / num_chunks)
+        chunks = []
+        for i in range(0, len(text), chunk_length):
+            start = max(0, i-self.chunk_overlap)
+            end = min(len(text), i + chunk_length + self.chunk_overlap)
+            chunks.append(text[start:end])
+        return chunks
+
+
+class LLMFeature(LLMTool):
+
+    def __init__(self, name: str, reader: DocReader, splitter: DocSplitter) -> None:
+        super().__init__()
+        self.name = name
+        self.reader = reader
+        self.splitter = splitter
+
+    def create(self):
+        if super()._require_init():
+            self.reader.create()
+            self.splitter.create()
+
+    def apply(self, filename: str):
+        self.create()
+        docs = self.reader.apply(filename)
+        return self.splitter.apply(docs)
+
+
+class LLMUtils:
+
+    @classmethod
+    def split_docs(cls, fileName: str, llmFeat: LLMFeature):
+        print(fileName)
+        chunks = llmFeat.apply(fileName)
+        return (chunks,)
+
+    @classmethod
+    def process_docs(cls, partitionData, llmFeat):
+        llmFeat.create()
+        for row in partitionData:
+            yield cls.split_docs(row, llmFeat)
\ No newline at end of file
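`llm_tools.py` above composes a small pipeline: a `DocReader` loads a file into documents, a `DocSplitter` turns them into text chunks, and `LLMFeature` ties the two together, with `LLMUtils.process_docs` driving one file at a time inside each Spark partition. A driver-side sketch of the same flow, mirroring the unit tests below and using the bundled sample PDF:

```python
from framework.feature_factory.llm_tools import (
    LLMFeature, LlamaIndexDocReader, LangChainRecursiveCharacterTextSplitter, LLMUtils
)

feat = LLMFeature(
    name="chunks",
    reader=LlamaIndexDocReader(),
    splitter=LangChainRecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=10)
)

# One file at a time: returns the list of chunk strings for that file.
chunks = feat.apply("test/data/sample.pdf")

# Per-partition form used by assemble_llm_feature: yields one (chunks,) tuple per file.
rows = list(LLMUtils.process_docs(["test/data/sample.pdf"], llmFeat=feat))
```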
diff --git a/notebooks/feature_factory_llms.py b/notebooks/feature_factory_llms.py
new file mode 100644
index 0000000..92b98c5
--- /dev/null
+++ b/notebooks/feature_factory_llms.py
@@ -0,0 +1,84 @@
+# Databricks notebook source
+# MAGIC %pip install llama-index pypdf
+
+# COMMAND ----------
+
+# MAGIC %pip install typing_extensions==4.7.1
+
+# COMMAND ----------
+
+dbutils.library.restartPython()
+
+# COMMAND ----------
+
+from framework.feature_factory.llm_tools import LLMFeature, LlamaIndexDocReader, LlamaIndexDocSplitter, LLMDef
+from framework.feature_factory import Feature_Factory
+import torch
+from llama_index.llms import HuggingFaceLLM
+
+# COMMAND ----------
+
+ff = Feature_Factory()
+
+# COMMAND ----------
+
+class MPT7b(LLMDef):
+    def create(self):
+        torch.cuda.empty_cache()
+        generate_params = {
+            "temperature": 1.0,
+            "top_p": 1.0,
+            "top_k": 50,
+            "use_cache": True,
+            "do_sample": True,
+            "eos_token_id": 0,
+            "pad_token_id": 0
+        }
+
+        self._instance = HuggingFaceLLM(
+            max_new_tokens=256,
+            generate_kwargs=generate_params,
+            # system_prompt=system_prompt,
+            # query_wrapper_prompt=query_wrapper_prompt,
+            tokenizer_name="mosaicml/mpt-7b-instruct",
+            model_name="mosaicml/mpt-7b-instruct",
+            device_map="auto",
+            tokenizer_kwargs={"max_length": 1024},
+            model_kwargs={"torch_dtype": torch.float16, "trust_remote_code": True}
+        )
+        return None
+
+    def apply(self):
+        ...
+
+# COMMAND ----------
+
+doc_splitter = LlamaIndexDocSplitter(
+    chunk_size = 1024,
+    chunk_overlap = 32,
+    llm = MPT7b()
+)
+
+# COMMAND ----------
+
+llm_feature = LLMFeature (
+    name = "chunks",
+    reader = LlamaIndexDocReader(),
+    splitter = doc_splitter
+)
+
+# COMMAND ----------
+
+partition_num = 2
+
+# COMMAND ----------
+
+df = ff.assemble_llm_feature(spark, srcDirectory= "/dbfs/tmp/li_yu/va_llms/pdf", llmFeature=llm_feature, partitionNum=partition_num)
+
+# COMMAND ----------
+
+display(df)
+
+# COMMAND ----------
+
+
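The notebook wires an MPT-7B-instruct model into `LlamaIndexDocSplitter` (for title metadata extraction) and then materializes the chunk DataFrame. Based on `assemble_llm_feature` above, the displayed result should be a single string column named after the feature, one row per chunk, roughly:

```python
df.printSchema()
# root
#  |-- chunks: string (nullable = true)

df.count()  # total number of chunks across all PDFs under srcDirectory
```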
diff --git a/requirements.txt b/requirements.txt
index 53427bb..d8f7401 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,7 +4,13 @@ py4j==0.10.9
 pyarrow==5.0.0
 pyspark==3.1.3
 python-dateutil==2.8.1
+pdf2image>=1.16.3
 scipy==1.7.1
 six==1.15.0
 coverage
-
+langchain>=0.0.317
+llama-index>=0.8.61
+pypdf>=3.17.0
+PyPDF2>=3.0.1
+transformers>=4.31.0
+unstructured>=0.10.30
diff --git a/test/data/sample.pdf b/test/data/sample.pdf
new file mode 100644
index 0000000..dbf091d
Binary files /dev/null and b/test/data/sample.pdf differ
diff --git a/test/test_chunking.py b/test/test_chunking.py
new file mode 100644
index 0000000..7c90b97
--- /dev/null
+++ b/test/test_chunking.py
@@ -0,0 +1,116 @@
+import unittest
+from framework.feature_factory.feature import Feature, FeatureSet, CompositeFeature
+from framework.feature_factory.feature_dict import ImmutableDictBase
+from framework.feature_factory import Feature_Factory
+from framework.feature_factory.helpers import Helpers
+import pyspark.sql.functions as f
+import json
+from pyspark.sql.types import StructType
+from test.local_spark_singleton import SparkSingleton
+from framework.feature_factory.catalog import CatalogBase
+from enum import IntEnum
+from framework.feature_factory.llm_tools import *
+
+
+class TestLLMTools(unittest.TestCase):
+
+    def test_llamaindex_reader(self):
+        doc_reader = LlamaIndexDocReader()
+        doc_reader.create()
+        docs = doc_reader.apply("test/data/sample.pdf")
+        assert len(docs) == 2
+
+    def test_llamaindex_splitter(self):
+        doc_reader = LlamaIndexDocReader()
+        doc_reader.create()
+        docs = doc_reader.apply("test/data/sample.pdf")
+
+        doc_splitter = LlamaIndexDocSplitter()
+        doc_splitter.create()
+        chunks = doc_splitter.apply(docs=docs)
+        assert len(chunks) == 2
+
+    def test_recursive_splitter(self):
+        doc_splitter = LangChainRecursiveCharacterTextSplitter(chunk_size = 200, chunk_overlap=10)
+        doc_splitter.create()
+        txt = "a"*200 + "b"*30
+        chunks = doc_splitter.apply(txt)
+        assert len(chunks) == 2 and len(chunks[0]) == 200 and len(chunks[1]) == (30+10)
+
+    def test_recursive_splitter_llamaindex_docs(self):
+        doc_reader = LlamaIndexDocReader()
+        docs = doc_reader.apply("test/data/sample.pdf")
+
+        doc_splitter = LangChainRecursiveCharacterTextSplitter(chunk_size = 200, chunk_overlap=10)
+        chunks = doc_splitter.apply(docs=docs)
+        assert len(chunks) > 0
+        assert doc_splitter._require_init() == False
+
+    def test_process_docs(self):
+        doc_reader = LlamaIndexDocReader()
+        doc_splitter = LlamaIndexDocSplitter()
+        llm_feature = LLMFeature("test_llm", reader=doc_reader, splitter=doc_splitter)
+        chunks = LLMUtils.process_docs(["test/data/sample.pdf"], llmFeat=llm_feature)
+        for chunk in chunks:
+            assert len(chunk) == 1
+
+    def test_wrap_docs(self):
+        doc = LCDocument(page_content="test", metadata={"filepath": "/tmp/filename"})
+        docs = DocSplitter._wrap_docs(doc)
+        assert docs[0] == doc
+
+        doc = Document(txt="test")
+        docs = DocSplitter._wrap_docs(doc)
+        assert docs[0] == doc
+
+        docs = DocSplitter._wrap_docs("test")
+        assert docs == "test"
+
+
+    def test_convert_to_text(self):
+        assert DocSplitter._to_text(None) == ""
+        doc = LCDocument(page_content="test", metadata={"filepath": "/tmp/filename"})
+        txt = DocSplitter._to_text([doc])
+        assert txt == "test"
+
+        doc = Document(text="test")
+        txt = DocSplitter._to_text([doc])
+        assert txt == "test"
+
+        txt = DocSplitter._to_text("test")
+        assert txt == "test"
+
+    def test_convert_to_document(self):
+        assert DocSplitter._to_documents(None) is None
+        doc = LCDocument(page_content="test", metadata={"filepath": "/tmp/filename"})
+        new_docs = DocSplitter._to_documents([doc])
+        assert new_docs[0].text == "test"
+
+        doc = Document(text="test")
+        new_docs = DocSplitter._to_documents([doc])
+        assert new_docs[0].text == "test"
+
+        new_docs = DocSplitter._to_documents("test")
+        assert new_docs[0].text == "test"
+
+    def test_convert_to_lcdocument(self):
+        assert DocSplitter._to_lcdocuments(None) is None
+        doc = LCDocument(page_content="test", metadata={"filepath": "/tmp/filename"})
+        new_docs = DocSplitter._to_lcdocuments([doc])
+        assert new_docs[0].page_content == "test"
+
+        doc = Document(text="test")
+        new_docs = DocSplitter._to_lcdocuments([doc])
+        assert new_docs[0].page_content == "test"
+
+        new_docs = DocSplitter._to_lcdocuments("test")
+        assert new_docs[0].page_content == "test"
+
+    def test_token_splitter(self):
+        doc_reader = LlamaIndexDocReader()
+        docs = doc_reader.apply("./test/data/sample.pdf")
+        assert len(docs) > 0
+        doc_splitter = TokenizerTextSpliter(chunk_size=1024, chunk_overlap=32, pretrained_tokenizer_path="hf-internal-testing/llama-tokenizer")
+        chunks = doc_splitter.apply(docs)
+        assert len(chunks) == 1
+        
\ No newline at end of file
diff --git a/test/test_feature_catalog.py b/test/test_feature_catalog.py
index 4b12c84..9336dbf 100644
--- a/test/test_feature_catalog.py
+++ b/test/test_feature_catalog.py
@@ -10,6 +10,7 @@ from framework.feature_factory.catalog import CatalogBase
 from enum import IntEnum
 
+spark = SparkSingleton.get_instance()
 
 class CommonCatalog(CatalogBase):
     total_sales = Feature.create(
         base_col=f.col("ss_net_paid").cast("float"),
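The new tests exercise the readers and splitters against the bundled `test/data/sample.pdf`, and both test modules import the test-local `SparkSingleton` from `test/local_spark_singleton.py`. Assuming the added requirements (llama-index, langchain, transformers, unstructured) are installed, a sketch of running just the chunking suite from the repository root (illustrative helper, not part of this change):

```python
# run_chunking_tests.py -- equivalent to: python -m pytest test/test_chunking.py -v
import unittest

suite = unittest.defaultTestLoader.discover("test", pattern="test_chunking.py")
unittest.TextTestRunner(verbosity=2).run(suite)
```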