Resolve PR comments from Wei
sunank200 committed Nov 28, 2023
1 parent 0e9f09a commit 1f441bd
Showing 1 changed file with 20 additions and 20 deletions.
@@ -220,9 +220,9 @@ def batch_ingest(
:param df: DataFrame containing the data to be ingested.
:param class_name: The name of the class in Weaviate to which data will be ingested.
:param uuid_column: Name of the column containing the UUID.
:param existing: Strategy to handle existing data ('skip', 'replace', 'upsert' or 'error').
:param vector_column: Name of the column containing the vector data.
:param batch_params: Parameters for batch configuration.
:param existing: Strategy to handle existing data ('skip', 'replace', 'upsert' or 'error').
:param verbose: Whether to log verbose output.
:param tenant: The tenant to which the object will be added.
"""
@@ -242,26 +242,26 @@ def batch_ingest(
try:
if self.client.data_object.exists(uuid=uuid, class_name=class_name):
if existing == "error":
raise AirflowException(f"Ingest of UUID {uuid} failed. Object exists.")
raise AirflowException(f"Ingest of UUID {uuid} failed. Object exists.")

if existing == "skip":
if verbose is True:
self.logger.warning(f"UUID {uuid} exists. Skipping.")
self.logger.warning(f"UUID {uuid} exists. Skipping.")
continue
elif existing == "replace":
# Default for weaviate is replace existing
if verbose is True:
self.logger.warning(f"UUID {uuid} exists. Overwriting.")

self.logger.warning(f"UUID {uuid} exists. Overwriting.")
except AirflowException as e:
if verbose:
self.logger.error(f"Failed to add row {row_id} with UUID {uuid}. Error: {e}")
self.batch_errors.append({"uuid": uuid, "result": {"errors": str(e)}})
break
except Exception as e:
if isinstance(e, AirflowException):
if verbose:
self.logger.error(f"Failed to add row {row_id} with UUID {uuid}. Error: {e}")
self.batch_errors.append({"uuid": uuid, "result": {"errors": str(e)}})
break
else:
if verbose:
self.logger.error(f"Failed to add row {row_id} with UUID {uuid}. Error: {e}")
self.batch_errors.append({"uuid": uuid, "result": {"errors": str(e)}})
continue
self.batch_errors.append({"uuid": uuid, "result": {"errors": str(e)}})
continue

try:
added_row = batch.add_data_object(
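This hunk shows two shapes of the same row-level error handling: a dedicated `except AirflowException` clause, and a single broad `except Exception` that branches with `isinstance`. A minimal, self-contained sketch of the two equivalent forms; the function names, messages, and return values are illustrative, not the repository's code.

```python
class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""

def route_with_two_clauses(add_row) -> str:
    # Shape A: one except clause per type; the narrower AirflowException
    # must be listed before the broad Exception.
    try:
        add_row()
        return "ok"
    except AirflowException as exc:
        print(f"known failure, stop the batch: {exc}")
        return "break"
    except Exception as exc:
        print(f"unexpected failure, skip this row: {exc}")
        return "continue"

def route_with_isinstance(add_row) -> str:
    # Shape B: one broad except clause that branches on the concrete type.
    try:
        add_row()
        return "ok"
    except Exception as exc:
        if isinstance(exc, AirflowException):
            print(f"known failure, stop the batch: {exc}")
            return "break"
        print(f"unexpected failure, skip this row: {exc}")
        return "continue"

def _fail_known():
    raise AirflowException("object exists")

# Both shapes route AirflowException to "break" and anything else to "continue".
assert route_with_two_clauses(_fail_known) == "break"
assert route_with_isinstance(_fail_known) == "break"
assert route_with_isinstance(lambda: 1 / 0) == "continue"
```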
@@ -344,11 +344,11 @@ def handle_successful_upsert(
"""
Handles removal of previous objects after successful upsert.
:param class_name: Name of the class in Weaviate.
:param objects_to_remove: If there were errors rollback will generate a list of successfully inserted objects.
If not set, assume all objects inserted successfully and delete all objects_to_upsert['objects_to_delete']
:param tenant: The tenant to which the object will be added.
:param class_name: Name of the class in Weaviate.
:param verbose: Flag to enable verbose logging.
:param tenant: The tenant to which the object will be added.
"""
deletion_errors = []
for uuid in objects_to_remove:
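The loop beginning here gathers per-object deletion failures rather than failing fast. Below is a self-contained sketch of that pattern, assuming the weaviate-client 3.x API (`Client.data_object.delete`, whose `tenant` argument is only available in multi-tenancy-aware 3.x releases); the wrapper function and logging are illustrative, not the repository's code.

```python
import logging

import weaviate  # weaviate-client 3.x

logger = logging.getLogger(__name__)

def delete_after_upsert(
    client: weaviate.Client,
    objects_to_remove: list[str],
    class_name: str,
    tenant: str | None = None,
    verbose: bool = False,
) -> list[dict]:
    """Delete superseded objects by UUID, collecting per-object errors instead of raising."""
    deletion_errors: list[dict] = []
    for uuid in objects_to_remove:
        try:
            # v3 client call; `tenant` is only accepted for multi-tenant classes.
            client.data_object.delete(uuid, class_name=class_name, tenant=tenant)
            if verbose:
                logger.info("Deleted stale object %s after successful upsert.", uuid)
        except Exception as exc:
            deletion_errors.append({"uuid": uuid, "result": {"errors": str(exc)}})
    return deletion_errors
```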
@@ -435,7 +435,7 @@ def ingest_data(
)

if existing == "upsert":
if len(self.batch_errors) > 0:
if self.batch_errors:
self.logger.warning("Error during upsert. Rolling back all inserts for docs with errors.")
rollback_errors, objects_to_remove = self.handle_upsert_rollback(
objects_to_upsert=objects_to_upsert, class_name=class_name, verbose=verbose
@@ -447,9 +447,9 @@

rollback_errors += deletion_errors

if len(rollback_errors) > 0:
if rollback_errors:
self.logger.error("Errors encountered during rollback.")
[self.logger.error(f"{rollback_error}" for rollback_error in rollback_errors)]
self.logger.error("\n".join(rollback_errors))
raise AirflowException("Errors encountered during rollback.")
else:
removal_errors = self.handle_successful_upsert(
@@ -460,12 +460,12 @@
)
if removal_errors:
self.logger.error("Errors encountered during removal.")
[self.logger.error(f"{removal_error}" for removal_error in removal_errors)]
self.logger.error("\n".join(removal_errors))
raise AirflowException("Errors encountered during removal.")

if self.batch_errors:
self.logger.error("Errors encountered during ingest.")
[self.logger.error(f"{batch_error}" for batch_error in self.batch_errors)]
self.logger.error("\n".join(self.batch_errors))
raise AirflowException("Errors encountered during ingest.")

def _query_objects(self, value: Any, doc_key: str, class_name: str, uuid_column: str) -> set:
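These last hunks also touch how the collected error lists are reported: `logger.error(f"{e}" for e in errors)` hands the logger a generator object rather than the individual entries, while `"\n".join(errors)` renders them in a single record. A small standalone illustration follows; the error strings are illustrative, and dict-shaped entries such as the `batch_errors` items above would need formatting to strings before a join.

```python
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

batch_errors = ["uuid-a: object exists", "uuid-b: timeout"]

# Anti-pattern: logger.error receives a generator expression, so a single
# record like "<generator object ...>" is emitted and the list collects only None.
[logger.error(f"{err}" for err in batch_errors)]

# One record containing every error (entries must already be strings).
logger.error("\n".join(batch_errors))

# Or one record per error, which also works for dict-shaped entries.
for err in batch_errors:
    logger.error("%s", err)
```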