Use AskAstroWeaviateHook for ask-astro-load-github.py #157

Closed · wants to merge 15 commits
1 change: 0 additions & 1 deletion airflow/.dockerignore
@@ -3,4 +3,3 @@ astro
 .env
 airflow_settings.yaml
 logs/
-dags/
6 changes: 1 addition & 5 deletions airflow/Dockerfile
@@ -1,5 +1 @@
-# syntax=quay.io/astronomer/airflow-extensions:latest
-
-FROM quay.io/astronomer/astro-runtime:9.5.0-base
-
-COPY include/airflow_provider_weaviate-0.0.1-py3-none-any.whl /tmp
+FROM quay.io/astronomer/astro-runtime:9.5.0
47 changes: 47 additions & 0 deletions airflow/dags/ingestion/ask-astro-load-airflow-docs.py
@@ -0,0 +1,47 @@
+import os
+from datetime import datetime
+
+from include.tasks import ingest, split
+from include.tasks.extract import airflow_docs
+
+from airflow.decorators import dag, task
+
+ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
+
+_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
+WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
+
+airflow_docs_base_url = "https://airflow.apache.org/docs/"
+
+default_args = {"retries": 3, "retry_delay": 30}
+
+schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
+
+
+@dag(
+    schedule_interval=schedule_interval,
+    start_date=datetime(2023, 9, 27),
+    catchup=False,
+    is_paused_upon_creation=True,
+    default_args=default_args,
+)
+def ask_astro_load_airflow_docs():
+    """
+    This DAG performs incremental load for any new Airflow docs. Initial load via ask_astro_load_bulk imported
+    data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
+    any existing documents that have been updated will be removed and re-added.
+    """
+
+    extracted_airflow_docs = task(airflow_docs.extract_airflow_docs)(docs_base_url=airflow_docs_base_url)
+
+    split_md_docs = task(split.split_html).expand(dfs=[extracted_airflow_docs])
+
+    task.weaviate_import(
+        ingest.import_upsert_data,
+        weaviate_conn_id=_WEAVIATE_CONN_ID,
+    ).partial(
+        class_name=WEAVIATE_CLASS, primary_key="docLink"
+    ).expand(dfs=[split_md_docs])
+
+
+ask_astro_load_airflow_docs()
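For context on the pattern used in this new file and in the DAGs below: wrapping a plain function with task(...) turns it into a TaskFlow task, .partial(...) pins keyword arguments shared by every mapped copy, and .expand(...) fans the task out at runtime over a list (Airflow dynamic task mapping, available since 2.3). A minimal, self-contained sketch, with hypothetical task and DAG names that are not part of this PR:

from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2023, 9, 27), catchup=False)
def mapping_sketch():
    @task
    def split_one(dfs: str, class_name: str) -> str:
        # One mapped task instance runs per element passed to .expand();
        # class_name is pinned by .partial() and identical for every instance.
        return f"{class_name}: {dfs}"

    split_one.partial(class_name="DocsProd").expand(dfs=["doc-a", "doc-b"])


mapping_sketch()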
28 changes: 19 additions & 9 deletions airflow/dags/ingestion/ask-astro-load-blogs.py
@@ -1,5 +1,5 @@
+import datetime
 import os
-from datetime import datetime
 
 from include.tasks import ingest, split
 from include.tasks.extract import blogs
@@ -11,27 +11,37 @@
 _WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
 WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
 
-blog_cutoff_date = datetime.strptime("2023-01-19", "%Y-%m-%d")
+blog_cutoff_date = datetime.date(2023, 1, 19)
 
+default_args = {"retries": 3, "retry_delay": 30}
 
-@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
+schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
+
+
+@dag(
+    schedule_interval=schedule_interval,
+    start_date=datetime.datetime(2023, 9, 27),
+    catchup=False,
+    is_paused_upon_creation=True,
+    default_args=default_args,
+)
 def ask_astro_load_blogs():
     """
-    This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
-    data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
+    This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
+    data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
     any existing documents that have been updated will be removed and re-added.
     """
 
-    blogs_docs = task(blogs.extract_astro_blogs, retries=3)(blog_cutoff_date=blog_cutoff_date)
+    blogs_docs = task(blogs.extract_astro_blogs)(blog_cutoff_date=blog_cutoff_date)
 
     split_md_docs = task(split.split_markdown).expand(dfs=[blogs_docs])
 
     task.weaviate_import(
         ingest.import_upsert_data,
         weaviate_conn_id=_WEAVIATE_CONN_ID,
-        retries=10,
-        retry_delay=30,
-    ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs])
+    ).partial(
+        class_name=WEAVIATE_CLASS, primary_key="docLink"
+    ).expand(dfs=[split_md_docs])
 
 
 ask_astro_load_blogs()
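One note on the cutoff change above: datetime.strptime() returns a full datetime.datetime, while the replacement constructs a plain datetime.date. A likely motivation, judging from the diff alone, is to avoid mixed date/datetime comparisons downstream, since the two types cannot be ordered against each other:

import datetime

datetime.datetime.strptime("2023-01-19", "%Y-%m-%d")  # -> datetime.datetime(2023, 1, 19, 0, 0)
datetime.date(2023, 1, 19)                            # -> datetime.date(2023, 1, 19)

# Ordering a date against a datetime raises TypeError:
# datetime.date(2023, 1, 19) < datetime.datetime(2023, 1, 19)  # TypeError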
77 changes: 42 additions & 35 deletions airflow/dags/ingestion/ask-astro-load-github.py
@@ -1,79 +1,86 @@
+import datetime
 import os
-from datetime import datetime
 
-from include.tasks import ingest, split
+from dateutil.relativedelta import relativedelta
+from include.tasks import split
 from include.tasks.extract import github
+from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook
 
 from airflow.decorators import dag, task
 
-ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
+ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")
 
 _WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
 _GITHUB_CONN_ID = "github_ro"
-WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
+WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
 
+ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)
+
 markdown_docs_sources = [
     {"doc_dir": "learn", "repo_base": "astronomer/docs"},
     {"doc_dir": "astro", "repo_base": "astronomer/docs"},
     {"doc_dir": "", "repo_base": "OpenLineage/docs"},
     {"doc_dir": "", "repo_base": "OpenLineage/OpenLineage"},
 ]
-rst_docs_sources = [
-    {"doc_dir": "docs", "repo_base": "apache/airflow", "exclude_docs": ["changelog.rst", "commits.rst"]},
-]
 code_samples_sources = [
     {"doc_dir": "code-samples", "repo_base": "astronomer/docs"},
 ]
 issues_docs_sources = [
-    "apache/airflow",
+    {
+        "repo_base": "apache/airflow",
+        "cutoff_date": datetime.date.today() - relativedelta(months=1),
+        "cutoff_issue_number": 30000,
+    }
 ]
 
+default_args = {"retries": 3, "retry_delay": 30}
+
+schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
 
-@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
+
+@dag(
+    schedule_interval=schedule_interval,
+    start_date=datetime.datetime(2023, 9, 27),
+    catchup=False,
+    is_paused_upon_creation=True,
+    default_args=default_args,
+)
 def ask_astro_load_github():
     """
-    This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
-    data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
+    This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
+    data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
     any existing documents that have been updated will be removed and re-added.
     """
 
     md_docs = (
-        task(github.extract_github_markdown, retries=3)
+        task(github.extract_github_markdown)
         .partial(github_conn_id=_GITHUB_CONN_ID)
         .expand(source=markdown_docs_sources)
     )
 
-    rst_docs = (
-        task(github.extract_github_rst, retries=3)
-        .partial(github_conn_id=_GITHUB_CONN_ID)
-        .expand(source=rst_docs_sources)
-    )
-
     issues_docs = (
-        task(github.extract_github_issues, retries=3)
-        .partial(github_conn_id=_GITHUB_CONN_ID)
-        .expand(repo_base=issues_docs_sources)
+        task(github.extract_github_issues).partial(github_conn_id=_GITHUB_CONN_ID).expand(source=issues_docs_sources)
     )
 
     code_samples = (
-        task(github.extract_github_python, retries=3)
-        .partial(github_conn_id=_GITHUB_CONN_ID)
-        .expand(source=code_samples_sources)
+        task(github.extract_github_python).partial(github_conn_id=_GITHUB_CONN_ID).expand(source=code_samples_sources)
     )
 
-    markdown_tasks = [md_docs, rst_docs, issues_docs]
-
-    split_md_docs = task(split.split_markdown).expand(dfs=markdown_tasks)
+    split_md_docs = task(split.split_markdown).expand(dfs=[md_docs, issues_docs])
 
     split_code_docs = task(split.split_python).expand(dfs=[code_samples])
 
-    task.weaviate_import(
-        ingest.import_upsert_data,
-        weaviate_conn_id=_WEAVIATE_CONN_ID,
-        retries=10,
-        retry_delay=30,
-    ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs, split_code_docs])
-
-    issues_docs >> md_docs >> rst_docs >> code_samples
+    _import_data = (
+        task(ask_astro_weaviate_hook.ingest_data, retries=10)
+        .partial(
+            class_name=WEAVIATE_CLASS,
+            existing="upsert",
+            doc_key="docLink",
+            batch_params={"batch_size": 1000},
+            verbose=True,
+        )
+        .expand(dfs=[split_md_docs, split_code_docs])
+    )
 
 
 ask_astro_load_github()
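The key change in this file, and the one the PR title refers to, is swapping the task.weaviate_import decorator for AskAstroWeaviateHook.ingest_data with existing="upsert" and doc_key="docLink". Judging from those arguments, the hook keys each chunk on docLink and replaces existing objects rather than duplicating them, which matches the removed-and-re-added behaviour the docstrings describe. A rough sketch of that upsert idea in plain pandas (a hypothetical helper, not the hook's actual implementation):

import pandas as pd


def upsert_sketch(existing: pd.DataFrame, incoming: pd.DataFrame, doc_key: str) -> pd.DataFrame:
    # Drop stored rows whose key reappears in the incoming batch...
    kept = existing[~existing[doc_key].isin(incoming[doc_key])]
    # ...then append the fresh versions: updated docs are removed and re-added.
    return pd.concat([kept, incoming], ignore_index=True)


old = pd.DataFrame({"docLink": ["a", "b"], "content": ["old-a", "old-b"]})
new = pd.DataFrame({"docLink": ["b", "c"], "content": ["new-b", "new-c"]})
print(upsert_sketch(old, new, doc_key="docLink"))
# Keeps old-a, replaces b with new-b, adds new-c.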
26 changes: 18 additions & 8 deletions airflow/dags/ingestion/ask-astro-load-registry.py
@@ -11,18 +11,28 @@
 _WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
 WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
 
+default_args = {"retries": 3, "retry_delay": 30}
 
-@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
+schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
+
+
+@dag(
+    schedule_interval=schedule_interval,
+    start_date=datetime(2023, 9, 27),
+    catchup=False,
+    is_paused_upon_creation=True,
+    default_args=default_args,
+)
 def ask_astro_load_registry():
     """
-    This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
-    data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
+    This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
+    data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
     any existing documents that have been updated will be removed and re-added.
     """
 
-    registry_cells_docs = task(registry.extract_astro_registry_cell_types, retries=3)()
+    registry_cells_docs = task(registry.extract_astro_registry_cell_types)()
 
-    registry_dags_docs = task(registry.extract_astro_registry_dags, retries=3)()
+    registry_dags_docs = task(registry.extract_astro_registry_dags)()
 
     split_md_docs = task(split.split_markdown).expand(dfs=[registry_cells_docs])
 
@@ -31,9 +41,9 @@ def ask_astro_load_registry():
     task.weaviate_import(
         ingest.import_upsert_data,
         weaviate_conn_id=_WEAVIATE_CONN_ID,
-        retries=10,
-        retry_delay=30,
-    ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs, split_code_docs])
+    ).partial(
+        class_name=WEAVIATE_CLASS, primary_key="docLink"
+    ).expand(dfs=[split_md_docs, split_code_docs])
 
 
 ask_astro_load_registry()
24 changes: 17 additions & 7 deletions airflow/dags/ingestion/ask-astro-load-slack.py
@@ -20,25 +20,35 @@
     }
 ]
 
+default_args = {"retries": 3, "retry_delay": 30}
 
-@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
+schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
+
+
+@dag(
+    schedule_interval=schedule_interval,
+    start_date=datetime(2023, 9, 27),
+    catchup=False,
+    is_paused_upon_creation=True,
+    default_args=default_args,
+)
 def ask_astro_load_slack():
     """
-    This DAG performs incremental load for any new slack threads. The slack archive is a point-in-time capture. This
-    DAG should run nightly to capture threads between archive periods. By using the upsert logic of the
+    This DAG performs incremental load for any new slack threads. The slack archive is a point-in-time capture. This
+    DAG should run nightly to capture threads between archive periods. By using the upsert logic of the
     weaviate_import decorator any existing documents that have been updated will be removed and re-added.
     """
 
-    slack_docs = task(slack.extract_slack, retries=3).expand(source=slack_channel_sources)
+    slack_docs = task(slack.extract_slack).expand(source=slack_channel_sources)
 
     split_md_docs = task(split.split_markdown).expand(dfs=[slack_docs])
 
     task.weaviate_import(
         ingest.import_upsert_data,
         weaviate_conn_id=_WEAVIATE_CONN_ID,
-        retries=10,
-        retry_delay=30,
-    ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs])
+    ).partial(
+        class_name=WEAVIATE_CLASS, primary_key="docLink"
+    ).expand(dfs=[split_md_docs])
 
 
 ask_astro_load_slack()
24 changes: 17 additions & 7 deletions airflow/dags/ingestion/ask-astro-load-stackoverflow.py
@@ -16,17 +16,27 @@
     "airflow",
 ]
 
+default_args = {"retries": 3, "retry_delay": 30}
 
-@dag(schedule_interval=None, start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
+schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
+
+
+@dag(
+    schedule_interval=schedule_interval,
+    start_date=datetime(2023, 9, 27),
+    catchup=False,
+    is_paused_upon_creation=True,
+    default_args=default_args,
+)
 def ask_astro_load_stackoverflow():
     """
-    This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
-    data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
+    This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
+    data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
     any existing documents that have been updated will be removed and re-added.
     """
 
     stack_overflow_docs = (
-        task(stack_overflow.extract_stack_overflow_archive, retries=3)
+        task(stack_overflow.extract_stack_overflow)
         .partial(stackoverflow_cutoff_date=stackoverflow_cutoff_date)
        .expand(tag=stackoverflow_tags)
     )
@@ -36,9 +46,9 @@ def ask_astro_load_stackoverflow():
     task.weaviate_import(
         ingest.import_upsert_data,
         weaviate_conn_id=_WEAVIATE_CONN_ID,
-        retries=10,
-        retry_delay=30,
-    ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs])
+    ).partial(
+        class_name=WEAVIATE_CLASS, primary_key="docLink"
+    ).expand(dfs=[split_md_docs])
 
 
 ask_astro_load_stackoverflow()
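Two changes repeat across every DAG in this PR: schedule_interval is now a cron string only when ASK_ASTRO_ENV is "prod" (None leaves the DAG unscheduled, so non-prod deployments only run on manual trigger), and per-task retries/retry_delay arguments are consolidated into DAG-level default_args, which apply to every task unless overridden (as task(..., retries=10) still does for the import step). A condensed sketch of the shared skeleton, assembled from the diffs above:

import os
from datetime import datetime

from airflow.decorators import dag

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")

# None means the DAG never runs on a schedule; it can still be triggered manually.
schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None

# Applied to every task in the DAG unless a task overrides them.
default_args = {"retries": 3, "retry_delay": 30}


@dag(
    schedule_interval=schedule_interval,
    start_date=datetime(2023, 9, 27),
    catchup=False,
    is_paused_upon_creation=True,
    default_args=default_args,
)
def skeleton():
    ...


skeleton()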