diff --git a/CODEOWNERS b/CODEOWNERS
index da19fe09..cf5b7676 100644
--- a/CODEOWNERS
+++ b/CODEOWNERS
@@ -1 +1 @@
-* @Lee-W @pankajastro @sunank200 @davidgxue
+* @Lee-W @pankajastro @sunank200 @davidgxue @jlaneve
diff --git a/airflow/dags/ingestion/ask-astro-forum-load.py b/airflow/dags/ingestion/ask-astro-forum-load.py
index 0d0496c7..7e4d4bd1 100644
--- a/airflow/dags/ingestion/ask-astro-forum-load.py
+++ b/airflow/dags/ingestion/ask-astro-forum-load.py
@@ -44,7 +44,7 @@ def ask_astro_load_astro_forum():
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
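Note: throughout this change set the ingestion DAGs switch the operator's `batch_config_params` from `{"batch_size": 1000}` to `{"batch_size": 7, "dynamic": False}`. These keys appear to be forwarded to the weaviate-client batch configuration; as a rough, hypothetical sketch (not code from this repo — class name, URL, and data are placeholders), fixed-size batching in client terms looks something like this:

```python
# Hypothetical illustration of the batch settings the operator forwards.
import weaviate

client = weaviate.Client("http://localhost:8081")

# Flush every 7 objects; "dynamic" batching (auto-resizing) is turned off.
client.batch.configure(batch_size=7, dynamic=False)

docs = [{"docLink": "https://example.com/doc", "content": "..."}]
with client.batch as batch:
    for doc in docs:
        batch.add_data_object(data_object=doc, class_name="DocsDev")
```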
""" + from include.tasks import split from include.tasks.extract import airflow_docs - extracted_airflow_docs = task(airflow_docs.extract_airflow_docs)(docs_base_url=airflow_docs_base_url) + extracted_airflow_docs = task(split.split_html).expand( + dfs=[airflow_docs.extract_airflow_docs(docs_base_url=airflow_docs_base_url)] + ) _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, existing="replace", document_column="docLink", - batch_config_params={"batch_size": 1000}, + batch_config_params={"batch_size": 7, "dynamic": False}, verbose=True, conn_id=_WEAVIATE_CONN_ID, task_id="WeaviateDocumentIngestOperator", - ).expand(input_data=split_docs(extracted_airflow_docs, chunk_size=100)) + ).expand(input_data=[extracted_airflow_docs]) ask_astro_load_airflow_docs() diff --git a/airflow/dags/ingestion/ask-astro-load-astro-cli.py b/airflow/dags/ingestion/ask-astro-load-astro-cli.py index 80658d9f..20fd2da1 100644 --- a/airflow/dags/ingestion/ask-astro-load-astro-cli.py +++ b/airflow/dags/ingestion/ask-astro-load-astro-cli.py @@ -42,7 +42,7 @@ def ask_astro_load_astro_cli_docs(): class_name=WEAVIATE_CLASS, existing="replace", document_column="docLink", - batch_config_params={"batch_size": 1000}, + batch_config_params={"batch_size": 7, "dynamic": False}, verbose=True, conn_id=_WEAVIATE_CONN_ID, task_id="WeaviateDocumentIngestOperator", diff --git a/airflow/dags/ingestion/ask-astro-load-astro-sdk.py b/airflow/dags/ingestion/ask-astro-load-astro-sdk.py index 9b4e1b6d..11d2f7d9 100644 --- a/airflow/dags/ingestion/ask-astro-load-astro-sdk.py +++ b/airflow/dags/ingestion/ask-astro-load-astro-sdk.py @@ -41,7 +41,7 @@ def ask_astro_load_astro_sdk(): class_name=WEAVIATE_CLASS, existing="replace", document_column="docLink", - batch_config_params={"batch_size": 1000}, + batch_config_params={"batch_size": 7, "dynamic": False}, verbose=True, conn_id=_WEAVIATE_CONN_ID, task_id="WeaviateDocumentIngestOperator", diff --git a/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py b/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py index 175b5a6f..bf6b26cd 100644 --- a/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py +++ b/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py @@ -36,17 +36,17 @@ def ask_astro_load_astronomer_docs(): astro_docs = task(extract_astro_docs)() - split_md_docs = task(split.split_markdown).expand(dfs=[astro_docs]) + split_html_docs = task(split.split_html).expand(dfs=[astro_docs]) _import_data = WeaviateDocumentIngestOperator.partial( class_name=WEAVIATE_CLASS, existing="replace", document_column="docLink", - batch_config_params={"batch_size": 1000}, + batch_config_params={"batch_size": 7, "dynamic": False}, verbose=True, conn_id=_WEAVIATE_CONN_ID, task_id="WeaviateDocumentIngestOperator", - ).expand(input_data=[split_md_docs]) + ).expand(input_data=[split_html_docs]) ask_astro_load_astronomer_docs() diff --git a/airflow/dags/ingestion/ask-astro-load-astronomer-provider.py b/airflow/dags/ingestion/ask-astro-load-astronomer-provider.py index 729ce291..f5d5c65d 100644 --- a/airflow/dags/ingestion/ask-astro-load-astronomer-provider.py +++ b/airflow/dags/ingestion/ask-astro-load-astronomer-provider.py @@ -47,7 +47,7 @@ def ask_astro_load_astronomer_providers(): class_name=WEAVIATE_CLASS, existing="replace", document_column="docLink", - batch_config_params={"batch_size": 1000}, + batch_config_params={"batch_size": 7, "dynamic": False}, verbose=True, conn_id=_WEAVIATE_CONN_ID, task_id="WeaviateDocumentIngestOperator", diff --git 
diff --git a/airflow/dags/ingestion/ask-astro-load-blogs.py b/airflow/dags/ingestion/ask-astro-load-blogs.py
index 91a81d58..84f78019 100644
--- a/airflow/dags/ingestion/ask-astro-load-blogs.py
+++ b/airflow/dags/ingestion/ask-astro-load-blogs.py
@@ -45,7 +45,7 @@ def ask_astro_load_blogs():
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
diff --git a/airflow/dags/ingestion/ask-astro-load-cosmos-docs.py b/airflow/dags/ingestion/ask-astro-load-cosmos-docs.py
new file mode 100644
index 00000000..ea2bcad0
--- /dev/null
+++ b/airflow/dags/ingestion/ask-astro-load-cosmos-docs.py
@@ -0,0 +1,51 @@
+import os
+from datetime import datetime
+
+from include.tasks.extract.cosmos_docs import extract_cosmos_docs
+from include.utils.slack import send_failure_notification
+
+from airflow.decorators import dag, task
+from airflow.providers.weaviate.operators.weaviate import WeaviateDocumentIngestOperator
+
+ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")
+
+_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
+WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
+
+
+schedule_interval = os.environ.get("INGESTION_SCHEDULE", "0 5 * * 2") if ask_astro_env == "prod" else None
+
+
+@dag(
+    schedule=schedule_interval,
+    start_date=datetime(2023, 9, 27),
+    catchup=False,
+    is_paused_upon_creation=True,
+    default_args={"retries": 3, "retry_delay": 30},
+    on_failure_callback=send_failure_notification(
+        dag_id="{{ dag.dag_id }}", execution_date="{{ dag_run.execution_date }}"
+    ),
+)
+def ask_astro_load_cosmos_docs():
+    """
+    This DAG performs incremental load for any new Cosmos docs. Initial load via ask_astro_load_bulk imported
+    data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
+    any existing documents that have been updated will be removed and re-added.
+ """ + + from include.tasks import split + + split_docs = task(split.split_html).expand(dfs=[extract_cosmos_docs()]) + + _import_data = WeaviateDocumentIngestOperator.partial( + class_name=WEAVIATE_CLASS, + existing="replace", + document_column="docLink", + batch_config_params={"batch_size": 1000}, + verbose=True, + conn_id=_WEAVIATE_CONN_ID, + task_id="load_cosmos_docs_to_weaviate", + ).expand(input_data=[split_docs]) + + +ask_astro_load_cosmos_docs() diff --git a/airflow/dags/ingestion/ask-astro-load-github.py b/airflow/dags/ingestion/ask-astro-load-github.py index 6aa1a48c..e68055e2 100644 --- a/airflow/dags/ingestion/ask-astro-load-github.py +++ b/airflow/dags/ingestion/ask-astro-load-github.py @@ -64,7 +64,7 @@ def ask_astro_load_github(): class_name=WEAVIATE_CLASS, existing="replace", document_column="docLink", - batch_config_params={"batch_size": 1000}, + batch_config_params={"batch_size": 7, "dynamic": False}, verbose=True, conn_id=_WEAVIATE_CONN_ID, task_id="WeaviateDocumentIngestOperator", diff --git a/airflow/dags/ingestion/ask-astro-load-registry.py b/airflow/dags/ingestion/ask-astro-load-registry.py index 33282ab4..480a70cb 100644 --- a/airflow/dags/ingestion/ask-astro-load-registry.py +++ b/airflow/dags/ingestion/ask-astro-load-registry.py @@ -47,7 +47,7 @@ def ask_astro_load_registry(): class_name=WEAVIATE_CLASS, existing="replace", document_column="docLink", - batch_config_params={"batch_size": 1000}, + batch_config_params={"batch_size": 7, "dynamic": False}, verbose=True, conn_id=_WEAVIATE_CONN_ID, task_id="WeaviateDocumentIngestOperator", diff --git a/airflow/dags/ingestion/ask-astro-load-slack.py b/airflow/dags/ingestion/ask-astro-load-slack.py index 4435da4d..8fbf1b44 100644 --- a/airflow/dags/ingestion/ask-astro-load-slack.py +++ b/airflow/dags/ingestion/ask-astro-load-slack.py @@ -53,7 +53,7 @@ def ask_astro_load_slack(): class_name=WEAVIATE_CLASS, existing="replace", document_column="docLink", - batch_config_params={"batch_size": 1000}, + batch_config_params={"batch_size": 7, "dynamic": False}, verbose=True, conn_id=_WEAVIATE_CONN_ID, task_id="WeaviateDocumentIngestOperator", diff --git a/airflow/dags/ingestion/ask-astro-load-stackoverflow.py b/airflow/dags/ingestion/ask-astro-load-stackoverflow.py index 12f553a1..6135f82d 100644 --- a/airflow/dags/ingestion/ask-astro-load-stackoverflow.py +++ b/airflow/dags/ingestion/ask-astro-load-stackoverflow.py @@ -53,7 +53,7 @@ def ask_astro_load_stackoverflow(): class_name=WEAVIATE_CLASS, existing="replace", document_column="docLink", - batch_config_params={"batch_size": 1000}, + batch_config_params={"batch_size": 7, "dynamic": False}, verbose=True, conn_id=_WEAVIATE_CONN_ID, task_id="WeaviateDocumentIngestOperator", diff --git a/airflow/dags/ingestion/ask-astro-load.py b/airflow/dags/ingestion/ask-astro-load.py index 229e7d70..a71fd090 100644 --- a/airflow/dags/ingestion/ask-astro-load.py +++ b/airflow/dags/ingestion/ask-astro-load.py @@ -12,6 +12,7 @@ from airflow.decorators import dag, task from airflow.exceptions import AirflowException from airflow.providers.weaviate.operators.weaviate import WeaviateDocumentIngestOperator +from airflow.utils.trigger_rule import TriggerRule seed_baseline_url = None stackoverflow_cutoff_date = "2021-09-01" @@ -120,7 +121,7 @@ def check_schema(class_objects: list) -> list[str]: else ["create_schema"] ) - @task(trigger_rule="none_failed") + @task(trigger_rule=TriggerRule.NONE_FAILED) def create_schema(class_objects: list, existing: str = "ignore") -> None: """ Creates or updates the schema 
@@ -135,7 +136,7 @@ def create_schema(class_objects: list, existing: str = "ignore") -> None:
             schema_json={cls["class"]: cls for cls in class_objects}, existing=existing
         )

-    @task.branch(trigger_rule="none_failed")
+    @task.branch(trigger_rule=TriggerRule.NONE_FAILED)
     def check_seed_baseline(seed_baseline_url: str = None) -> str | set:
         """
         Check if we will ingest from pre-embedded baseline or extract each source.
@@ -156,9 +157,10 @@ def check_seed_baseline(seed_baseline_url: str = None) -> str | set:
             "extract_astro_provider_doc",
             "extract_astro_forum_doc",
             "extract_astronomer_docs",
+            "get_cached_or_extract_cosmos_docs",
         }

-    @task(trigger_rule="none_failed")
+    @task(trigger_rule=TriggerRule.NONE_FAILED)
     def extract_github_markdown(source: dict):
         from include.tasks.extract import github
@@ -175,7 +177,7 @@ def extract_github_markdown(source: dict):

         return df

-    @task(trigger_rule="none_failed")
+    @task(trigger_rule=TriggerRule.NONE_FAILED)
     def extract_github_python(source: dict):
         from include.tasks.extract import github
@@ -192,7 +194,7 @@ def extract_github_python(source: dict):

         return df

-    @task(trigger_rule="none_failed")
+    @task(trigger_rule=TriggerRule.NONE_FAILED)
     def extract_airflow_docs():
         from include.tasks.extract import airflow_docs
@@ -204,12 +206,12 @@ def extract_airflow_docs():
             else:
                 raise Exception("Parquet file exists locally but is not readable.")
         else:
-            df = airflow_docs.extract_airflow_docs(docs_base_url=airflow_docs_base_url)[0]
+            df = airflow_docs.extract_airflow_docs.function(docs_base_url=airflow_docs_base_url)[0]
             df.to_parquet(parquet_file)

         return [df]

-    @task(trigger_rule="none_failed")
+    @task(trigger_rule=TriggerRule.NONE_FAILED)
     def extract_astro_cli_docs():
         from include.tasks.extract import astro_cli_docs
@@ -222,7 +224,7 @@ def extract_astro_cli_docs():

         return [df]

-    @task(trigger_rule="none_failed")
+    @task(trigger_rule=TriggerRule.NONE_FAILED)
     def extract_astro_provider_doc():
         from include.tasks.extract.astronomer_providers_docs import extract_provider_docs
@@ -235,7 +237,7 @@ def extract_astro_provider_doc():

         return [df]

-    @task(trigger_rule="none_failed")
+    @task(trigger_rule=TriggerRule.NONE_FAILED)
     def extract_stack_overflow(tag: str, stackoverflow_cutoff_date: str = stackoverflow_cutoff_date):
         from include.tasks.extract import stack_overflow
@@ -247,7 +249,7 @@ def extract_stack_overflow(tag: str, stackoverflow_cutoff_date: str = stackoverf

         return df

-    @task(trigger_rule="none_failed")
+    @task(trigger_rule=TriggerRule.NONE_FAILED)
     def extract_astro_forum_doc():
         from include.tasks.extract.astro_forum_docs import get_forum_df
@@ -260,7 +262,7 @@ def extract_astro_forum_doc():

         return [df]

-    @task(trigger_rule="none_failed")
+    @task(trigger_rule=TriggerRule.NONE_FAILED)
     def extract_github_issues(repo_base: str):
         from include.tasks.extract import github
@@ -277,7 +279,7 @@ def extract_github_issues(repo_base: str):

         return df

-    @task(trigger_rule="none_failed")
+    @task(trigger_rule=TriggerRule.NONE_FAILED)
     def extract_astro_registry_cell_types():
         from include.tasks.extract import registry
@@ -294,7 +296,7 @@ def extract_astro_registry_cell_types():

         return [df]

-    @task(trigger_rule="none_failed")
+    @task(trigger_rule=TriggerRule.NONE_FAILED)
     def extract_astro_registry_dags():
         from include.tasks.extract import registry
@@ -311,7 +313,7 @@ def extract_astro_registry_dags():

         return [df]

-    @task(trigger_rule="none_failed")
+    @task(trigger_rule=TriggerRule.NONE_FAILED)
     def extract_astro_blogs():
         from include.tasks.extract import blogs
@@ -328,7 +330,21 @@ def extract_astro_blogs():

         return [df]

-    @task(trigger_rule="none_failed")
+    @task(trigger_rule=TriggerRule.NONE_FAILED)
+    def get_cached_or_extract_cosmos_docs():
+        from include.tasks.extract import cosmos_docs
+
+        parquet_file_path = "include/data/astronomer/cosmos/cosmos_docs.parquet"
+
+        try:
+            df = pd.read_parquet(parquet_file_path)
+        except FileNotFoundError:
+            df = cosmos_docs.extract_cosmos_docs.function()[0]
+            df.to_parquet(parquet_file_path)
+
+        return [df]
+
+    @task(trigger_rule=TriggerRule.NONE_FAILED)
     def extract_astronomer_docs():
         from include.tasks.extract.astro_docs import extract_astro_docs
@@ -344,7 +360,7 @@ def extract_astronomer_docs():

         return [df]

-    @task(trigger_rule="none_failed")
+    @task(trigger_rule=TriggerRule.NONE_FAILED)
     def import_baseline(
         document_column: str,
         class_name: str,
@@ -390,6 +406,7 @@ def import_baseline(
     _astro_cli_docs = extract_astro_cli_docs()
     _extract_astro_providers_docs = extract_astro_provider_doc()
     _astro_forum_docs = extract_astro_forum_doc()
+    _cosmos_docs = get_cached_or_extract_cosmos_docs()

     _get_schema = get_schema_and_process(schema_file="include/data/schema.json")
     _check_schema = check_schema(class_objects=_get_schema)
@@ -410,6 +427,7 @@ def import_baseline(
         _extract_astro_providers_docs,
         _astro_forum_docs,
         _astro_docs,
+        _cosmos_docs,
     ]

     python_code_tasks = [registry_dags_docs]
@@ -424,7 +442,7 @@ def import_baseline(
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
@@ -437,7 +455,7 @@ def import_baseline(
         document_column="docLink",
         uuid_column="id",
         vector_column="vector",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
     )
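Note: two patterns recur in this file. String trigger rules are replaced by the `TriggerRule` enum, and `@task`-decorated extractors are invoked through their `.function` attribute when the raw Python callable (rather than a new task) is wanted, as the hunks above do with `extract_airflow_docs.function(...)` and `extract_cosmos_docs.function()`. A small sketch of both, with hypothetical task names:

```python
# Sketch only: TriggerRule enum plus calling a decorated task's underlying callable.
from airflow.decorators import task
from airflow.utils.trigger_rule import TriggerRule


@task
def extract_docs() -> list[str]:
    return ["doc-a", "doc-b"]


# Equivalent to @task(trigger_rule="none_failed"), but typo-safe and IDE-friendly.
@task(trigger_rule=TriggerRule.NONE_FAILED)
def cache_or_extract() -> list[str]:
    # Inside another task we want the plain function, not a mapped/new task:
    # `.function` returns the undecorated callable.
    return extract_docs.function()
```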
diff --git a/airflow/dags/monitor/test_retrieval.py b/airflow/dags/monitor/evaluate_rag_quality.py
similarity index 91%
rename from airflow/dags/monitor/test_retrieval.py
rename to airflow/dags/monitor/evaluate_rag_quality.py
index b7e28f48..80525751 100644
--- a/airflow/dags/monitor/test_retrieval.py
+++ b/airflow/dags/monitor/evaluate_rag_quality.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import ast
 import json
 import logging
 import os
@@ -8,11 +9,9 @@
 from textwrap import dedent

 import pandas as pd
-from include.tasks.extract.utils.retrieval_tests import (
+from include.tasks.extract.utils.evaluate_helpers import (
     generate_answer,
     get_or_create_drive_folder,
-    weaviate_search,
-    weaviate_search_multiquery_retriever,
 )

 from airflow.decorators import dag, task
@@ -63,7 +62,7 @@
         )
     },
 )
-def test_retrieval(question_number_subset: str):
+def evaluate_rag_quality(question_number_subset: str):
     """
     This DAG performs a test of document retrieval from Ask Astro's vector database.
@@ -175,7 +174,7 @@ def generate_test_answers(test_question_template_path: Path, ts_nodash=None, **c
         question_number_subset = context["params"]["question_number_subset"]

         if question_number_subset:
-            question_number_subset = json.loads(question_number_subset)
+            question_number_subset = ast.literal_eval(question_number_subset)

         results_file = f"include/data/test_questions_{ts_nodash}.csv"
@@ -183,30 +182,16 @@ def generate_test_answers(test_question_template_path: Path, ts_nodash=None, **c
             "test_number",
             "question",
             "expected_references",
-            "weaviate_search_references",
-            "weaviate_mqr_references",
             "askastro_answer",
             "askastro_references",
             "langsmith_link",
         ]

-        weaviate_client = WeaviateHook(_WEAVIATE_CONN_ID).get_conn()
-
         questions_df = pd.read_csv(test_question_template_path)

         if question_number_subset:
             questions_df = questions_df[questions_df.test_number.isin(question_number_subset)]

-        questions_df["weaviate_search_references"] = questions_df.question.apply(
-            lambda x: weaviate_search(weaviate_client=weaviate_client, question=x, class_name=WEAVIATE_CLASS)
-        )
-
-        questions_df["weaviate_mqr_references"] = questions_df.question.apply(
-            lambda x: weaviate_search_multiquery_retriever(
-                weaviate_client=weaviate_client, question=x, class_name=WEAVIATE_CLASS, azure_endpoint=azure_endpoint
-            )
-        )
-
         questions_df[["askastro_answer", "askastro_references", "langsmith_link"]] = questions_df.question.apply(
             lambda x: pd.Series(
                 generate_answer(
@@ -266,4 +251,4 @@ def upload_results(results_file: str, drive_id: str, ts_nodash: str = None):

     _check_schema >> _results_file >> _upload_results

-test_retrieval(question_number_subset=None)
+evaluate_rag_quality(question_number_subset=None)
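Note: the param parsing above switches from `json.loads` to `ast.literal_eval`, which also accepts Python-literal syntax (single quotes, tuples) that param strings often carry. A quick illustration with made-up values:

```python
# Why ast.literal_eval is more forgiving for this param (example values are made up).
import ast
import json

param = "[1, 2, 3]"
assert json.loads(param) == ast.literal_eval(param) == [1, 2, 3]

subset = "(1, 2, 3)"  # Python-style literal: not valid JSON
try:
    json.loads(subset)
except json.JSONDecodeError:
    pass
assert ast.literal_eval(subset) == (1, 2, 3)
```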
diff --git a/airflow/docker-compose.override.yml b/airflow/docker-compose.override.yml
index 55b3eb71..78fcacf3 100644
--- a/airflow/docker-compose.override.yml
+++ b/airflow/docker-compose.override.yml
@@ -6,7 +6,7 @@ services:
     networks:
       - airflow
   weaviate:
-    image: semitechnologies/weaviate:1.21.0
+    image: semitechnologies/weaviate:1.23.10
     command: "--host 0.0.0.0 --port '8081' --scheme http"
     volumes:
       - ${PWD}/include/weaviate/backup:/var/lib/weaviate/backup
diff --git a/airflow/include/data/astronomer/cosmos/.gitinclude b/airflow/include/data/astronomer/cosmos/.gitinclude
new file mode 100644
index 00000000..e69de29b
diff --git a/airflow/include/data/schema.json b/airflow/include/data/schema.json
index d49b9a6f..70bb6608 100644
--- a/airflow/include/data/schema.json
+++ b/airflow/include/data/schema.json
@@ -6,8 +6,7 @@
     "vectorizer": "text2vec-openai",
     "moduleConfig": {
       "text2vec-openai": {
-        "model": "ada",
-        "modelVersion": "002",
+        "model": "text-embedding-3-small",
         "type": "text",
         "vectorizeClassName": "False"
       },
diff --git a/airflow/include/tasks/extract/airflow_docs.py b/airflow/include/tasks/extract/airflow_docs.py
index fb9de592..7869745e 100644
--- a/airflow/include/tasks/extract/airflow_docs.py
+++ b/airflow/include/tasks/extract/airflow_docs.py
@@ -8,9 +8,11 @@
 from bs4 import BeautifulSoup
 from weaviate.util import generate_uuid5

+from airflow.decorators import task
 from include.tasks.extract.utils.html_utils import get_internal_links


+@task
 def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]:
     """
     This task return all internal url for Airflow docs
@@ -36,7 +38,10 @@ def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]:
     docs_url_parts = urllib.parse.urlsplit(docs_base_url)
     docs_url_base = f"{docs_url_parts.scheme}://{docs_url_parts.netloc}"
     # make sure we didn't accidentally pickup any unrelated links in recursion
-    non_doc_links = {link if docs_url_base not in link else "" for link in all_links}
+    old_version_doc_pattern = r"/(\d+\.)*\d+/"
+    non_doc_links = {
+        link if (docs_url_base not in link) or re.search(old_version_doc_pattern, link) else "" for link in all_links
+    }
     docs_links = all_links - non_doc_links

     df = pd.DataFrame(docs_links, columns=["docLink"])
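Note: the new `old_version_doc_pattern` drops links to archived, version-numbered doc pages so only the latest docs are embedded. A small illustration of how that regex behaves on a couple of made-up URLs:

```python
# Example URLs are made up; the pattern is the one added in the hunk above.
import re

old_version_doc_pattern = r"/(\d+\.)*\d+/"

urls = [
    "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html",
    "https://airflow.apache.org/docs/apache-airflow/2.7.3/core-concepts/dags.html",
]
latest_only = [u for u in urls if not re.search(old_version_doc_pattern, u)]
assert latest_only == urls[:1]  # the versioned 2.7.3 link is filtered out
```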
- """ - if url in visited_urls or not url.startswith(base_url): - return - - # Normalize URL by stripping off the fragment - base_url_no_fragment, frag = urldefrag(url) + footers = content_of_interest.find_all("footer") + for footer in footers: + footer.decompose() - # If the URL is the base URL plus a fragment, ignore it - if base_url_no_fragment == base_url and frag: - return + # The actual article in almost all pages of Astro Docs website is in the following HTML container + container_div = content_of_interest.find("div", class_=lambda value: value and "container" in value) - visited_urls.add(url) + if container_div: + row_div = container_div.find("div", class_="row") - logging.info(f"Scraping : {url}") + if row_div: + col_div = row_div.find("div", class_=lambda value: value and "col" in value) - page_content = fetch_page_content(url) - if not page_content: - return + if col_div: + content_of_interest = str(col_div) - soup = BeautifulSoup(page_content, "lxml") - content = soup.get_text(strip=True) - sha = generate_uuid5(content) - docs_data.append({"docSource": "astro docs", "sha": sha, "content": content, "docLink": url}) - # Recursively scrape linked pages - for link in extract_links(soup, base_url): - scrape_page(link, visited_urls, docs_data) + return str(content_of_interest).strip() def extract_astro_docs(base_url: str = base_url) -> list[pd.DataFrame]: @@ -86,13 +61,26 @@ def extract_astro_docs(base_url: str = base_url) -> list[pd.DataFrame]: :return: A list of pandas dataframes with extracted data. """ - visited_urls = set() - docs_data = [] + all_links = get_internal_links(base_url, exclude_literal=["learn/tags"]) + + # for software references, we only want latest docs, ones with version number (old) is removed + old_version_doc_pattern = r"^https://docs\.astronomer\.io/software/\d+\.\d+/.+$" + # remove duplicate xml files, we only want html pages + non_doc_links = { + link if link.endswith("xml") or re.match(old_version_doc_pattern, link) else "" for link in all_links + } + docs_links = all_links - non_doc_links + + df = pd.DataFrame(docs_links, columns=["docLink"]) + + df["html_content"] = df["docLink"].apply(lambda url: fetch_page_content(url)) - scrape_page(base_url, visited_urls, docs_data) + # Only keep the main article content + df["content"] = df["html_content"].apply(process_astro_doc_page_content) - df = pd.DataFrame(docs_data) - df.drop_duplicates(subset="sha", inplace=True) + df["sha"] = df["content"].apply(generate_uuid5) + df["docSource"] = "astro docs" df.reset_index(drop=True, inplace=True) + df = df[["docSource", "sha", "content", "docLink"]] return [df] diff --git a/airflow/include/tasks/extract/cosmos_docs.py b/airflow/include/tasks/extract/cosmos_docs.py new file mode 100644 index 00000000..3a029670 --- /dev/null +++ b/airflow/include/tasks/extract/cosmos_docs.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import urllib.parse + +import pandas as pd +import requests +from bs4 import BeautifulSoup +from weaviate.util import generate_uuid5 + +from airflow.decorators import task +from include.tasks.extract.utils.html_utils import get_internal_links + + +@task +def extract_cosmos_docs(docs_base_url: str = "https://astronomer.github.io/astronomer-cosmos/") -> list[pd.DataFrame]: + """ + This task return a dataframe containing the extracted data for cosmos docs. + """ + + # we exclude the following docs which are not useful and/or too large for easy processing. 
diff --git a/airflow/include/tasks/extract/cosmos_docs.py b/airflow/include/tasks/extract/cosmos_docs.py
new file mode 100644
index 00000000..3a029670
--- /dev/null
+++ b/airflow/include/tasks/extract/cosmos_docs.py
@@ -0,0 +1,48 @@
+from __future__ import annotations
+
+import urllib.parse
+
+import pandas as pd
+import requests
+from bs4 import BeautifulSoup
+from weaviate.util import generate_uuid5
+
+from airflow.decorators import task
+from include.tasks.extract.utils.html_utils import get_internal_links
+
+
+@task
+def extract_cosmos_docs(docs_base_url: str = "https://astronomer.github.io/astronomer-cosmos/") -> list[pd.DataFrame]:
+    """
+    This task return a dataframe containing the extracted data for cosmos docs.
+    """
+
+    # we exclude the following docs which are not useful and/or too large for easy processing.
+    exclude_docs = [
+        "_sources",
+    ]
+
+    all_links = get_internal_links(docs_base_url, exclude_literal=exclude_docs)
+    html_links = {url for url in all_links if url.endswith(".html")}
+
+    docs_url_parts = urllib.parse.urlsplit(docs_base_url)
+    docs_url_base = f"{docs_url_parts.scheme}://{docs_url_parts.netloc}"
+    # make sure we didn't accidentally pickup any unrelated links in recursion
+    non_doc_links = {link if docs_url_base not in link else "" for link in html_links}
+    docs_links = html_links - non_doc_links
+
+    df = pd.DataFrame(docs_links, columns=["docLink"])
+
+    df["html_content"] = df["docLink"].apply(lambda url: requests.get(url).content)
+
+    # Only keep the main article content
+    df["content"] = df["html_content"].apply(lambda x: str(BeautifulSoup(x, "html.parser").find(name="article")))
+
+    df["sha"] = df["content"].apply(generate_uuid5)
+    df["docSource"] = "cosmos docs"
+    df.reset_index(drop=True, inplace=True)
+
+    # column order matters for uuid generation
+    df = df[["docSource", "sha", "content", "docLink"]]
+
+    return [df]
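Note: both the astro docs and cosmos docs extractors derive the document `sha` with `weaviate.util.generate_uuid5`, a deterministic UUID over the content; the same helper also backs the new content-hash dedup in `html_utils`. A tiny illustration (sample strings are made up):

```python
# generate_uuid5 is deterministic: identical content -> identical id.
from weaviate.util import generate_uuid5

page_a = "<article>Cosmos quickstart</article>"
page_b = "<article>Cosmos quickstart</article>"

assert generate_uuid5(page_a) == generate_uuid5(page_b)
assert generate_uuid5(page_a) != generate_uuid5("<article>Changed content</article>")
```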
- """ - - try: - results = ( - weaviate_client.query.get(class_name=class_name, properties=["docLink"]) - .with_near_text( - { - "concepts": question, - } - ) - .with_limit(5) - .with_additional(["id", "certainty"]) - .do()["data"]["Get"][class_name] - ) - - references = "\n".join( - [f"{result['docLink']} [{round(result['_additional']['certainty'], 3)}]" for result in results] - ) - - except Exception as e: - logger.info(e) - references = [] - - return references - - -def weaviate_search_multiquery_retriever( - weaviate_client: WeaviateClient, question: str, class_name: str, azure_endpoint: str -) -> str: - """ - This function uses LangChain's - [MultiQueryRetriever](https://api.python.langchain.com/en/latest/retrievers/langchain.retrievers.multi_query.MultiQueryRetriever.html) - to retrieve a set of documents based on a question. - - :param weaviate_client: An instantiated weaviate client to use for the search. - :param question: A question. - :param class_name: The name of the class to search. - :param azure_gpt35_endpoint: Azure OpenAI endpoint to use for multi-query retrieval - """ - - docsearch = WeaviateVectorStore( - client=weaviate_client, - index_name=class_name, - text_key="content", - attributes=["docLink"], - ) - - retriever = MultiQueryRetriever.from_llm( - llm=AzureChatOpenAI( - **json.loads(azure_endpoint), - deployment_name="gpt-35-turbo", - temperature="0.0", - ), - retriever=docsearch.as_retriever(), - ) - - try: - results = retriever.get_relevant_documents(query=question) - - references = {result.metadata["docLink"] for result in results} - references = "\n".join(references) - - except Exception as e: - logger.info(e) - references = [] - - return references - - def get_or_create_drive_folder(gd_hook: GoogleDriveHook, folder_name: str, parent_id: str | None) -> str: """ Creates a google drive folder if it does not exist. diff --git a/airflow/include/tasks/extract/utils/html_utils.py b/airflow/include/tasks/extract/utils/html_utils.py index 1caac139..c8034676 100644 --- a/airflow/include/tasks/extract/utils/html_utils.py +++ b/airflow/include/tasks/extract/utils/html_utils.py @@ -7,12 +7,14 @@ import pandas as pd import requests from bs4 import BeautifulSoup +from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential from weaviate.util import generate_uuid5 logger = logging.getLogger("airflow.task") - +attempted_urls = set() internal_urls = set() +internal_page_hashset = set() def is_valid_url(url: str) -> bool: @@ -25,19 +27,29 @@ def is_valid_url(url: str) -> bool: return bool(parsed.netloc) and bool(parsed.scheme) +def _fetch_page_content_retry_default_return(retry_state: RetryCallState) -> str: + logger.info( + "Error fetching content for %s. 
diff --git a/airflow/include/tasks/extract/utils/html_utils.py b/airflow/include/tasks/extract/utils/html_utils.py
index 1caac139..c8034676 100644
--- a/airflow/include/tasks/extract/utils/html_utils.py
+++ b/airflow/include/tasks/extract/utils/html_utils.py
@@ -7,12 +7,14 @@
 import pandas as pd
 import requests
 from bs4 import BeautifulSoup
+from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential
 from weaviate.util import generate_uuid5

 logger = logging.getLogger("airflow.task")

-
+attempted_urls = set()
 internal_urls = set()
+internal_page_hashset = set()


 def is_valid_url(url: str) -> bool:
@@ -25,19 +27,29 @@ def is_valid_url(url: str) -> bool:
     return bool(parsed.netloc) and bool(parsed.scheme)


+def _fetch_page_content_retry_default_return(retry_state: RetryCallState) -> str:
+    logger.info(
+        "Error fetching content for %s. May be expected if making attempts to validate unknown URLs.",
+        retry_state.args[0],
+    )
+    return ""
+
+
+@retry(
+    retry=retry_if_exception_type(requests.RequestException),
+    stop=stop_after_attempt(3),
+    wait=wait_exponential(multiplier=1, max=10),
+    retry_error_callback=_fetch_page_content_retry_default_return,
+)
 def fetch_page_content(url: str) -> str:
     """
     Fetch the content of a html page

     param url: The url of a page
     """
-    try:
-        response = requests.get(url, headers={"User-agent": "Ask Astro"})
-        response.raise_for_status()  # Raise an HTTPError for bad responses
-        return response.content
-    except requests.RequestException:
-        logger.info("Error fetching content for %s: %s", url, url)
-        return ""
+    response = requests.get(url, headers={"User-agent": "Ask Astro"})
+    response.raise_for_status()  # Raise an HTTPError for bad responses
+    return response.content


 def is_excluded_url(url: str, exclude_literal: list[str]) -> bool:
@@ -98,16 +110,19 @@ def truncate_tokens(text: str, encoding_name: str = "gpt-3.5-turbo", max_length:
         logger.info(e)


-def get_page_links(url: str, exclude_literal: list[str]) -> set[str]:
+def get_page_links(url: str, current_page_content: bytes, exclude_literal: list[str]) -> None:
     """
-    Extract all valid and internal links from the given URL.
+    Recursively extract all valid and internal links from the given URL.
+    Deduplicates any links with the exact same page content in the process.

     param url (str): The URL to extract links from.
+    param current_page_content: Bytes of the content of the url passed in for hashing.
     param exclude_docs (list): List of strings to exclude from the URL path.
     """
-    urls = set()
     domain_name = urlparse(url).netloc
-    soup = BeautifulSoup(requests.get(url).content, "html.parser")
+    page_content_hash = generate_uuid5(current_page_content)
+    internal_page_hashset.add(page_content_hash)
+    soup = BeautifulSoup(current_page_content, "html.parser")
     for a_tag in soup.findAll("a"):
         href = a_tag.attrs.get("href")
         if href == "" or href is None:
@@ -117,16 +132,20 @@ def get_page_links(url: str, exclude_literal: list[str]) -> set[str]:
         href = parsed_href.scheme + "://" + parsed_href.netloc + parsed_href.path
         if (
             not is_valid_url(href)
+            or not href.startswith("https")
             or href in internal_urls
+            or href in attempted_urls
             or domain_name not in href
             or is_excluded_url(href, exclude_literal)
         ):
             continue
-        urls.add(href)
+        attempted_urls.add(href)
+        new_page_content = fetch_page_content(href)
+        if (not new_page_content) or generate_uuid5(new_page_content) in page_content_hash:
+            continue
         logger.info(href)
         internal_urls.add(href)
-
-    return urls
+        get_page_links(href, new_page_content, exclude_literal)


 def get_internal_links(base_url: str, exclude_literal: list[str] | None = None) -> set[str]:
@@ -139,10 +158,9 @@ def get_internal_links(base_url: str, exclude_literal: list[str] | None = None)
     if exclude_literal is None:
         exclude_literal = []

-    links = get_page_links(base_url, exclude_literal)
-
-    for link in links:
-        get_page_links(link, exclude_literal)
+    page_content = fetch_page_content(base_url)
+    get_page_links(base_url, page_content, exclude_literal)
+    internal_urls.add(base_url)

     return internal_urls
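Note: `fetch_page_content` now delegates error handling to tenacity — up to three attempts with exponential backoff, and the `retry_error_callback` turns a final failure into an empty string instead of raising. A minimal standalone sketch of the same pattern (function name and URL below are placeholders):

```python
# Standalone sketch of the tenacity retry pattern used above.
import requests
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential


def _give_up(retry_state: RetryCallState) -> str:
    # Invoked after the final failed attempt; swallow the error and return a default.
    return ""


@retry(
    retry=retry_if_exception_type(requests.RequestException),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, max=10),
    retry_error_callback=_give_up,
)
def fetch(url: str) -> str:
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # HTTPError is a RequestException, so it is retried
    return response.text


# A consistently failing URL is retried 3 times, then resolves to "" instead of raising.
print(fetch("https://httpbin.org/status/500") == "")
```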
diff --git a/airflow/include/tasks/split.py b/airflow/include/tasks/split.py
index c5933bf3..ba387009 100644
--- a/airflow/include/tasks/split.py
+++ b/airflow/include/tasks/split.py
@@ -3,10 +3,10 @@
 import pandas as pd
 from langchain.schema import Document
 from langchain.text_splitter import (
-    HTMLHeaderTextSplitter,
     Language,
     RecursiveCharacterTextSplitter,
 )
+from langchain_community.document_transformers import Html2TextTransformer


 def split_markdown(dfs: list[pd.DataFrame]) -> pd.DataFrame:
@@ -86,18 +86,29 @@ def split_html(dfs: list[pd.DataFrame]) -> pd.DataFrame:
         'content': Chunked content in markdown format.
     """
-
-    headers_to_split_on = [
-        ("h2", "h2"),
-    ]
+    separators = ["