diff --git a/airflow/dags/ingestion/ask-astro-load-airflow-docs.py b/airflow/dags/ingestion/ask-astro-load-airflow-docs.py new file mode 100644 index 00000000..a7c41adb --- /dev/null +++ b/airflow/dags/ingestion/ask-astro-load-airflow-docs.py @@ -0,0 +1,47 @@ +import os +from datetime import datetime + +from include.tasks import ingest, split +from include.tasks.extract import airflow_docs + +from airflow.decorators import dag, task + +ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "") + +_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}" +WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd") + +airflow_docs_base_url = "https://airflow.apache.org/docs/" + +default_args = {"retries": 3, "retry_delay": 30} + +schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None + + +@dag( + schedule_interval=schedule_interval, + start_date=datetime(2023, 9, 27), + catchup=False, + is_paused_upon_creation=True, + default_args=default_args, +) +def ask_astro_load_airflow_docs(): + """ + This DAG performs incremental load for any new Airflow docs. Initial load via ask_astro_load_bulk imported + data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator + any existing documents that have been updated will be removed and re-added. + """ + + extracted_airflow_docs = task(airflow_docs.extract_airflow_docs)(docs_base_url=airflow_docs_base_url) + + split_md_docs = task(split.split_html).expand(dfs=[extracted_airflow_docs]) + + task.weaviate_import( + ingest.import_upsert_data, + weaviate_conn_id=_WEAVIATE_CONN_ID, + ).partial( + class_name=WEAVIATE_CLASS, primary_key="docLink" + ).expand(dfs=[split_md_docs]) + + +ask_astro_load_airflow_docs() diff --git a/airflow/dags/ingestion/ask-astro-load-blogs.py b/airflow/dags/ingestion/ask-astro-load-blogs.py index 239aad8e..5c6f2fac 100644 --- a/airflow/dags/ingestion/ask-astro-load-blogs.py +++ b/airflow/dags/ingestion/ask-astro-load-blogs.py @@ -13,8 +13,18 @@ blog_cutoff_date = datetime.strptime("2023-01-19", "%Y-%m-%d") +default_args = {"retries": 3, "retry_delay": 30} -@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True) +schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None + + +@dag( + schedule_interval=schedule_interval, + start_date=datetime(2023, 9, 27), + catchup=False, + is_paused_upon_creation=True, + default_args=default_args, +) def ask_astro_load_blogs(): """ This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported @@ -22,16 +32,16 @@ def ask_astro_load_blogs(): any existing documents that have been updated will be removed and re-added. """ - blogs_docs = task(blogs.extract_astro_blogs, retries=3)(blog_cutoff_date=blog_cutoff_date) + blogs_docs = task(blogs.extract_astro_blogs)(blog_cutoff_date=blog_cutoff_date) split_md_docs = task(split.split_markdown).expand(dfs=[blogs_docs]) task.weaviate_import( ingest.import_upsert_data, weaviate_conn_id=_WEAVIATE_CONN_ID, - retries=10, - retry_delay=30, - ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs]) + ).partial( + class_name=WEAVIATE_CLASS, primary_key="docLink" + ).expand(dfs=[split_md_docs]) ask_astro_load_blogs() diff --git a/airflow/dags/ingestion/ask-astro-load-github.py b/airflow/dags/ingestion/ask-astro-load-github.py index 4dbad5a7..d18031e1 100644 --- a/airflow/dags/ingestion/ask-astro-load-github.py +++ b/airflow/dags/ingestion/ask-astro-load-github.py @@ -17,9 +17,6 @@ {"doc_dir": "", "repo_base": "OpenLineage/docs"}, {"doc_dir": "", "repo_base": "OpenLineage/OpenLineage"}, ] -rst_docs_sources = [ - {"doc_dir": "docs", "repo_base": "apache/airflow", "exclude_docs": ["changelog.rst", "commits.rst"]}, -] code_samples_sources = [ {"doc_dir": "code-samples", "repo_base": "astronomer/docs"}, ] @@ -27,8 +24,18 @@ "apache/airflow", ] +default_args = {"retries": 3, "retry_delay": 30} + +schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None + -@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True) +@dag( + schedule_interval=schedule_interval, + start_date=datetime(2023, 9, 27), + catchup=False, + is_paused_upon_creation=True, + default_args=default_args, +) def ask_astro_load_github(): """ This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported @@ -37,43 +44,31 @@ def ask_astro_load_github(): """ md_docs = ( - task(github.extract_github_markdown, retries=3) + task(github.extract_github_markdown) .partial(github_conn_id=_GITHUB_CONN_ID) .expand(source=markdown_docs_sources) ) - rst_docs = ( - task(github.extract_github_rst, retries=3) - .partial(github_conn_id=_GITHUB_CONN_ID) - .expand(source=rst_docs_sources) - ) - issues_docs = ( - task(github.extract_github_issues, retries=3) - .partial(github_conn_id=_GITHUB_CONN_ID) - .expand(repo_base=issues_docs_sources) + task(github.extract_github_issues).partial(github_conn_id=_GITHUB_CONN_ID).expand(repo_base=issues_docs_sources) ) code_samples = ( - task(github.extract_github_python, retries=3) - .partial(github_conn_id=_GITHUB_CONN_ID) - .expand(source=code_samples_sources) + task(github.extract_github_python).partial(github_conn_id=_GITHUB_CONN_ID).expand(source=code_samples_sources) ) - markdown_tasks = [md_docs, rst_docs, issues_docs] - - split_md_docs = task(split.split_markdown).expand(dfs=markdown_tasks) + split_md_docs = task(split.split_markdown).expand(dfs=[md_docs, issues_docs]) split_code_docs = task(split.split_python).expand(dfs=[code_samples]) task.weaviate_import( ingest.import_upsert_data, weaviate_conn_id=_WEAVIATE_CONN_ID, - retries=10, - retry_delay=30, - ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs, split_code_docs]) + ).partial( + class_name=WEAVIATE_CLASS, primary_key="docLink" + ).expand(dfs=[split_md_docs, split_code_docs]) - issues_docs >> md_docs >> rst_docs >> code_samples + issues_docs >> md_docs >> code_samples ask_astro_load_github() diff --git a/airflow/dags/ingestion/ask-astro-load-registry.py b/airflow/dags/ingestion/ask-astro-load-registry.py index 08b617c9..e41b0a6d 100644 --- a/airflow/dags/ingestion/ask-astro-load-registry.py +++ b/airflow/dags/ingestion/ask-astro-load-registry.py @@ -11,8 +11,18 @@ _WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}" WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd") +default_args = {"retries": 3, "retry_delay": 30} -@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True) +schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None + + +@dag( + schedule_interval=schedule_interval, + start_date=datetime(2023, 9, 27), + catchup=False, + is_paused_upon_creation=True, + default_args=default_args, +) def ask_astro_load_registry(): """ This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported @@ -20,9 +30,9 @@ def ask_astro_load_registry(): any existing documents that have been updated will be removed and re-added. """ - registry_cells_docs = task(registry.extract_astro_registry_cell_types, retries=3)() + registry_cells_docs = task(registry.extract_astro_registry_cell_types)() - registry_dags_docs = task(registry.extract_astro_registry_dags, retries=3)() + registry_dags_docs = task(registry.extract_astro_registry_dags)() split_md_docs = task(split.split_markdown).expand(dfs=[registry_cells_docs]) @@ -31,9 +41,9 @@ def ask_astro_load_registry(): task.weaviate_import( ingest.import_upsert_data, weaviate_conn_id=_WEAVIATE_CONN_ID, - retries=10, - retry_delay=30, - ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs, split_code_docs]) + ).partial( + class_name=WEAVIATE_CLASS, primary_key="docLink" + ).expand(dfs=[split_md_docs, split_code_docs]) ask_astro_load_registry() diff --git a/airflow/dags/ingestion/ask-astro-load-slack.py b/airflow/dags/ingestion/ask-astro-load-slack.py index d8804c5c..93e56b3f 100644 --- a/airflow/dags/ingestion/ask-astro-load-slack.py +++ b/airflow/dags/ingestion/ask-astro-load-slack.py @@ -20,8 +20,18 @@ } ] +default_args = {"retries": 3, "retry_delay": 30} -@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True) +schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None + + +@dag( + schedule_interval=schedule_interval, + start_date=datetime(2023, 9, 27), + catchup=False, + is_paused_upon_creation=True, + default_args=default_args, +) def ask_astro_load_slack(): """ This DAG performs incremental load for any new slack threads. The slack archive is a point-in-time capture. This @@ -29,16 +39,16 @@ def ask_astro_load_slack(): weaviate_import decorator any existing documents that have been updated will be removed and re-added. """ - slack_docs = task(slack.extract_slack, retries=3).expand(source=slack_channel_sources) + slack_docs = task(slack.extract_slack).expand(source=slack_channel_sources) split_md_docs = task(split.split_markdown).expand(dfs=[slack_docs]) task.weaviate_import( ingest.import_upsert_data, weaviate_conn_id=_WEAVIATE_CONN_ID, - retries=10, - retry_delay=30, - ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs]) + ).partial( + class_name=WEAVIATE_CLASS, primary_key="docLink" + ).expand(dfs=[split_md_docs]) ask_astro_load_slack() diff --git a/airflow/dags/ingestion/ask-astro-load-stackoverflow.py b/airflow/dags/ingestion/ask-astro-load-stackoverflow.py index 28e43993..dfdbf255 100644 --- a/airflow/dags/ingestion/ask-astro-load-stackoverflow.py +++ b/airflow/dags/ingestion/ask-astro-load-stackoverflow.py @@ -16,8 +16,18 @@ "airflow", ] +default_args = {"retries": 3, "retry_delay": 30} -@dag(schedule_interval=None, start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True) +schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None + + +@dag( + schedule_interval=schedule_interval, + start_date=datetime(2023, 9, 27), + catchup=False, + is_paused_upon_creation=True, + default_args=default_args, +) def ask_astro_load_stackoverflow(): """ This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported @@ -26,7 +36,7 @@ def ask_astro_load_stackoverflow(): """ stack_overflow_docs = ( - task(stack_overflow.extract_stack_overflow_archive, retries=3) + task(stack_overflow.extract_stack_overflow_archive) .partial(stackoverflow_cutoff_date=stackoverflow_cutoff_date) .expand(tag=stackoverflow_tags) ) @@ -36,9 +46,9 @@ def ask_astro_load_stackoverflow(): task.weaviate_import( ingest.import_upsert_data, weaviate_conn_id=_WEAVIATE_CONN_ID, - retries=10, - retry_delay=30, - ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs]) + ).partial( + class_name=WEAVIATE_CLASS, primary_key="docLink" + ).expand(dfs=[split_md_docs]) ask_astro_load_stackoverflow() diff --git a/airflow/dags/ingestion/ask-astro-load.py b/airflow/dags/ingestion/ask-astro-load.py index 5fe5f33b..79d8eb7e 100644 --- a/airflow/dags/ingestion/ask-astro-load.py +++ b/airflow/dags/ingestion/ask-astro-load.py @@ -1,10 +1,12 @@ +import json import os from datetime import datetime +from pathlib import Path from textwrap import dedent import pandas as pd from include.tasks import ingest, split -from include.tasks.extract import blogs, github, registry, stack_overflow +from include.tasks.extract import airflow_docs, blogs, github, registry, stack_overflow from weaviate_provider.operators.weaviate import WeaviateCheckSchemaBranchOperator, WeaviateCreateSchemaOperator from airflow.decorators import dag, task @@ -23,9 +25,6 @@ {"doc_dir": "", "repo_base": "OpenLineage/docs"}, {"doc_dir": "", "repo_base": "OpenLineage/OpenLineage"}, ] -rst_docs_sources = [ - {"doc_dir": "docs", "repo_base": "apache/airflow", "exclude_docs": ["changelog.rst", "commits.rst"]}, -] code_samples_sources = [ {"doc_dir": "code-samples", "repo_base": "astronomer/docs"}, ] @@ -49,10 +48,20 @@ "airflow", ] -schedule_interval = "@daily" if ask_astro_env == "prod" else None +airflow_docs_base_url = "https://airflow.apache.org/docs/" + +default_args = {"retries": 3, "retry_delay": 30} + +schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None -@dag(schedule_interval=schedule_interval, start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True) +@dag( + schedule_interval=schedule_interval, + start_date=datetime(2023, 9, 27), + catchup=False, + is_paused_upon_creation=True, + default_args=default_args, +) def ask_astro_load_bulk(): """ This DAG performs the initial load of data from sources. @@ -67,10 +76,14 @@ def ask_astro_load_bulk(): """ + class_object_data = json.loads(Path("include/data/schema.json").read_text()) + class_object_data["classes"][0].update({"class": WEAVIATE_CLASS}) + class_object_data = json.dumps(class_object_data) + _check_schema = WeaviateCheckSchemaBranchOperator( task_id="check_schema", weaviate_conn_id=_WEAVIATE_CONN_ID, - class_object_data="file://include/data/schema.json", + class_object_data=class_object_data, follow_task_ids_if_true=["check_seed_baseline"], follow_task_ids_if_false=["create_schema"], doc_md=dedent( @@ -83,7 +96,7 @@ def ask_astro_load_bulk(): _create_schema = WeaviateCreateSchemaOperator( task_id="create_schema", weaviate_conn_id=_WEAVIATE_CONN_ID, - class_object_data="file://include/data/schema.json", + class_object_data=class_object_data, existing="ignore", ) @@ -98,7 +111,7 @@ def check_seed_baseline() -> str: else: return [ "extract_github_markdown", - "extract_github_rst", + "extract_airflow_docs", "extract_stack_overflow", # "extract_slack_archive", "extract_astro_registry_cell_types", @@ -108,7 +121,7 @@ def check_seed_baseline() -> str: "extract_astro_registry_dags", ] - @task(trigger_rule="none_skipped") + @task(trigger_rule="none_failed") def extract_github_markdown(source: dict): try: df = pd.read_parquet(f"include/data/{source['repo_base']}/{source['doc_dir']}.parquet") @@ -118,25 +131,25 @@ def extract_github_markdown(source: dict): return df - @task(trigger_rule="none_skipped") - def extract_github_rst(source: dict): + @task(trigger_rule="none_failed") + def extract_github_python(source: dict): try: df = pd.read_parquet(f"include/data/{source['repo_base']}/{source['doc_dir']}.parquet") except Exception: - df = github.extract_github_rst(source=source, github_conn_id=_GITHUB_CONN_ID) + df = github.extract_github_python(source, _GITHUB_CONN_ID) df.to_parquet(f"include/data/{source['repo_base']}/{source['doc_dir']}.parquet") return df @task(trigger_rule="none_failed") - def extract_github_python(source: dict): + def extract_airflow_docs(): try: - df = pd.read_parquet(f"include/data/{source['repo_base']}/{source['doc_dir']}.parquet") + df = pd.read_parquet("include/data/apache/airflow/docs.parquet") except Exception: - df = github.extract_github_python(source, _GITHUB_CONN_ID) - df.to_parquet(f"include/data/{source['repo_base']}/{source['doc_dir']}.parquet") + df = airflow_docs.extract_airflow_docs(docs_base_url=airflow_docs_base_url)[0] + df.to_parquet("include/data/apache/airflow/docs.parquet") - return df + return [df] @task(trigger_rule="none_failed") def extract_stack_overflow(tag: str, stackoverflow_cutoff_date: str): @@ -204,8 +217,6 @@ def extract_astro_blogs(): md_docs = extract_github_markdown.expand(source=markdown_docs_sources) - rst_docs = extract_github_rst.expand(source=rst_docs_sources) - issues_docs = extract_github_issues.expand(repo_base=issues_docs_sources) stackoverflow_docs = extract_stack_overflow.partial(stackoverflow_cutoff_date=stackoverflow_cutoff_date).expand( @@ -224,7 +235,6 @@ def extract_astro_blogs(): markdown_tasks = [ md_docs, - rst_docs, issues_docs, stackoverflow_docs, # slack_docs, @@ -232,15 +242,21 @@ def extract_astro_blogs(): registry_cells_docs, ] + extracted_airflow_docs = extract_airflow_docs() + + html_tasks = [extracted_airflow_docs] + python_code_tasks = [registry_dags_docs, code_samples] split_md_docs = task(split.split_markdown).expand(dfs=markdown_tasks) split_code_docs = task(split.split_python).expand(dfs=python_code_tasks) - task.weaviate_import(ingest.import_data, weaviate_conn_id=_WEAVIATE_CONN_ID, retries=10, retry_delay=30).partial( + split_html_docs = task(split.split_html).expand(dfs=html_tasks) + + task.weaviate_import(ingest.import_data, weaviate_conn_id=_WEAVIATE_CONN_ID, retries=10).partial( class_name=WEAVIATE_CLASS - ).expand(dfs=[split_md_docs, split_code_docs]) + ).expand(dfs=[split_md_docs, split_code_docs, split_html_docs]) _import_baseline = task.weaviate_import( ingest.import_baseline, trigger_rule="none_failed", weaviate_conn_id=_WEAVIATE_CONN_ID @@ -248,9 +264,9 @@ def extract_astro_blogs(): _check_schema >> [_check_seed_baseline, _create_schema] - _create_schema >> markdown_tasks + python_code_tasks + [_check_seed_baseline] + _create_schema >> markdown_tasks + python_code_tasks + html_tasks + [_check_seed_baseline] - _check_seed_baseline >> issues_docs >> rst_docs >> md_docs + _check_seed_baseline >> issues_docs >> md_docs # ( # _check_seed_baseline # >> [stackoverflow_docs, slack_docs, blogs_docs, registry_cells_docs, _import_baseline] + python_code_tasks @@ -258,7 +274,7 @@ def extract_astro_blogs(): ( _check_seed_baseline - >> [stackoverflow_docs, blogs_docs, registry_cells_docs, _import_baseline] + python_code_tasks + >> [stackoverflow_docs, blogs_docs, registry_cells_docs, _import_baseline] + python_code_tasks + html_tasks ) diff --git a/airflow/include/data/apache/apache_license.rst b/airflow/include/data/apache/apache_license.rst deleted file mode 100644 index 106592bd..00000000 --- a/airflow/include/data/apache/apache_license.rst +++ /dev/null @@ -1,16 +0,0 @@ - .. Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - .. http://www.apache.org/licenses/LICENSE-2.0 - - .. Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. diff --git a/airflow/include/tasks/extract/airflow_docs.py b/airflow/include/tasks/extract/airflow_docs.py new file mode 100644 index 00000000..d828d0b6 --- /dev/null +++ b/airflow/include/tasks/extract/airflow_docs.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +import re +import urllib.parse + +import pandas as pd +import requests +from bs4 import BeautifulSoup +from include.tasks.extract.utils.html_helpers import get_all_links +from weaviate.util import generate_uuid5 + + +def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]: + """ + This task scrapes docs from the Airflow website and returns a list of pandas dataframes. Return + type is a list in order to map to upstream dynamic tasks. The code recursively generates a list + of html files relative to 'docs_base_url' and then extracts each as text. + + Note: Only the (class_: body, role: main) tag and children are extracted. + + Note: This code will also pickup https://airflow.apache.org/howto/* + which are also referenced in the docs pages. These are useful for Ask Astro and also they are relatively few + pages so we leave them in. + + param docs_base_url: Base URL to start extract. + type docs_base_url: str + + The returned data includes the following fields: + 'docSource': 'apache/airflow/docs' + 'docLink': URL for the page + 'content': HTML content of the page + 'sha': A UUID from the other fields + """ + + # we exclude the following docs which are not useful and/or too large for easy processing. + exclude_docs = [ + "changelog.html", + "commits.html", + "docs/apache-airflow/stable/release_notes.html", + "docs/stable/release_notes.html", + "_api", + "_modules", + "installing-providers-from-sources.html", + "apache-airflow/1.", + "apache-airflow/2.", + "example", + "cli-and-env-variables-ref.html", + ] + + docs_url_parts = urllib.parse.urlsplit(docs_base_url) + docs_url_base = f"{docs_url_parts.scheme}://{docs_url_parts.netloc}" + + all_links = {docs_base_url} + get_all_links(url=list(all_links)[0], all_links=all_links, exclude_docs=exclude_docs) + + # make sure we didn't accidentally pickup any unrelated links in recursion + non_doc_links = {link if docs_url_base not in link else "" for link in all_links} + docs_links = all_links - non_doc_links + + df = pd.DataFrame(docs_links, columns=["docLink"]) + + df["html_content"] = df["docLink"].apply(lambda x: requests.get(x).content) + + df["content"] = df["html_content"].apply( + lambda x: str(BeautifulSoup(x, "html.parser").find(class_="body", role="main")) + ) + df["content"] = df["content"].apply(lambda x: re.sub("ΒΆ", "", x)) + + df["sha"] = df["content"].apply(generate_uuid5) + df["docSource"] = "apache/airflow/docs" + df.reset_index(drop=True, inplace=True) + + # column order matters for uuid generation + df = df[["docSource", "sha", "content", "docLink"]] + + return [df] diff --git a/airflow/include/tasks/extract/utils/html_helpers.py b/airflow/include/tasks/extract/utils/html_helpers.py new file mode 100644 index 00000000..070e43a7 --- /dev/null +++ b/airflow/include/tasks/extract/utils/html_helpers.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +import urllib.parse +from time import sleep + +import requests +from bs4 import BeautifulSoup + + +def get_links(url: str, exclude_docs: list) -> set: + """ + Given a HTML url this function scrapes the page for any HTML links ( tags) and returns a set of links which: + a) starts with the same base (ie. scheme + netloc) + b) is a relative link from the currently read page + + Relative links are converted to absolute links. + + Note: The absolute link may not be unique due to redirects. Need to check for redirects in calling function. + """ + response = requests.get(url) + data = response.text + soup = BeautifulSoup(data, "lxml") + + url_parts = urllib.parse.urlsplit(url) + url_base = f"{url_parts.scheme}://{url_parts.netloc}" + + links = set() + for link in soup.find_all("a"): + link_url = link.get("href") + + if link_url.endswith(".html"): + if link_url.startswith(url_base) and not any(substring in link_url for substring in exclude_docs): + links.add(link_url) + elif not link_url.startswith("http"): + absolute_url = urllib.parse.urljoin(url, link_url) + if not any(substring in absolute_url for substring in exclude_docs): + links.add(absolute_url) + + return links + + +def get_all_links(url: str, all_links: set, exclude_docs: list): + """ + This is a recursive function to find all the sub-pages of a webpage. Given a starting URL the function + recurses through all child links referenced in the page. + + The all_links set is updated in recursion so no return set is passed. + """ + links = get_links(url=url, exclude_docs=exclude_docs) + for link in links: + # check if the linked page actually exists and get the redirect which is hopefully unique + + response = requests.head(link, allow_redirects=True) + if response.ok: + redirect_url = response.url + if redirect_url not in all_links: + print(redirect_url) + all_links.add(redirect_url) + try: + get_all_links(url=redirect_url, all_links=all_links, exclude_docs=exclude_docs) + except Exception as e: + print(e) + print("Retrying") + sleep(5) + get_all_links(url=redirect_url, all_links=all_links, exclude_docs=exclude_docs) diff --git a/airflow/include/tasks/split.py b/airflow/include/tasks/split.py index 57afcd6f..65096946 100644 --- a/airflow/include/tasks/split.py +++ b/airflow/include/tasks/split.py @@ -3,6 +3,7 @@ import pandas as pd from langchain.schema import Document from langchain.text_splitter import ( + HTMLHeaderTextSplitter, Language, RecursiveCharacterTextSplitter, ) @@ -68,3 +69,45 @@ def split_python(dfs: list[pd.DataFrame]) -> pd.DataFrame: df.reset_index(inplace=True, drop=True) return df + + +def split_html(dfs: list[pd.DataFrame]) -> pd.DataFrame: + """ + This task concatenates multiple dataframes from upstream dynamic tasks and splits html code before importing + to a vector database. + + param dfs: A list of dataframes from downstream dynamic tasks + type dfs: list[pd.DataFrame] + + Returned dataframe fields are: + 'docSource': ie. 'astro', 'learn', 'docs', etc. + 'sha': the github sha for the document + 'docLink': URL for the specific document in github. + 'content': Chunked content in markdown format. + + """ + + headers_to_split_on = [ + ("h2", "h2"), + ] + + df = pd.concat(dfs, axis=0, ignore_index=True) + + splitter = HTMLHeaderTextSplitter(headers_to_split_on) + + df["doc_chunks"] = df["content"].apply(lambda x: splitter.split_text(text=x)) + df = df.explode("doc_chunks", ignore_index=True) + df["content"] = df["doc_chunks"].apply(lambda x: x.page_content) + + # import tiktoken + # encoding = tiktoken.get_encoding("cl100k_base") + # df["num_tokens"] = df["content"].apply(lambda x: len(encoding.encode(x))) + # df.num_tokens.sort_values() + # df.iloc[2165]['docLink'] + # df.drop(2165, inplace=True) + # df.reset_index(drop=True, inplace=True) + + df.drop(["doc_chunks"], inplace=True, axis=1) + df.reset_index(inplace=True, drop=True) + + return df