From a21e3aa3b4ed45ca0d8d376ac02976610786671f Mon Sep 17 00:00:00 2001 From: Li Yu Date: Sun, 5 Nov 2023 21:37:48 -0500 Subject: [PATCH 01/11] add llm tools --- framework/feature_factory/__init__.py | 15 +--- framework/feature_factory/llm_tools.py | 113 +++++++++++++++++++++++++ requirements.txt | 4 +- 3 files changed, 118 insertions(+), 14 deletions(-) create mode 100644 framework/feature_factory/llm_tools.py diff --git a/framework/feature_factory/__init__.py b/framework/feature_factory/__init__.py index 753c373..f63c590 100644 --- a/framework/feature_factory/__init__.py +++ b/framework/feature_factory/__init__.py @@ -33,25 +33,15 @@ def append_features(self, df: DataFrame, groupBy_cols, feature_sets: List[Featur """ # If groupBy Column is past in as something other than list, convert to list # Validation - If features, passed in is dict, convert to list of vals, etc. - # groupBy_cols = self.helpers._to_list(groupBy_cols) - # groupBy_cols, groupBy_joiners = self.helpers._extract_groupby_joiner(groupBy_cols) groupBy_cols = [gc.assembled_column if isinstance(gc, Feature) else gc for gc in groupBy_cols] features, dups = self.helpers._dedup_fast(df, [feature for feature_set in feature_sets for feature in feature_set.features.values()]) - # df = self.helpers._resolve_feature_joiners(df, features, groupBy_joiners).repartition(*groupBy_cols) df = df.repartition(*groupBy_cols) # feature_cols = [] agg_cols = [] non_agg_cols = {} features_to_drop = [] - # base_cols = [f.base_col for f in features] - - # column validation - # valid_result, undef_cols = self.helpers.validate_col(df, *base_cols) - # assert valid_result, "base cols {} are not defined in df columns {}".format(undef_cols, df.columns) - - # valid_result, undef_cols = self.helpers._validate_col(df, *groupBy_cols) - # assert valid_result, "groupby cols {} are not defined in df columns {}".format(undef_cols, df.columns) + granularity_validator = AggregationGranularity(granularityEnum) if granularityEnum else None for feature in features: assert True if ((len(feature.aggs) > 0) and (len( @@ -77,8 +67,7 @@ def append_features(self, df: DataFrame, groupBy_cols, feature_sets: List[Featur df = df.withColumn(fn, col) final_df = df.drop(*features_to_drop) - # else: - # new_df = df.select(*df.columns + feature_cols) + return final_df def append_catalog(self, df: DataFrame, groupBy_cols, catalog_cls, feature_names = [], withTrendsForFeatures: List[FeatureSet] = None, granularityEnum: Enum = None): diff --git a/framework/feature_factory/llm_tools.py b/framework/feature_factory/llm_tools.py new file mode 100644 index 0000000..a6edb97 --- /dev/null +++ b/framework/feature_factory/llm_tools.py @@ -0,0 +1,113 @@ +from abc import ABC, abstractmethod +from typing import List, Union +from llama_index import SimpleDirectoryReader +from llama_index.node_parser import SimpleNodeParser +from llama_index.node_parser.extractors import ( + MetadataExtractor, + TitleExtractor +) +from llama_index.text_splitter import TokenTextSplitter +from llama_index.schema import MetadataMode, Document +from langchain.text_splitter import RecursiveCharacterTextSplitter + + +class LLMTool(ABC): + + @abstractmethod + def create(self): + ... + + @abstractmethod + def apply(self): + ... + + +class DocReader(LLMTool): + + def apply(self, filename: str) -> Union[str, List[Document]]: + ... + +class DocSplitter(LLMTool): + + def apply(self, docs: Union[str, List[Document]]) -> List[str]: + ... + + +class LlamaIndexDocReader(DocReader): + + def apply(self): + documents = SimpleDirectoryReader(input_files=self.filenames).load_data() + return documents + + +class LLMDef(LLMTool): + + def __init__(self) -> None: + self._instance = None + + def get_instance(self): + return self._instance + + + +class LlamaIndexDocSplitter(DocSplitter): + + def __init__(self, chunk_size:int=1024, chunk_overlap:int=64, llm:LLMDef=None) -> None: + self.chunk_size = chunk_size + self.chunk_overlap = chunk_overlap + self.llm = llm + + def create(self): + text_splitter = TokenTextSplitter( + separator=" ", chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap + ) + if self.llm: + self.llm.create() + metadata_extractor = MetadataExtractor( + extractors=[ + TitleExtractor(nodes=5, llm=self.llm.get_instance()) + ], + in_place=False, + ) + else: + metadata_extractor = None + + self.node_parser = SimpleNodeParser.from_defaults( + text_splitter=text_splitter, + metadata_extractor=metadata_extractor, + ) + return None + + def apply(self, docs): + doc_nodes = self.node_parser.get_nodes_from_documents(docs) + chunks = [node.get_content(metadata_mode=MetadataMode.LLM) for node in doc_nodes] + return chunks + + +class LangChainRecursiveCharacterTextSplitter(DocSplitter): + + def __init__(self, chunk_size=1024, chunk_overlap=64) -> None: + self.chunk_size = chunk_size + self.chunk_overlap = chunk_overlap + + def create(self): + self.text_splitter = RecursiveCharacterTextSplitter( + chunk_size = self.chunk_size, + chunk_overlap = self.chunk_overlap + ) + + def apply(self, docs): + chunks = self.text_splitter.split_text(docs) + return chunks + + +class ParserToolbox(LLMTool): + + def __init__(self, reader: DocReader, splitter: DocSplitter) -> None: + self.reader = reader + self.splitter = splitter + + def apply(self, filename: str): + docs = self.reader.apply(filename) + return self.splitter.apply(docs) + diff --git a/requirements.txt b/requirements.txt index 53427bb..fa3b148 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,6 @@ python-dateutil==2.8.1 scipy==1.7.1 six==1.15.0 coverage - +langchain>=0.0.317 +llama-index>=0.8.61 +pypdf>=3.17.0 \ No newline at end of file From 22370b38a829a22317b07903c8f74325d7707f6f Mon Sep 17 00:00:00 2001 From: Li Yu Date: Wed, 8 Nov 2023 21:05:17 -0500 Subject: [PATCH 02/11] add mapparitions for generating chunks col --- framework/feature_factory/__init__.py | 23 +++++++++++++++- framework/feature_factory/helpers.py | 13 +++++++-- framework/feature_factory/llm_tools.py | 16 ++++++++--- test/test_chunking.py | 38 ++++++++++++++++++++++++++ test/test_feature_catalog.py | 1 + 5 files changed, 83 insertions(+), 8 deletions(-) create mode 100644 test/test_chunking.py diff --git a/framework/feature_factory/__init__.py b/framework/feature_factory/__init__.py index f63c590..ca468d5 100644 --- a/framework/feature_factory/__init__.py +++ b/framework/feature_factory/__init__.py @@ -1,11 +1,12 @@ from pyspark.sql.functions import col, lit, when, struct from pyspark.sql.column import Column -from pyspark.sql import functions as F +from pyspark.sql import functions as F, SparkSession from pyspark.sql.dataframe import DataFrame from framework.feature_factory.feature import Feature, FeatureSet, Multiplier from framework.configobj import ConfigObj from framework.feature_factory.helpers import Helpers from framework.feature_factory.agg_granularity import AggregationGranularity +from framework.feature_factory.llm_tools import LLMFeature import re import logging import datetime @@ -85,3 +86,23 @@ def append_catalog(self, df: DataFrame, groupBy_cols, catalog_cls, feature_names fs = FeatureSet(dct) return self.append_features(df, groupBy_cols, [fs], withTrendsForFeatures, granularityEnum) + def assemble_llm_feature(self, spark: SparkSession, srcDirectory: str, llmFeature: LLMFeature, partitionNum: int): + + def split_docs(fileName: str, llmFeat: LLMFeature): + print(fileName) + chunks = llmFeat.apply(fileName) + return (chunks,) + + def process_docs(partitionData, llmFeat): + llmFeat.create() + for row in partitionData: + yield split_docs(row, llmFeat) + + src_pdf_loc = "/dbfs/tmp/li_yu/va_llms/pdf" + all_files = self.helpers.list_files_recursively(src_pdf_loc) + src_rdd = spark.sparkContext.parallelize(all_files, partitionNum) + + rdd = src_rdd.mapPartitions(lambda partition_data: process_docs(partitionData=partition_data, llmFeat=llmFeature)).repartition(partitionNum) + rdd.cache() + df = rdd.toDF([llmFeature.name]) + return df.select(F.explode(llmFeature.name).alias(llmFeature.name)) \ No newline at end of file diff --git a/framework/feature_factory/helpers.py b/framework/feature_factory/helpers.py index 1f250eb..2f48844 100644 --- a/framework/feature_factory/helpers.py +++ b/framework/feature_factory/helpers.py @@ -3,17 +3,16 @@ from pyspark.sql import functions as F from pyspark.sql.dataframe import DataFrame from framework.feature_factory.feature import Feature -from framework.spark_singleton import SparkSingleton from framework import feature_factory from datetime import datetime from datetime import timedelta -import inspect +import os from collections import OrderedDict import logging logger = logging.getLogger(__name__) -spark = SparkSingleton.get_instance() +# spark = SparkSingleton.get_instance() class Helpers: @@ -325,6 +324,14 @@ def get_months_range(self, dt: datetime.date, months: int): dt2 = self.subtract_months(dt, i) months_range.append(dt2) return months_range + + def list_files_recursively(self, directory: str): + file_list = [] + for root, _, files in os.walk(directory): + for file in files: + file_path = os.path.join(root, file) + file_list.append(file_path) + return file_list class Converter: def __init__(self, items, target_type): diff --git a/framework/feature_factory/llm_tools.py b/framework/feature_factory/llm_tools.py index a6edb97..9c70ec3 100644 --- a/framework/feature_factory/llm_tools.py +++ b/framework/feature_factory/llm_tools.py @@ -35,8 +35,11 @@ def apply(self, docs: Union[str, List[Document]]) -> List[str]: class LlamaIndexDocReader(DocReader): - def apply(self): - documents = SimpleDirectoryReader(input_files=self.filenames).load_data() + def create(self): + ... + + def apply(self, filename: str): + documents = SimpleDirectoryReader(input_files=[filename]).load_data() return documents @@ -101,11 +104,16 @@ def apply(self, docs): return chunks -class ParserToolbox(LLMTool): +class LLMFeature(LLMTool): - def __init__(self, reader: DocReader, splitter: DocSplitter) -> None: + def __init__(self, name: str, reader: DocReader, splitter: DocSplitter) -> None: + self.name = name self.reader = reader self.splitter = splitter + + def create(self): + self.reader.create() + self.splitter.create() def apply(self, filename: str): docs = self.reader.apply(filename) diff --git a/test/test_chunking.py b/test/test_chunking.py new file mode 100644 index 0000000..8c63308 --- /dev/null +++ b/test/test_chunking.py @@ -0,0 +1,38 @@ +import unittest +from framework.feature_factory.feature import Feature, FeatureSet, CompositeFeature +from framework.feature_factory.feature_dict import ImmutableDictBase +from framework.feature_factory import Feature_Factory +from framework.feature_factory.helpers import Helpers +import pyspark.sql.functions as f +import json +from pyspark.sql.types import StructType +from test.local_spark_singleton import SparkSingleton +from framework.feature_factory.catalog import CatalogBase +from enum import IntEnum +from framework.feature_factory.llm_tools import * + + +class TestLLMTools(unittest.TestCase): + + def test_llamaindex_reader(self): + doc_reader = LlamaIndexDocReader() + doc_reader.create() + docs = doc_reader.apply("test/data/sample.pdf") + assert len(docs) == 2 + + def test_llamaindex_splitter(self): + doc_reader = LlamaIndexDocReader() + doc_reader.create() + docs = doc_reader.apply("test/data/sample.pdf") + + doc_splitter = LlamaIndexDocSplitter() + doc_splitter.create() + chunks = doc_splitter.apply(docs=docs) + assert len(chunks) == 2 + + def test_recursive_splitter(self): + doc_splitter = LangChainRecursiveCharacterTextSplitter(chunk_size = 200, chunk_overlap=10) + doc_splitter.create() + txt = "a"*200 + "b"*30 + chunks = doc_splitter.apply(txt) + assert len(chunks) == 2 and len(chunks[0]) == 200 and len(chunks[1]) == (30+10) diff --git a/test/test_feature_catalog.py b/test/test_feature_catalog.py index 4b12c84..9336dbf 100644 --- a/test/test_feature_catalog.py +++ b/test/test_feature_catalog.py @@ -10,6 +10,7 @@ from framework.feature_factory.catalog import CatalogBase from enum import IntEnum +spark = SparkSingleton.get_instance() class CommonCatalog(CatalogBase): total_sales = Feature.create( base_col=f.col("ss_net_paid").cast("float"), From 7df4bc41c602b9333848ca720a6c6ba2a737b5a4 Mon Sep 17 00:00:00 2001 From: Li Yu Date: Fri, 10 Nov 2023 10:26:39 -0500 Subject: [PATCH 03/11] check the resource and create if not created before running apply logic --- framework/feature_factory/llm_tools.py | 98 ++++++++++++++++++-------- test/test_chunking.py | 10 +++ 2 files changed, 80 insertions(+), 28 deletions(-) diff --git a/framework/feature_factory/llm_tools.py b/framework/feature_factory/llm_tools.py index 9c70ec3..1211542 100644 --- a/framework/feature_factory/llm_tools.py +++ b/framework/feature_factory/llm_tools.py @@ -7,18 +7,29 @@ TitleExtractor ) from llama_index.text_splitter import TokenTextSplitter -from llama_index.schema import MetadataMode, Document +from llama_index.schema import MetadataMode, Document as Document +from langchain.docstore.document import Document as LCDocument from langchain.text_splitter import RecursiveCharacterTextSplitter class LLMTool(ABC): + def __init__(self) -> None: + self._initialized = False + + def _require_init(self) -> bool: + if self._initialized: + return False + else: + self._initialized = True + return True + @abstractmethod - def create(self): + def apply(self): ... @abstractmethod - def apply(self): + def create(self): ... @@ -27,14 +38,35 @@ class DocReader(LLMTool): def apply(self, filename: str) -> Union[str, List[Document]]: ... + class DocSplitter(LLMTool): + def __init__(self) -> None: + super().__init__() + + def _to_text(self, docs: Union[str, List[Document], List[LCDocument]]): + if not docs: + return "" + if isinstance(docs, str): + return docs + if isinstance(docs[0], Document): + texts = [doc.get_content(metadata_mode=MetadataMode.LLM) for doc in docs] + return "\n\n".join(texts) + if isinstance(docs[0], LCDocument): + texts = [doc.page_content for doc in docs] + return "\n\n".join(texts) + raise ValueError("doc type is not defined.") + + def apply(self, docs: Union[str, List[Document]]) -> List[str]: ... class LlamaIndexDocReader(DocReader): + def __init__(self) -> None: + super().__init__() + def create(self): ... @@ -56,32 +88,35 @@ def get_instance(self): class LlamaIndexDocSplitter(DocSplitter): def __init__(self, chunk_size:int=1024, chunk_overlap:int=64, llm:LLMDef=None) -> None: + super().__init__() self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap self.llm = llm def create(self): - text_splitter = TokenTextSplitter( - separator=" ", chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap - ) - if self.llm: - self.llm.create() - metadata_extractor = MetadataExtractor( - extractors=[ - TitleExtractor(nodes=5, llm=self.llm.get_instance()) - ], - in_place=False, + if super()._require_init(): + text_splitter = TokenTextSplitter( + separator=" ", chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap + ) + if self.llm: + self.llm.create() + metadata_extractor = MetadataExtractor( + extractors=[ + TitleExtractor(nodes=5, llm=self.llm.get_instance()) + ], + in_place=False, + ) + else: + metadata_extractor = None + + self.node_parser = SimpleNodeParser.from_defaults( + text_splitter=text_splitter, + metadata_extractor=metadata_extractor, ) - else: - metadata_extractor = None - - self.node_parser = SimpleNodeParser.from_defaults( - text_splitter=text_splitter, - metadata_extractor=metadata_extractor, - ) return None - def apply(self, docs): + def apply(self, docs: List[Document]): + self.create() doc_nodes = self.node_parser.get_nodes_from_documents(docs) chunks = [node.get_content(metadata_mode=MetadataMode.LLM) for node in doc_nodes] return chunks @@ -90,32 +125,39 @@ def apply(self, docs): class LangChainRecursiveCharacterTextSplitter(DocSplitter): def __init__(self, chunk_size=1024, chunk_overlap=64) -> None: + super().__init__() self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap def create(self): - self.text_splitter = RecursiveCharacterTextSplitter( - chunk_size = self.chunk_size, - chunk_overlap = self.chunk_overlap - ) + if super()._require_init(): + self.text_splitter = RecursiveCharacterTextSplitter( + chunk_size = self.chunk_size, + chunk_overlap = self.chunk_overlap + ) def apply(self, docs): - chunks = self.text_splitter.split_text(docs) + self.create() + txt = super()._to_text(docs) + chunks = self.text_splitter.split_text(txt) return chunks class LLMFeature(LLMTool): def __init__(self, name: str, reader: DocReader, splitter: DocSplitter) -> None: + super().__init__() self.name = name self.reader = reader self.splitter = splitter def create(self): - self.reader.create() - self.splitter.create() + if super()._require_init(): + self.reader.create() + self.splitter.create() def apply(self, filename: str): + self.create() docs = self.reader.apply(filename) return self.splitter.apply(docs) diff --git a/test/test_chunking.py b/test/test_chunking.py index 8c63308..95834c8 100644 --- a/test/test_chunking.py +++ b/test/test_chunking.py @@ -36,3 +36,13 @@ def test_recursive_splitter(self): txt = "a"*200 + "b"*30 chunks = doc_splitter.apply(txt) assert len(chunks) == 2 and len(chunks[0]) == 200 and len(chunks[1]) == (30+10) + + def test_recursive_splitter_llamaindex_docs(self): + doc_reader = LlamaIndexDocReader() + docs = doc_reader.apply("test/data/sample.pdf") + + doc_splitter = LangChainRecursiveCharacterTextSplitter(chunk_size = 200, chunk_overlap=10) + chunks = doc_splitter.apply(docs=docs) + assert len(chunks) > 0 + assert doc_splitter._require_init() == False + From 9e961fb71aabb57c79bdf855806262fe16115388 Mon Sep 17 00:00:00 2001 From: Li Yu Date: Fri, 10 Nov 2023 14:04:21 -0500 Subject: [PATCH 04/11] add tokenizer to the text splitter --- framework/feature_factory/llm_tools.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/framework/feature_factory/llm_tools.py b/framework/feature_factory/llm_tools.py index 1211542..47aa63f 100644 --- a/framework/feature_factory/llm_tools.py +++ b/framework/feature_factory/llm_tools.py @@ -10,7 +10,7 @@ from llama_index.schema import MetadataMode, Document as Document from langchain.docstore.document import Document as LCDocument from langchain.text_splitter import RecursiveCharacterTextSplitter - +from transformers import AutoTokenizer class LLMTool(ABC): @@ -120,21 +120,28 @@ def apply(self, docs: List[Document]): doc_nodes = self.node_parser.get_nodes_from_documents(docs) chunks = [node.get_content(metadata_mode=MetadataMode.LLM) for node in doc_nodes] return chunks - + class LangChainRecursiveCharacterTextSplitter(DocSplitter): - def __init__(self, chunk_size=1024, chunk_overlap=64) -> None: + def __init__(self, chunk_size=1024, chunk_overlap=64, pretrained_model_path: str=None) -> None: super().__init__() self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap + self.pretrained_model_path = pretrained_model_path def create(self): if super()._require_init(): - self.text_splitter = RecursiveCharacterTextSplitter( - chunk_size = self.chunk_size, - chunk_overlap = self.chunk_overlap - ) + if self.pretrained_model_path: + tokenizer = AutoTokenizer.from_pretrained(self.pretrained_model_path) + self.text_splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(tokenizer, + chunk_size=self.chunk_size, + chunk_overlap=self.chunk_overlap) + else: + self.text_splitter = RecursiveCharacterTextSplitter( + chunk_size = self.chunk_size, + chunk_overlap = self.chunk_overlap + ) def apply(self, docs): self.create() From 38abacacd57cf8e909c1f8b9f5a479e4032bd2e6 Mon Sep 17 00:00:00 2001 From: Li Yu Date: Fri, 10 Nov 2023 14:04:40 -0500 Subject: [PATCH 05/11] add llm notebooks --- notebooks/feature_factory_llms.py | 84 +++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 notebooks/feature_factory_llms.py diff --git a/notebooks/feature_factory_llms.py b/notebooks/feature_factory_llms.py new file mode 100644 index 0000000..92b98c5 --- /dev/null +++ b/notebooks/feature_factory_llms.py @@ -0,0 +1,84 @@ +# Databricks notebook source +# MAGIC %pip install llama-index pypdf + +# COMMAND ---------- + +# MAGIC %pip install typing_extensions==4.7.1 + +# COMMAND ---------- + +dbutils.library.restartPython() + +# COMMAND ---------- + +from framework.feature_factory.llm_tools import LLMFeature, LlamaIndexDocReader, LlamaIndexDocSplitter, LLMDef +from framework.feature_factory import Feature_Factory +import torch +from llama_index.llms import HuggingFaceLLM + +# COMMAND ---------- + +ff = Feature_Factory() + +# COMMAND ---------- + +class MPT7b(LLMDef): + def create(self): + torch.cuda.empty_cache() + generate_params = { + "temperature": 1.0, + "top_p": 1.0, + "top_k": 50, + "use_cache": True, + "do_sample": True, + "eos_token_id": 0, + "pad_token_id": 0 + } + + self._instance = HuggingFaceLLM( + max_new_tokens=256, + generate_kwargs=generate_params, + # system_prompt=system_prompt, + # query_wrapper_prompt=query_wrapper_prompt, + tokenizer_name="mosaicml/mpt-7b-instruct", + model_name="mosaicml/mpt-7b-instruct", + device_map="auto", + tokenizer_kwargs={"max_length": 1024}, + model_kwargs={"torch_dtype": torch.float16, "trust_remote_code": True} + ) + return None + + def apply(self): + ... + +# COMMAND ---------- + +doc_splitter = LlamaIndexDocSplitter( + chunk_size = 1024, + chunk_overlap = 32, + llm = MPT7b() +) + +# COMMAND ---------- + +llm_feature = LLMFeature ( + name = "chunks", + reader = LlamaIndexDocReader(), + splitter = doc_splitter +) + +# COMMAND ---------- + +partition_num = 2 + +# COMMAND ---------- + +df = ff.assemble_llm_feature(spark, srcDirectory= "/dbfs/tmp/li_yu/va_llms/pdf", llmFeature=llm_feature, partitionNum=partition_num) + +# COMMAND ---------- + +display(df) + +# COMMAND ---------- + + From e78ac5565a8a494ba28aaed343b35e0f2b56d420 Mon Sep 17 00:00:00 2001 From: Li Yu Date: Fri, 10 Nov 2023 14:24:20 -0500 Subject: [PATCH 06/11] add dependencies for langchain and llamaindex --- requirements.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index fa3b148..e89dbf0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,6 @@ six==1.15.0 coverage langchain>=0.0.317 llama-index>=0.8.61 -pypdf>=3.17.0 \ No newline at end of file +pypdf>=3.17.0 +PyPDF2>=3.0.1 +transformers>=4.31.0 \ No newline at end of file From 67f33eb480bc4147fa3e41819fbea411785f9cd5 Mon Sep 17 00:00:00 2001 From: Li Yu Date: Fri, 10 Nov 2023 14:42:09 -0500 Subject: [PATCH 07/11] add sample file for testing --- test/data/sample.pdf | Bin 0 -> 3028 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 test/data/sample.pdf diff --git a/test/data/sample.pdf b/test/data/sample.pdf new file mode 100644 index 0000000000000000000000000000000000000000..dbf091df9a607221e000593a8b5a97b5ea5fb073 GIT binary patch literal 3028 zcmd5;+iK%D7``|79sZj_3mw^3d>n>>rft|$r=<Gl zWIx$CH11?D%do5oLHZ^AN9p^&qnnG-8;=ca>*%k)|M=6kY|A5;iigj(_3oW*IpgQ0 zB;?N?G}O?F~e-o&fdSbEveGxxNVs&T}_+wIC);wN|S3_`=^Ym z?y1Je_6W!5=Pa%0o_u4M!sh=IbybO>E+wq5{dR6;Rn+AKku*_{3aqswkCH}v ztJ}FLi^-kT6dU1Mb|uqH42vhacOeZu&Rl#HCGFr-xBWM-|+e^~95C3kuyCa8Nz&=d3 zPw9GoO7mhx4-J@*eqI7o0NLmbm9D2#M#EZ@Dl~~|vk9Y>(382@*~jiaPA^4vloGW^${BO z&|L0&$FyZ34q1)S0TXjiAeyzye;HJqPhX+oj`M@hIuz@m%ZWTgO?gR!qsqvQPqV zVBwTl{djT$Lm)?KJ&`!^p- zB?yU29^x`|s{JSof_pN9Oqel#YyZFw~B0kRS*9GB0?&&kJAg z<34|7m-_(#cV8b5!0fg%+X9aQc`Db0`!4$;o1mTB0`JJMabTnKqnZ}rgzd~^$`C_Q S>NZV0@_bPEqs!}&ZT$n`rwj1_ literal 0 HcmV?d00001 From 06f336720375d4ecc6e0ec162a1ff922650ddc25 Mon Sep 17 00:00:00 2001 From: Li Yu Date: Fri, 10 Nov 2023 15:26:24 -0500 Subject: [PATCH 08/11] changed to relative path --- channelDemoCatalog/__init__.py | 2 +- channelDemoStore/__init__.py | 2 +- channelDemoWeb/__init__.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/channelDemoCatalog/__init__.py b/channelDemoCatalog/__init__.py index f6ed3c3..0bf3395 100644 --- a/channelDemoCatalog/__init__.py +++ b/channelDemoCatalog/__init__.py @@ -1,5 +1,5 @@ from framework.channel import Channel -from channelDemoStore.sales import Sales +from .sales import Sales from featurefamily_common.trends import TrendsCommon from framework.feature_factory.dtm import DateTimeManager from framework.feature_factory import Helpers diff --git a/channelDemoStore/__init__.py b/channelDemoStore/__init__.py index e7d544b..cc79fff 100644 --- a/channelDemoStore/__init__.py +++ b/channelDemoStore/__init__.py @@ -1,5 +1,5 @@ from framework.channel import Channel -from channelDemoStore.sales import Sales +from .sales import Sales from featurefamily_common.trends import TrendsCommon from framework.feature_factory.dtm import DateTimeManager from framework.feature_factory import Helpers diff --git a/channelDemoWeb/__init__.py b/channelDemoWeb/__init__.py index 4f7efde..2621fa6 100644 --- a/channelDemoWeb/__init__.py +++ b/channelDemoWeb/__init__.py @@ -1,5 +1,5 @@ from framework.channel import Channel -from channelDemoStore.sales import Sales +from .sales import Sales from featurefamily_common.trends import TrendsCommon from framework.feature_factory.dtm import DateTimeManager from framework.feature_factory import Helpers From e7eae7015ead92a7df323939fa8e1b30970ecbf8 Mon Sep 17 00:00:00 2001 From: Li Yu Date: Fri, 10 Nov 2023 16:50:10 -0500 Subject: [PATCH 09/11] fix build issue --- channelDemoCatalog/sales.py | 2 ++ channelDemoStore/sales.py | 2 ++ channelDemoWeb/sales.py | 2 ++ 3 files changed, 6 insertions(+) diff --git a/channelDemoCatalog/sales.py b/channelDemoCatalog/sales.py index 9bab1dd..3677360 100644 --- a/channelDemoCatalog/sales.py +++ b/channelDemoCatalog/sales.py @@ -5,12 +5,14 @@ from framework.feature_factory.feature_family import FeatureFamily from framework.feature_factory import Helpers from framework.configobj import ConfigObj +from framework.spark_singleton import SparkSingleton import inspect # joiner_func = Helpers()._register_joiner_func() multipliable = Helpers()._register_feature_func() base_feat = Helpers()._register_feature_func() +spark = SparkSingleton.get_instance() # Extend FeatureFamily class Sales(FeatureFamily): diff --git a/channelDemoStore/sales.py b/channelDemoStore/sales.py index f9ae16a..30a6160 100644 --- a/channelDemoStore/sales.py +++ b/channelDemoStore/sales.py @@ -4,12 +4,14 @@ from framework.feature_factory.feature_family import FeatureFamily from framework.feature_factory import Helpers from framework.configobj import ConfigObj +from framework.spark_singleton import SparkSingleton import inspect # joiner_func = Helpers()._register_joiner_func() multipliable = Helpers()._register_feature_func() base_feat = Helpers()._register_feature_func() +spark = SparkSingleton.get_instance() # Extend FeatureFamily class Sales(FeatureFamily): diff --git a/channelDemoWeb/sales.py b/channelDemoWeb/sales.py index 69b0663..15ff996 100644 --- a/channelDemoWeb/sales.py +++ b/channelDemoWeb/sales.py @@ -5,12 +5,14 @@ from framework.feature_factory.feature_family import FeatureFamily from framework.feature_factory import Helpers from framework.configobj import ConfigObj +from framework.spark_singleton import SparkSingleton import inspect # joiner_func = Helpers()._register_joiner_func() multipliable = Helpers()._register_feature_func() base_feat = Helpers()._register_feature_func() +spark = SparkSingleton.get_instance() # Extend FeatureFamily class Sales(FeatureFamily): From 09d88b8d1c5dc5f670d307b1f9bdc8846d2bc05e Mon Sep 17 00:00:00 2001 From: Li Yu Date: Sat, 11 Nov 2023 14:43:12 -0500 Subject: [PATCH 10/11] utils funcs to convert between string, document and lcdocument --- framework/feature_factory/__init__.py | 17 +---- framework/feature_factory/llm_tools.py | 95 ++++++++++++++++++++++++-- requirements.txt | 4 +- test/test_chunking.py | 65 ++++++++++++++++++ 4 files changed, 159 insertions(+), 22 deletions(-) diff --git a/framework/feature_factory/__init__.py b/framework/feature_factory/__init__.py index ca468d5..b3359fd 100644 --- a/framework/feature_factory/__init__.py +++ b/framework/feature_factory/__init__.py @@ -6,7 +6,7 @@ from framework.configobj import ConfigObj from framework.feature_factory.helpers import Helpers from framework.feature_factory.agg_granularity import AggregationGranularity -from framework.feature_factory.llm_tools import LLMFeature +from framework.feature_factory.llm_tools import LLMFeature, LLMUtils import re import logging import datetime @@ -88,21 +88,10 @@ def append_catalog(self, df: DataFrame, groupBy_cols, catalog_cls, feature_names def assemble_llm_feature(self, spark: SparkSession, srcDirectory: str, llmFeature: LLMFeature, partitionNum: int): - def split_docs(fileName: str, llmFeat: LLMFeature): - print(fileName) - chunks = llmFeat.apply(fileName) - return (chunks,) - - def process_docs(partitionData, llmFeat): - llmFeat.create() - for row in partitionData: - yield split_docs(row, llmFeat) - - src_pdf_loc = "/dbfs/tmp/li_yu/va_llms/pdf" - all_files = self.helpers.list_files_recursively(src_pdf_loc) + all_files = self.helpers.list_files_recursively(srcDirectory) src_rdd = spark.sparkContext.parallelize(all_files, partitionNum) - rdd = src_rdd.mapPartitions(lambda partition_data: process_docs(partitionData=partition_data, llmFeat=llmFeature)).repartition(partitionNum) + rdd = src_rdd.mapPartitions(lambda partition_data: LLMUtils.process_docs(partitionData=partition_data, llmFeat=llmFeature)).repartition(partitionNum) rdd.cache() df = rdd.toDF([llmFeature.name]) return df.select(F.explode(llmFeature.name).alias(llmFeature.name)) \ No newline at end of file diff --git a/framework/feature_factory/llm_tools.py b/framework/feature_factory/llm_tools.py index 47aa63f..ddeaf83 100644 --- a/framework/feature_factory/llm_tools.py +++ b/framework/feature_factory/llm_tools.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import List, Union +from typing import List, Union, Tuple from llama_index import SimpleDirectoryReader from llama_index.node_parser import SimpleNodeParser from llama_index.node_parser.extractors import ( @@ -11,6 +11,8 @@ from langchain.docstore.document import Document as LCDocument from langchain.text_splitter import RecursiveCharacterTextSplitter from transformers import AutoTokenizer +from langchain.document_loaders import UnstructuredPDFLoader + class LLMTool(ABC): @@ -35,6 +37,9 @@ def create(self): class DocReader(LLMTool): + def create(self): + ... + def apply(self, filename: str) -> Union[str, List[Document]]: ... @@ -44,11 +49,21 @@ class DocSplitter(LLMTool): def __init__(self) -> None: super().__init__() - def _to_text(self, docs: Union[str, List[Document], List[LCDocument]]): + @classmethod + def _wrap_docs(cls, docs: Union[Document, LCDocument]): + if isinstance(docs, Document): + docs = [docs] + elif isinstance(docs, LCDocument): + docs = [docs] + return docs + + @classmethod + def _to_text(cls, docs: Union[str, List[Document], List[LCDocument]]): if not docs: return "" if isinstance(docs, str): return docs + docs = cls._wrap_docs(docs) if isinstance(docs[0], Document): texts = [doc.get_content(metadata_mode=MetadataMode.LLM) for doc in docs] return "\n\n".join(texts) @@ -57,6 +72,41 @@ def _to_text(self, docs: Union[str, List[Document], List[LCDocument]]): return "\n\n".join(texts) raise ValueError("doc type is not defined.") + @classmethod + def _to_documents(cls, txt: Union[str, List[Document]]): + if not txt: + return None + txt = cls._wrap_docs(txt) + if isinstance(txt, str): + doc = Document(text=txt) + return [doc] + if isinstance(txt[0], Document): + return txt + if isinstance(txt[0], LCDocument): + new_docs = [] + for doc in txt: + new_doc = Document(text=doc.page_content, metadata=doc.metadata) + new_docs.append(new_doc) + return new_docs + + @classmethod + def _to_lcdocuments(cls, docs: Union[str, List[Document], List[LCDocument]]): + if not docs: + return None + docs = cls._wrap_docs(docs) + if isinstance(docs, str): + new_docs = LCDocument(page_content=docs) + return [new_docs] + if isinstance(docs[0], LCDocument): + return docs + if isinstance(docs[0], Document): + new_docs = [] + for doc in docs: + metadata = doc.metadata or {} + new_doc = LCDocument(page_content=doc.text, metadata=metadata) + new_docs.append(new_doc) + return new_docs + def apply(self, docs: Union[str, List[Document]]) -> List[str]: ... @@ -67,14 +117,30 @@ class LlamaIndexDocReader(DocReader): def __init__(self) -> None: super().__init__() - def create(self): - ... - - def apply(self, filename: str): + def apply(self, filename: str) -> List[Document]: documents = SimpleDirectoryReader(input_files=[filename]).load_data() return documents +class UnstructuredDocReader(DocReader): + + def __init__(self, allowedCategories: Tuple[str]=('NarrativeText', 'ListItem')) -> None: + super().__init__() + self.categories = allowedCategories + + def apply(self, filename: str) -> str: + loader = UnstructuredPDFLoader(filename, mode="elements") + docs = loader.load() + filtered_docs = [] + for d in docs: + if d.metadata.get('category') in self.categories: + filtered_docs.append(d.page_content) + combined_text = "\n\n".join(filtered_docs) + + return combined_text + + + class LLMDef(LLMTool): def __init__(self) -> None: @@ -116,6 +182,7 @@ def create(self): return None def apply(self, docs: List[Document]): + docs = DocSplitter._to_documents(docs) self.create() doc_nodes = self.node_parser.get_nodes_from_documents(docs) chunks = [node.get_content(metadata_mode=MetadataMode.LLM) for node in doc_nodes] @@ -145,7 +212,7 @@ def create(self): def apply(self, docs): self.create() - txt = super()._to_text(docs) + txt = DocSplitter._to_text(docs) chunks = self.text_splitter.split_text(txt) return chunks @@ -168,3 +235,17 @@ def apply(self, filename: str): docs = self.reader.apply(filename) return self.splitter.apply(docs) + +class LLMUtils: + + @classmethod + def split_docs(cls, fileName: str, llmFeat: LLMFeature): + print(fileName) + chunks = llmFeat.apply(fileName) + return (chunks,) + + @classmethod + def process_docs(cls, partitionData, llmFeat): + llmFeat.create() + for row in partitionData: + yield cls.split_docs(row, llmFeat) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index e89dbf0..d8f7401 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ py4j==0.10.9 pyarrow==5.0.0 pyspark==3.1.3 python-dateutil==2.8.1 +pdf2image>=1.16.3 scipy==1.7.1 six==1.15.0 coverage @@ -11,4 +12,5 @@ langchain>=0.0.317 llama-index>=0.8.61 pypdf>=3.17.0 PyPDF2>=3.0.1 -transformers>=4.31.0 \ No newline at end of file +transformers>=4.31.0 +unstructured>=0.10.30 diff --git a/test/test_chunking.py b/test/test_chunking.py index 95834c8..e38b155 100644 --- a/test/test_chunking.py +++ b/test/test_chunking.py @@ -46,3 +46,68 @@ def test_recursive_splitter_llamaindex_docs(self): assert len(chunks) > 0 assert doc_splitter._require_init() == False + def test_process_docs(self): + doc_reader = LlamaIndexDocReader() + doc_splitter = LlamaIndexDocSplitter() + llm_feature = LLMFeature("test_llm", reader=doc_reader, splitter=doc_splitter) + chunks = LLMUtils.process_docs(["test/data/sample.pdf"], llmFeat=llm_feature) + for chunk in chunks: + assert len(chunk) == 1 + + def test_wrap_docs(self): + doc = LCDocument(page_content="test", metadata={"filepath": "/tmp/filename"}) + docs = DocSplitter._wrap_docs(doc) + assert docs[0] == doc + + doc = Document(txt="test") + docs = DocSplitter._wrap_docs(doc) + assert docs[0] == doc + + docs = DocSplitter._wrap_docs("test") + assert docs == "test" + + + def test_convert_to_text(self): + assert DocSplitter._to_text(None) == "" + doc = LCDocument(page_content="test", metadata={"filepath": "/tmp/filename"}) + txt = DocSplitter._to_text([doc]) + assert txt == "test" + + doc = Document(text="test") + txt = DocSplitter._to_text([doc]) + assert txt == "test" + + txt = DocSplitter._to_text("test") + assert txt == "test" + + def test_convert_to_document(self): + assert DocSplitter._to_documents(None) is None + doc = LCDocument(page_content="test", metadata={"filepath": "/tmp/filename"}) + new_docs = DocSplitter._to_documents([doc]) + assert new_docs[0].text == "test" + + doc = Document(text="test") + new_docs = DocSplitter._to_documents([doc]) + assert new_docs[0].text == "test" + + new_docs = DocSplitter._to_documents("test") + assert new_docs[0].text == "test" + + def test_convert_to_lcdocument(self): + assert DocSplitter._to_lcdocuments(None) is None + doc = LCDocument(page_content="test", metadata={"filepath": "/tmp/filename"}) + new_docs = DocSplitter._to_lcdocuments([doc]) + assert new_docs[0].page_content == "test" + + doc = Document(text="test") + new_docs = DocSplitter._to_lcdocuments([doc]) + assert new_docs[0].page_content == "test" + + new_docs = DocSplitter._to_lcdocuments("test") + assert new_docs[0].page_content == "test" + + # def test_unstructured_doc_reader(self): + # doc_reader = UnstructuredDocReader() + # txt = doc_reader.apply("./test/data/sample.pdf") + # assert len(txt) > 0 + \ No newline at end of file From 333271178f537a67c5c0553d38c310fa8f5bdbf2 Mon Sep 17 00:00:00 2001 From: Li Yu Date: Sat, 11 Nov 2023 16:17:19 -0500 Subject: [PATCH 11/11] add tokenizer splitter --- framework/feature_factory/llm_tools.py | 30 ++++++++++++++++++++++++++ test/test_chunking.py | 11 ++++++---- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/framework/feature_factory/llm_tools.py b/framework/feature_factory/llm_tools.py index ddeaf83..6e229c5 100644 --- a/framework/feature_factory/llm_tools.py +++ b/framework/feature_factory/llm_tools.py @@ -12,6 +12,7 @@ from langchain.text_splitter import RecursiveCharacterTextSplitter from transformers import AutoTokenizer from langchain.document_loaders import UnstructuredPDFLoader +import math class LLMTool(ABC): @@ -217,6 +218,35 @@ def apply(self, docs): return chunks +class TokenizerTextSpliter(DocSplitter): + + def __init__(self, chunk_size=1024, chunk_overlap=64, pretrained_tokenizer_path: str=None) -> None: + super().__init__() + self.chunk_size = chunk_size + self.chunk_overlap = chunk_overlap + self.pretrained_tokenizer_path = pretrained_tokenizer_path + self.tokenizer = None + + def create(self): + if not self.pretrained_tokenizer_path: + raise ValueError("Pretrained tokenizer is not defined in TokenizerTextSplitter.") + if super()._require_init(): + self.tokenizer = AutoTokenizer.from_pretrained(self.pretrained_tokenizer_path) + + def apply(self, text: Union[str, List[Document]]) -> List[str]: + self.create() + text = DocSplitter._to_text(text) + text_length = len(self.tokenizer.encode(text)) + num_chunks = math.ceil(text_length / self.chunk_size) + chunk_length = math.ceil(len(text) / num_chunks) + chunks = [] + for i in range(0, len(text), chunk_length): + start = max(0, i-self.chunk_overlap) + end = min(len(text), i + chunk_length + self.chunk_overlap) + chunks.append(text[start:end]) + return chunks + + class LLMFeature(LLMTool): def __init__(self, name: str, reader: DocReader, splitter: DocSplitter) -> None: diff --git a/test/test_chunking.py b/test/test_chunking.py index e38b155..7c90b97 100644 --- a/test/test_chunking.py +++ b/test/test_chunking.py @@ -106,8 +106,11 @@ def test_convert_to_lcdocument(self): new_docs = DocSplitter._to_lcdocuments("test") assert new_docs[0].page_content == "test" - # def test_unstructured_doc_reader(self): - # doc_reader = UnstructuredDocReader() - # txt = doc_reader.apply("./test/data/sample.pdf") - # assert len(txt) > 0 + def test_token_splitter(self): + doc_reader = LlamaIndexDocReader() + docs = doc_reader.apply("./test/data/sample.pdf") + assert len(docs) > 0 + doc_splitter = TokenizerTextSpliter(chunk_size=1024, chunk_overlap=32, pretrained_tokenizer_path="hf-internal-testing/llama-tokenizer") + chunks = doc_splitter.apply(docs) + assert len(chunks) == 1 \ No newline at end of file