Modified ingest.py that processes past errors and logs the offending files #825
thephimart started this conversation in Show and tell
Replies: 1 comment
-
@thephimart Thanks so much for this. I discovered that ingest was failing on every single one of my .md files, so I modified your version to retry those automatically as plain text:

```python
import shutil  # needed for the retry copy below

def load_documents(source_dir: str, ignored_files: List[str] = []) -> List[Document]:
    """
    Loads all documents from the source documents directory, ignoring specified files
    """
    all_files = []
    for ext in LOADER_MAPPING:
        all_files.extend(
            glob.glob(os.path.join(source_dir, f"**/*{ext}"), recursive=True)
        )
    filtered_files = [file_path for file_path in all_files if file_path not in ignored_files]

    results = []
    skipped = []
    retried = []
    with tqdm(total=len(filtered_files), desc='Loading new documents', ncols=80) as pbar:
        for file_path in filtered_files:
            success = process_single_document(pbar, results, file_path)
            if not success:
                folder_path, file_name = os.path.split(file_path)
                base_name, ext = os.path.splitext(file_name)
                if ext == '.md':
                    # Retry a failed markdown file by copying it to a temp folder
                    # with a .txt extension, so the plain TextLoader handles it
                    retried.append(file_path)
                    temp_folder = 'source_temp'
                    if not os.path.exists(temp_folder):
                        os.makedirs(temp_folder)
                    new_file_path = os.path.join(temp_folder, base_name + '.txt')
                    print("Ingest failed for", file_path, ", copying to", new_file_path, "and will try ingesting again")
                    shutil.copy(file_path, new_file_path)
                    success = process_single_document(pbar, results, new_file_path)
                if not success:
                    skipped.append(file_path)
                    pbar.update()  # keep the progress bar in sync for skipped files
    print("Processed", len(filtered_files), "files,", len(retried), "files retried, and", len(skipped), "files skipped")
    print("Failed to process these files:", skipped)
    return results
def process_single_document(pbar, results, file_path):
    """
    Process a single document and return True if successful
    """
    try:
        docs = load_single_document(file_path)
        results.extend(docs)
        pbar.update()
        return True
    except Exception as e:
        # Log the offending file and the exception, then keep going
        error_msg = f"Error loading document: {file_path}\n{str(e)}"
        with open("./ingest_log.txt", "a") as log_file:
            log_file.write(error_msg + "\n")
        return False
```

Thanks again for the code changes that gave me the idea of where to make the changes to automate retries.
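One thing to watch for with the retry trick (not from the post above, just a caveat): documents that succeed on the retry are loaded from the source_temp copy, so their metadata["source"] points at the .txt copy rather than the original .md file. A minimal sketch of pointing them back, assuming the originals sit directly under source_documents:

```python
# Hypothetical cleanup sketch: re-point retried documents at their original
# .md files so query citations don't reference the temp copies.
# Assumes a flat layout under source_documents (subfolders aren't recoverable,
# since the retry copy keeps only the base file name).
for doc in results:
    src = doc.metadata.get("source", "")
    if src.startswith("source_temp"):
        base_name = os.path.splitext(os.path.basename(src))[0]
        doc.metadata["source"] = os.path.join("source_documents", base_name + ".md")
```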
-
So when I "finally" got to the ingest phase of the build after all the issues with C++ on windows, and got past the "soffice" not found, my ingest kept halting on various files and the errors did not reference the offending files specifically. So I had a short chat with chatGPT and produced a modified "ingest.py".
Basically, it powers through the exceptions, logging each offending file and its exception to "./ingest_log.txt", and prints the list of failed files at the end.
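Each failure is appended to the log as the file path followed by the exception text, so an entry looks roughly like this (path and error message invented for illustration):

```
Error loading document: source_documents/notes/example.md
ValueError: Invalid file source_documents/notes/example.md
```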
PS 6 out of 22778 files were the problem 🤣
Here's my modified "ingest.py". I hope it helps another poor soul trying to build this 🥲

```python
#!/usr/bin/env python3
import os
import glob
from typing import List
from dotenv import load_dotenv
from multiprocessing import Pool
from tqdm import tqdm
from langchain.document_loaders import (
CSVLoader,
EverNoteLoader,
PyMuPDFLoader,
TextLoader,
UnstructuredEmailLoader,
UnstructuredEPubLoader,
UnstructuredHTMLLoader,
UnstructuredMarkdownLoader,
UnstructuredODTLoader,
UnstructuredPowerPointLoader,
UnstructuredWordDocumentLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document
from constants import CHROMA_SETTINGS
load_dotenv()
# Load environment variables
persist_directory = os.environ.get('PERSIST_DIRECTORY')
source_directory = os.environ.get('SOURCE_DIRECTORY', 'source_documents')
embeddings_model_name = os.environ.get('EMBEDDINGS_MODEL_NAME')
chunk_size = 500
chunk_overlap = 50
# Custom document loaders
class MyElmLoader(UnstructuredEmailLoader):
    """Wrapper to fallback to text/plain when default does not work"""

    def load(self) -> List[Document]:
        """Wrapper adding fallback for elm without html"""
        try:
            try:
                doc = super().load()
            except ValueError as e:
                if 'text/html content not found in email' in str(e):
                    # Try plain text
                    self.unstructured_kwargs["content_source"] = "text/plain"
                    doc = super().load()
                else:
                    raise
        except Exception as e:
            # Add file_path to exception message
            raise type(e)(f"{self.file_path}: {e}") from e
        return doc
# Map file extensions to document loaders and their arguments
LOADER_MAPPING = {
    ".csv": (CSVLoader, {}),
    # ".docx": (Docx2txtLoader, {}),
    ".doc": (UnstructuredWordDocumentLoader, {}),
    ".docx": (UnstructuredWordDocumentLoader, {}),
    ".enex": (EverNoteLoader, {}),
    ".eml": (MyElmLoader, {}),
    ".epub": (UnstructuredEPubLoader, {}),
    ".html": (UnstructuredHTMLLoader, {}),
    ".md": (UnstructuredMarkdownLoader, {}),
    ".odt": (UnstructuredODTLoader, {}),
    ".pdf": (PyMuPDFLoader, {}),
    ".ppt": (UnstructuredPowerPointLoader, {}),
    ".pptx": (UnstructuredPowerPointLoader, {}),
    ".txt": (TextLoader, {"encoding": "utf8"}),
    # Add more mappings for other file extensions and loaders as needed
}
def load_single_document(file_path: str) -> List[Document]:
    ext = "." + file_path.rsplit(".", 1)[-1]
    if ext in LOADER_MAPPING:
        loader_class, loader_args = LOADER_MAPPING[ext]
        loader = loader_class(file_path, **loader_args)
        return loader.load()
    raise ValueError(f"Unsupported file extension '{ext}'")
def load_documents(source_dir: str, ignored_files: List[str] = []) -> List[Document]:
    """
    Loads all documents from the source documents directory, ignoring specified files
    """
    all_files = []
    for ext in LOADER_MAPPING:
        all_files.extend(
            glob.glob(os.path.join(source_dir, f"**/*{ext}"), recursive=True)
        )
    filtered_files = [file_path for file_path in all_files if file_path not in ignored_files]

    results = []
    skipped = []
    with tqdm(total=len(filtered_files), desc='Loading new documents', ncols=80) as pbar:
        for file_path in filtered_files:
            try:
                docs = load_single_document(file_path)
                results.extend(docs)
            except Exception as e:
                # Power through the failure: remember the file and log the exception
                skipped.append(file_path)
                error_msg = f"Error loading document: {file_path}\n{str(e)}"
                with open("./ingest_log.txt", "a") as log_file:
                    log_file.write(error_msg + "\n")
            pbar.update()
    print("Failed to process these files:", skipped)
    return results
def process_documents(ignored_files: List[str] = []) -> List[Document]:
    """
    Load documents and split in chunks
    """
    print(f"Loading documents from {source_directory}")
    documents = load_documents(source_directory, ignored_files)
    if not documents:
        print("No new documents to load")
        exit(0)
    print(f"Loaded {len(documents)} new documents from {source_directory}")
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    texts = text_splitter.split_documents(documents)
    print(f"Split into {len(texts)} chunks of text (max. {chunk_size} tokens each)")
    return texts
def does_vectorstore_exist(persist_directory: str) -> bool:
    """
    Checks if vectorstore exists
    """
    if os.path.exists(os.path.join(persist_directory, 'index')):
        if os.path.exists(os.path.join(persist_directory, 'chroma-collections.parquet')) and os.path.exists(os.path.join(persist_directory, 'chroma-embeddings.parquet')):
            list_index_files = glob.glob(os.path.join(persist_directory, 'index/*.bin'))
            list_index_files += glob.glob(os.path.join(persist_directory, 'index/*.pkl'))
            # At least 3 documents are needed in a working vectorstore
            if len(list_index_files) > 3:
                return True
    return False
def main():
    # Create embeddings
    embeddings = HuggingFaceEmbeddings(model_name=embeddings_model_name)

    if does_vectorstore_exist(persist_directory):
        # Update and store locally vectorstore
        print(f"Appending to existing vectorstore at {persist_directory}")
        db = Chroma(persist_directory=persist_directory, embedding_function=embeddings, client_settings=CHROMA_SETTINGS)
        collection = db.get()
        texts = process_documents([metadata['source'] for metadata in collection['metadatas']])
        print("Creating embeddings. May take some minutes...")
        db.add_documents(texts)
    else:
        # Create and store locally vectorstore
        print("Creating new vectorstore")
        texts = process_documents()
        print("Creating embeddings. May take some minutes...")
        db = Chroma.from_documents(texts, embeddings, persist_directory=persist_directory, client_settings=CHROMA_SETTINGS)
    db.persist()
    db = None

if __name__ == "__main__":
    main()
```
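And if the log gets long after a big run, here's a quick throwaway sketch (separate from ingest.py, assuming the two-line entry format shown earlier) to pull just the offending paths back out of it:

```python
# List the unique offending files recorded in ingest.py's error log.
prefix = "Error loading document: "
offenders = set()
with open("ingest_log.txt") as log_file:
    for line in log_file:
        if line.startswith(prefix):
            offenders.add(line[len(prefix):].strip())

print(len(offenders), "files failed:")
for path in sorted(offenders):
    print(" ", path)
```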