Commit

Add astronomer-provider ingestions
pankajastro committed Dec 1, 2023
1 parent 3de2485 commit 7a328a4
Showing 3 changed files with 110 additions and 19 deletions.
52 changes: 52 additions & 0 deletions airflow/dags/ingestion/ask-astro-load-astronomer-provider.py
@@ -0,0 +1,52 @@
import datetime
import os

from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook
from include.tasks.extract.astronomer_providers_docs import extract_docs
from airflow.decorators import dag, task

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)
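# The hook targets the Weaviate connection selected by ASK_ASTRO_ENV ("dev" by default).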

blog_cutoff_date = datetime.date(2023, 1, 19)

default_args = {"retries": 3, "retry_delay": 30}

schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
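# "0 5 * * *" runs the ingestion daily at 05:00 UTC; outside prod the DAG is trigger-only.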


@task
def get_docs_df():
    return extract_docs()


@dag(
schedule_interval=schedule_interval,
start_date=datetime.datetime(2023, 9, 27),
catchup=False,
is_paused_upon_creation=True,
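    # created in a paused state; unpause the DAG to pick up the schedule above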
default_args=default_args,
)
def ask_astro_load_astronomer_providers():
"""
This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""

_import_data = (
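        # Shared ingest arguments are passed once via .partial(); the dataframes to load arrive via .expand().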
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
        .expand(dfs=[get_docs_df()])
)


ask_astro_load_astronomer_providers()
36 changes: 36 additions & 0 deletions airflow/include/tasks/extract/astronomer_providers_docs.py
@@ -0,0 +1,36 @@
from __future__ import annotations

import re

import pandas as pd
import requests
from bs4 import BeautifulSoup
from weaviate.util import generate_uuid5

from include.tasks.extract.utils.html_url_extractor import extract


def extract_docs() -> list[pd.DataFrame]:
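    """
    Crawl the Astronomer Providers documentation at https://astronomer-providers.readthedocs.io/en/stable/,
    keep the main body of each page, and return a single-element list of DataFrames whose content is
    hashed into a deterministic sha with generate_uuid5.
    """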

exclude_docs = ["_api", "_modules", "_sources"]
base_url = "https://astronomer-providers.readthedocs.io/en/stable/"

links = extract(base_url, exclude_docs)

df = pd.DataFrame(links, columns=["docLink"])

df["html_content"] = df["docLink"].apply(lambda x: requests.get(x).content)

df["content"] = df["html_content"].apply(
lambda x: str(BeautifulSoup(x, "html.parser").find(class_="body", role="main"))
)
df["content"] = df["content"].apply(lambda x: re.sub("¶", "", x))

df["sha"] = df["content"].apply(generate_uuid5)
df["docSource"] = ""
df.reset_index(drop=True, inplace=True)

# column order matters for uuid generation
df = df[["docSource", "sha", "content", "docLink"]]

return [df]
41 changes: 22 additions & 19 deletions airflow/include/tasks/extract/utils/html_url_extractor.py
import requests
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup

# Internal URLs seen across recursive calls, so each page is visited only once.
internal_urls = set()


def is_valid_url(url):
    parsed = urlparse(url)
    return bool(parsed.netloc) and bool(parsed.scheme)


def exclude_path(url, exclude_docs: list[str] = []):
    url_path = urlparse(url).path
    for docs in exclude_docs:
        if docs in url_path:
            return True
    return False


def get_all_links(url, exclude_docs: list[str] = []):
    urls = set()
    domain_name = urlparse(url).netloc
    soup = BeautifulSoup(requests.get(url).content, "html.parser")
    for a_tag in soup.findAll("a"):
        href = a_tag.attrs.get("href")
        if href == "" or href is None:
            continue
        href = urljoin(url, href)
        parsed_href = urlparse(href)
        # remove URL GET parameters, URL fragments, etc.
        href = parsed_href.scheme + "://" + parsed_href.netloc + parsed_href.path
        if not is_valid_url(href):
            continue
        if href in internal_urls:
            continue
        if domain_name not in href:
            continue
        if exclude_path(href, exclude_docs):
            continue
        urls.add(href)
        internal_urls.add(href)

    return urls


def extract(url, exclude_docs: list[str] = []):
    links = get_all_links(url, exclude_docs)
    if links:
        for link in links:
            extract(link, exclude_docs)

    return internal_urls