Commit

Fix comments from Michael
sunank200 committed Nov 21, 2023
1 parent c5cd78d commit 57af49b
Showing 3 changed files with 5 additions and 90 deletions.
airflow/dags/ingestion/ask-astro-load.py (4 changes: 2 additions & 2 deletions)
@@ -31,7 +31,7 @@
     {"doc_dir": "code-samples", "repo_base": "astronomer/docs"},
 ]
 issues_docs_sources = [
-    {"repo_base": "apache/airflow", "cutoff_date": datetime.date(2020, 1, 1), "cutoff_issue_number": 30000}
+    "apache/airflow",
 ]
 slack_channel_sources = [
     {
@@ -218,7 +218,7 @@ def extract_astro_blogs():
     return [df]
 
 md_docs = extract_github_markdown.expand(source=markdown_docs_sources)
-issues_docs = extract_github_issues.expand(source=issues_docs_sources)
+issues_docs = extract_github_issues.expand(repo_base=issues_docs_sources)
 stackoverflow_docs = extract_stack_overflow.expand(tag=stackoverflow_tags)
 # slack_docs = extract_slack_archive.expand(source=slack_channel_sources)
 registry_cells_docs = extract_astro_registry_cell_types()
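
For context, `.expand()` is Airflow's dynamic task mapping API: each keyword argument receives a list, and one mapped task instance is created per element. With `issues_docs_sources` reduced to a plain list of repo strings, mapping over `repo_base` passes each string straight to the task. The following is a minimal, hypothetical sketch of that pattern only; the DAG name and the simplified `extract_github_issues` body are assumptions, not the project's code.

```python
# Hypothetical sketch of dynamic task mapping over a list of repo strings;
# the real extract_github_issues task lives in the Ask Astro include modules.
from datetime import datetime

import pandas as pd
from airflow.decorators import dag, task

issues_docs_sources = [
    "apache/airflow",
]


@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)
def example_issue_ingest():
    @task
    def extract_github_issues(repo_base: str) -> pd.DataFrame:
        # Placeholder: the real task fetches issues for `repo_base` and returns a DataFrame.
        return pd.DataFrame({"repo": [repo_base]})

    # One mapped task instance is created per entry in issues_docs_sources.
    issues_docs = extract_github_issues.expand(repo_base=issues_docs_sources)


example_issue_ingest()
```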
@@ -254,8 +254,7 @@ def batch_process_data(
                 self.logger.error(f"Failed to add row {row_id} with UUID {uuid}. Error: {e}")
                 batch_errors.append({"row_id": row_id, "uuid": uuid, "error": str(e)})
 
-        results = batch.create_objects()
-        return batch_errors + [item for result in results for item in result.get("errors", [])], results
+        return batch_errors
 
     def process_batch_errors(self, results: list, verbose: bool) -> list:
         """
@@ -339,11 +338,11 @@ def ingest_data(
 
         self.logger.info(f"Passing {len(df)} objects for ingest.")
 
-        batch_errors, results = self.batch_process_data(
+        batch_errors = self.batch_process_data(
             df, class_name, uuid_column, vector_column, batch_params, existing, verbose
         )
 
-        batch_errors += self.process_batch_errors(results, verbose)
+        batch_errors += self.process_batch_errors(batch_errors, verbose)
 
         if existing == "upsert" and batch_errors:
             self.logger.warning("Error during upsert. Rolling back all inserts.")
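
To make the new control flow concrete, here is a minimal, self-contained sketch of the contract this commit assumes: `batch_process_data` returns only a list of per-row error dicts, and `ingest_data` forwards that same list to `process_batch_errors`. The function bodies below are invented stand-ins, not the project's Weaviate hook.

```python
# Hypothetical stand-ins illustrating the simplified error flow after this commit;
# the real methods live on the project's Weaviate hook and talk to a Weaviate batch.
from typing import Any


def batch_process_data(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Collect per-row failures and return only the error list (no separate `results`)."""
    batch_errors: list[dict[str, Any]] = []
    for row in rows:
        try:
            if row.get("uuid") is None:  # stand-in for the real add-object-to-batch call
                raise ValueError("missing uuid")
        except Exception as e:
            batch_errors.append({"row_id": row.get("row_id"), "error": str(e)})
    return batch_errors


def process_batch_errors(results: list[dict[str, Any]], verbose: bool) -> list[dict[str, Any]]:
    """Stand-in for the error-inspection step; here it only logs and adds nothing new."""
    if verbose:
        for error in results:
            print(f"batch error: {error}")
    return []


def ingest_data(rows: list[dict[str, Any]], existing: str = "upsert") -> None:
    batch_errors = batch_process_data(rows)
    batch_errors += process_batch_errors(batch_errors, verbose=True)
    if existing == "upsert" and batch_errors:
        print("Error during upsert. Rolling back all inserts.")


ingest_data([{"row_id": 1, "uuid": None}, {"row_id": 2, "uuid": "abc"}])
```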
airflow/include/tasks/ingest.py (84 changes: 0 additions & 84 deletions)
@@ -2,90 +2,6 @@
 
 import pandas as pd
 import requests
-from weaviate.util import generate_uuid5
-
-
-def import_upsert_data(dfs: list[pd.DataFrame], class_name: str, primary_key: str) -> list:
-    """
-    This task concatenates multiple dataframes from upstream dynamic tasks and vectorizes with import to weaviate.
-    This function is used as a python_callable with the weaviate_import decorator. The returned dictionary is passed
-    to the WeaviateImportDataOperator for ingest. The operator returns a list of any objects that failed to import.
-    A 'uuid' is generated based on the content and metadata (the git sha, document url, the document source and a
-    concatenation of the headers).
-    Any existing documents with the same primary_key but differing UUID or sha will be deleted prior to import.
-    param dfs: A list of dataframes from downstream dynamic tasks
-    type dfs: list[pd.DataFrame]
-    param class_name: The name of the class to import data. Class should be created with weaviate schema.
-    type class_name: str
-    param primary_key: The name of a column to use as a primary key for upsert logic.
-    type primary_key: str
-    """
-
-    df = pd.concat(dfs, ignore_index=True)
-
-    df["uuid"] = df.apply(lambda x: generate_uuid5(identifier=x.to_dict(), namespace=class_name), axis=1)
-
-    if df[["docLink", "uuid"]].duplicated().any():
-        df.drop_duplicates(subset=["docLink", "uuid"], keep="first", inplace=True)
-        df.reset_index(drop=True, inplace=True)
-
-    print(f"Passing {len(df)} objects for import.")
-
-    return {
-        "data": df,
-        "class_name": class_name,
-        "existing": "upsert",
-        "primary_key": primary_key,
-        "uuid_column": "uuid",
-        "error_threshold": 0,
-        "verbose": True,
-    }
-
-
-def import_data(dfs: list[pd.DataFrame], class_name: str) -> list:
-    """
-    This task concatenates multiple dataframes from upstream dynamic tasks and vectorizes with import to weaviate.
-    This function is used as a python_callable with the weaviate_import decorator. The returned dictionary is passed
-    to the WeaviateImportDataOperator for ingest. The operator returns a list of any objects that failed to import.
-    A 'uuid' is generated based on the content and metadata (the git sha, document url, the document source and a
-    concatenation of the headers) and Weaviate will create the vectors.
-    Any existing documents are skipped. The assumption is that this is a first
-    import of data and skipping upsert checks will speed up import.
-    param dfs: A list of dataframes from downstream dynamic tasks
-    type dfs: list[pd.DataFrame]
-    param class_name: The name of the class to import data. Class should be created with weaviate schema.
-    type class_name: str
-    """
-
-    df = pd.concat(dfs, ignore_index=True)
-
-    df["uuid"] = df.apply(lambda x: generate_uuid5(identifier=x.to_dict(), namespace=class_name), axis=1)
-
-    if df[["docLink", "uuid"]].duplicated().any():
-        df.drop_duplicates(subset=["docLink", "uuid"], keep="first", inplace=True)
-        df.reset_index(drop=True, inplace=True)
-
-    print(f"Passing {len(df)} objects for import.")
-
-    return {
-        "data": df,
-        "class_name": class_name,
-        "existing": "skip",
-        "uuid_column": "uuid",
-        "error_threshold": 0,
-        "batched_mode": True,
-        "batch_size": 1000,
-        "verbose": False,
-    }
 
 
 def import_baseline(class_name: str, seed_baseline_url: str) -> list:
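
The removed helpers relied on a deterministic-UUID-plus-deduplication pattern: `weaviate.util.generate_uuid5` produces the same UUID for the same row content, so re-ingested duplicates can be dropped before import. As a rough illustration of that pattern only (the sample DataFrame and class name below are invented for demonstration):

```python
# Rough illustration of the uuid/dedup pattern used by the removed helpers;
# the sample rows and the "DocsProd" namespace are made up.
import pandas as pd
from weaviate.util import generate_uuid5

df = pd.DataFrame(
    [
        {"docLink": "https://example.com/a", "content": "hello"},
        {"docLink": "https://example.com/a", "content": "hello"},  # exact duplicate
        {"docLink": "https://example.com/b", "content": "world"},
    ]
)

# Same row content -> same UUID, so duplicates collapse onto a single object.
df["uuid"] = df.apply(lambda x: generate_uuid5(identifier=x.to_dict(), namespace="DocsProd"), axis=1)

if df[["docLink", "uuid"]].duplicated().any():
    df.drop_duplicates(subset=["docLink", "uuid"], keep="first", inplace=True)
    df.reset_index(drop=True, inplace=True)

print(f"Passing {len(df)} objects for import.")  # prints 2, not 3
```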
