diff --git a/backend/danswer/connectors/directory/connector.py b/backend/danswer/connectors/directory/connector.py
index 8a6ff440d22..9b071e9f7fb 100644
--- a/backend/danswer/connectors/directory/connector.py
+++ b/backend/danswer/connectors/directory/connector.py
@@ -2,6 +2,7 @@
 import os
 from collections.abc import Generator
 from pathlib import Path
+from typing import cast
 from typing import Any
 from typing import IO
@@ -13,6 +14,9 @@
 from danswer.connectors.interfaces import LoadConnector
 from danswer.connectors.models import Document
 from danswer.connectors.models import Section
+from danswer.dynamic_configs import get_dynamic_config_store
+from danswer.dynamic_configs.interface import ConfigNotFoundError
+from danswer.dynamic_configs.interface import JSON_ro
 from danswer.utils.logger import setup_logger
@@ -20,6 +24,8 @@
 _METADATA_FLAG = "#DANSWER_METADATA="
+LOAD_STATE_KEY = "directory_connector_state"
+MAX_BATCHES = 10
 
 
 def _open_files_at_location_recursive(
     base_path: str | Path,
@@ -71,16 +77,49 @@
     def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
         pass
 
     def load_from_state(self) -> GenerateDocumentsOutput:
+        num_batches = 0
+
+        try:
+            state = cast(dict, get_dynamic_config_store().load(LOAD_STATE_KEY))
+        except ConfigNotFoundError:
+            state = {}
+
+        processed_files: list[str] = []
         documents: list[Document] = []
+        done = False
         for file_location in self.file_locations:
             files = _open_files_at_location_recursive(file_location, '')
             for file_name, file in files:
+                file_path = os.path.join(file_location, file_name)
+                if file_path in state:
+                    logger.debug(f"Skipping file '{file_path}' as it has already been processed")
+                    continue
+
+                logger.info(f"Batch {num_batches + 1}: Processing file '{file_path}'")
                 documents.extend(_process_file(file_name, file))
+                processed_files.append(file_path)
 
                 if len(documents) >= self.batch_size:
                     yield documents
                     documents = []
+                    for file_path in processed_files:
+                        state[file_path] = True
+
+                    num_batches += 1
+                    if num_batches >= MAX_BATCHES:
+                        logger.info(f"Reached max batches of {MAX_BATCHES}, stopping")
+                        done = True
+                        break
+
+            if done:
+                break
+
         if documents:
             yield documents
+
+        for file_path in processed_files:
+            state[file_path] = True
+
+        get_dynamic_config_store().store(LOAD_STATE_KEY, cast(JSON_ro, state))
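
Note on the checkpointing behavior above: progress is persisted in the dynamic config store under LOAD_STATE_KEY as a map of already-processed file paths, and a run stops after MAX_BATCHES batches so a later invocation can resume where it left off. Below is a minimal, illustrative sketch of how that checkpoint could be cleared to force a full re-index. It is not part of the diff; it only relies on the load/store calls shown above, and the helper name is made up.

# Illustrative sketch only (not part of this diff). Uses the same dynamic
# config store calls that appear in the change above; the helper name is
# hypothetical.
from danswer.connectors.directory.connector import LOAD_STATE_KEY
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError


def reset_directory_connector_checkpoint() -> None:
    """Clear the persisted 'already processed' map so the next run re-indexes everything."""
    store = get_dynamic_config_store()
    try:
        store.load(LOAD_STATE_KEY)  # check whether a checkpoint exists at all
    except ConfigNotFoundError:
        return  # nothing persisted yet, nothing to reset
    # Overwrite with an empty map; the connector treats missing and empty state the same way.
    store.store(LOAD_STATE_KEY, {})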