Commit

fixes #105, #106, #107, #108
mpgreg authored and sunank200 committed Nov 20, 2023
1 parent f194886 commit 000a918
Showing 11 changed files with 353 additions and 87 deletions.
47 changes: 47 additions & 0 deletions airflow/dags/ingestion/ask-astro-load-airflow-docs.py
@@ -0,0 +1,47 @@
+import os
+from datetime import datetime
+
+from include.tasks import ingest, split
+from include.tasks.extract import airflow_docs
+
+from airflow.decorators import dag, task
+
+ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
+
+_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
+WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
+
+airflow_docs_base_url = "https://airflow.apache.org/docs/"
+
+default_args = {"retries": 3, "retry_delay": 30}
+
+schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
+
+
+@dag(
+    schedule_interval=schedule_interval,
+    start_date=datetime(2023, 9, 27),
+    catchup=False,
+    is_paused_upon_creation=True,
+    default_args=default_args,
+)
+def ask_astro_load_airflow_docs():
+    """
+    This DAG performs incremental load for any new Airflow docs. Initial load via ask_astro_load_bulk imported
+    data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
+    any existing documents that have been updated will be removed and re-added.
+    """
+
+    extracted_airflow_docs = task(airflow_docs.extract_airflow_docs)(docs_base_url=airflow_docs_base_url)
+
+    split_md_docs = task(split.split_html).expand(dfs=[extracted_airflow_docs])
+
+    task.weaviate_import(
+        ingest.import_upsert_data,
+        weaviate_conn_id=_WEAVIATE_CONN_ID,
+    ).partial(
+        class_name=WEAVIATE_CLASS, primary_key="docLink"
+    ).expand(dfs=[split_md_docs])
+
+
+ask_astro_load_airflow_docs()
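Note: every ingestion DAG in this commit swaps its hard-coded cron string for an environment-gated value, so only the prod deployment runs nightly and all other environments must be triggered manually. A minimal standalone sketch of the pattern (values of ASK_ASTRO_ENV other than "prod" are whatever a deployment happens to set):

import os

# Only the "prod" deployment gets the nightly 05:00 cron; any other value of
# ASK_ASTRO_ENV (including unset) yields schedule_interval=None, meaning the
# DAG runs only when triggered manually.
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None

print(schedule_interval)  # "0 5 * * *" in prod, None elsewhere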
20 changes: 15 additions & 5 deletions airflow/dags/ingestion/ask-astro-load-blogs.py
@@ -13,25 +13,35 @@
 
 blog_cutoff_date = datetime.strptime("2023-01-19", "%Y-%m-%d")
 
+default_args = {"retries": 3, "retry_delay": 30}
+
+schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
+
 
-@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
+@dag(
+    schedule_interval=schedule_interval,
+    start_date=datetime(2023, 9, 27),
+    catchup=False,
+    is_paused_upon_creation=True,
+    default_args=default_args,
+)
 def ask_astro_load_blogs():
     """
     This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
     data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
     any existing documents that have been updated will be removed and re-added.
     """
 
-    blogs_docs = task(blogs.extract_astro_blogs, retries=3)(blog_cutoff_date=blog_cutoff_date)
+    blogs_docs = task(blogs.extract_astro_blogs)(blog_cutoff_date=blog_cutoff_date)
 
     split_md_docs = task(split.split_markdown).expand(dfs=[blogs_docs])
 
     task.weaviate_import(
         ingest.import_upsert_data,
         weaviate_conn_id=_WEAVIATE_CONN_ID,
-        retries=10,
-        retry_delay=30,
-    ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs])
+    ).partial(
+        class_name=WEAVIATE_CLASS, primary_key="docLink"
+    ).expand(dfs=[split_md_docs])
 
 
 ask_astro_load_blogs()
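The commit also moves retry settings off the individual tasks (retries=3 on extracts, retries=10 on imports) into DAG-level default_args, which Airflow applies to every task in the DAG unless a task overrides them. A small sketch of that inheritance, using toy task names rather than anything from this repo:

from datetime import datetime

from airflow.decorators import dag, task

default_args = {"retries": 3, "retry_delay": 30}


@dag(schedule_interval=None, start_date=datetime(2023, 9, 27), catchup=False, default_args=default_args)
def retry_defaults_demo():
    @task  # inherits retries=3 and retry_delay=30 from default_args
    def extract():
        return "raw"

    @task(retries=10)  # a per-task argument still overrides the DAG default
    def load(payload: str):
        print(payload)

    load(extract())


retry_defaults_demo()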
43 changes: 19 additions & 24 deletions airflow/dags/ingestion/ask-astro-load-github.py
@@ -17,18 +17,25 @@
     {"doc_dir": "", "repo_base": "OpenLineage/docs"},
     {"doc_dir": "", "repo_base": "OpenLineage/OpenLineage"},
 ]
-rst_docs_sources = [
-    {"doc_dir": "docs", "repo_base": "apache/airflow", "exclude_docs": ["changelog.rst", "commits.rst"]},
-]
 code_samples_sources = [
     {"doc_dir": "code-samples", "repo_base": "astronomer/docs"},
 ]
 issues_docs_sources = [
     "apache/airflow",
 ]
 
+default_args = {"retries": 3, "retry_delay": 30}
+
+schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
+
 
-@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
+@dag(
+    schedule_interval=schedule_interval,
+    start_date=datetime(2023, 9, 27),
+    catchup=False,
+    is_paused_upon_creation=True,
+    default_args=default_args,
+)
 def ask_astro_load_github():
     """
     This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
@@ -37,43 +44,31 @@ def ask_astro_load_github():
     """
 
     md_docs = (
-        task(github.extract_github_markdown, retries=3)
+        task(github.extract_github_markdown)
         .partial(github_conn_id=_GITHUB_CONN_ID)
         .expand(source=markdown_docs_sources)
     )
 
-    rst_docs = (
-        task(github.extract_github_rst, retries=3)
-        .partial(github_conn_id=_GITHUB_CONN_ID)
-        .expand(source=rst_docs_sources)
-    )
-
     issues_docs = (
-        task(github.extract_github_issues, retries=3)
-        .partial(github_conn_id=_GITHUB_CONN_ID)
-        .expand(repo_base=issues_docs_sources)
+        task(github.extract_github_issues).partial(github_conn_id=_GITHUB_CONN_ID).expand(repo_base=issues_docs_sources)
     )
 
     code_samples = (
-        task(github.extract_github_python, retries=3)
-        .partial(github_conn_id=_GITHUB_CONN_ID)
-        .expand(source=code_samples_sources)
+        task(github.extract_github_python).partial(github_conn_id=_GITHUB_CONN_ID).expand(source=code_samples_sources)
     )
 
-    markdown_tasks = [md_docs, rst_docs, issues_docs]
-
-    split_md_docs = task(split.split_markdown).expand(dfs=markdown_tasks)
+    split_md_docs = task(split.split_markdown).expand(dfs=[md_docs, issues_docs])
 
     split_code_docs = task(split.split_python).expand(dfs=[code_samples])
 
     task.weaviate_import(
         ingest.import_upsert_data,
         weaviate_conn_id=_WEAVIATE_CONN_ID,
-        retries=10,
-        retry_delay=30,
-    ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs, split_code_docs])
+    ).partial(
+        class_name=WEAVIATE_CLASS, primary_key="docLink"
+    ).expand(dfs=[split_md_docs, split_code_docs])
 
-    issues_docs >> md_docs >> rst_docs >> code_samples
+    issues_docs >> md_docs >> code_samples
 
 
 ask_astro_load_github()
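The extract tasks above use Airflow's dynamic task mapping: .partial() pins arguments shared by every mapped instance (here the connection ID) and .expand() fans the task out over a list, one task instance per element. A toy sketch of the mechanics with placeholder sources (not the repo's extractors):

from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2023, 9, 27), catchup=False)
def mapping_demo():
    @task
    def extract(conn_id: str, source: str) -> str:
        # One mapped task instance runs per element of the expanded list,
        # all sharing the conn_id fixed by .partial().
        return f"{conn_id}:{source}"

    extract.partial(conn_id="github_default").expand(
        source=["apache/airflow", "OpenLineage/docs", "astronomer/docs"]
    )


mapping_demo()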
22 changes: 16 additions & 6 deletions airflow/dags/ingestion/ask-astro-load-registry.py
@@ -11,18 +11,28 @@
 _WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
 WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
 
+default_args = {"retries": 3, "retry_delay": 30}
+
+schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
+
 
-@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
+@dag(
+    schedule_interval=schedule_interval,
+    start_date=datetime(2023, 9, 27),
+    catchup=False,
+    is_paused_upon_creation=True,
+    default_args=default_args,
+)
 def ask_astro_load_registry():
     """
     This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
     data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
     any existing documents that have been updated will be removed and re-added.
     """
 
-    registry_cells_docs = task(registry.extract_astro_registry_cell_types, retries=3)()
+    registry_cells_docs = task(registry.extract_astro_registry_cell_types)()
 
-    registry_dags_docs = task(registry.extract_astro_registry_dags, retries=3)()
+    registry_dags_docs = task(registry.extract_astro_registry_dags)()
 
     split_md_docs = task(split.split_markdown).expand(dfs=[registry_cells_docs])
 
@@ -31,9 +41,9 @@ def ask_astro_load_registry():
     task.weaviate_import(
         ingest.import_upsert_data,
         weaviate_conn_id=_WEAVIATE_CONN_ID,
-        retries=10,
-        retry_delay=30,
-    ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs, split_code_docs])
+    ).partial(
+        class_name=WEAVIATE_CLASS, primary_key="docLink"
+    ).expand(dfs=[split_md_docs, split_code_docs])
 
 
 ask_astro_load_registry()
20 changes: 15 additions & 5 deletions airflow/dags/ingestion/ask-astro-load-slack.py
@@ -20,25 +20,35 @@
     }
 ]
 
+default_args = {"retries": 3, "retry_delay": 30}
+
+schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
+
 
-@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
+@dag(
+    schedule_interval=schedule_interval,
+    start_date=datetime(2023, 9, 27),
+    catchup=False,
+    is_paused_upon_creation=True,
+    default_args=default_args,
+)
 def ask_astro_load_slack():
     """
     This DAG performs incremental load for any new slack threads. The slack archive is a point-in-time capture. This
     DAG should run nightly to capture threads between archive periods. By using the upsert logic of the
     weaviate_import decorator any existing documents that have been updated will be removed and re-added.
     """
 
-    slack_docs = task(slack.extract_slack, retries=3).expand(source=slack_channel_sources)
+    slack_docs = task(slack.extract_slack).expand(source=slack_channel_sources)
 
     split_md_docs = task(split.split_markdown).expand(dfs=[slack_docs])
 
     task.weaviate_import(
         ingest.import_upsert_data,
         weaviate_conn_id=_WEAVIATE_CONN_ID,
-        retries=10,
-        retry_delay=30,
-    ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs])
+    ).partial(
+        class_name=WEAVIATE_CLASS, primary_key="docLink"
+    ).expand(dfs=[split_md_docs])
 
 
 ask_astro_load_slack()
20 changes: 15 additions & 5 deletions airflow/dags/ingestion/ask-astro-load-stackoverflow.py
@@ -16,8 +16,18 @@
     "airflow",
 ]
 
+default_args = {"retries": 3, "retry_delay": 30}
+
+schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
+
 
-@dag(schedule_interval=None, start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
+@dag(
+    schedule_interval=schedule_interval,
+    start_date=datetime(2023, 9, 27),
+    catchup=False,
+    is_paused_upon_creation=True,
+    default_args=default_args,
+)
 def ask_astro_load_stackoverflow():
     """
     This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
@@ -26,7 +36,7 @@ def ask_astro_load_stackoverflow():
     """
 
     stack_overflow_docs = (
-        task(stack_overflow.extract_stack_overflow_archive, retries=3)
+        task(stack_overflow.extract_stack_overflow_archive)
         .partial(stackoverflow_cutoff_date=stackoverflow_cutoff_date)
         .expand(tag=stackoverflow_tags)
     )
@@ -36,9 +46,9 @@ def ask_astro_load_stackoverflow():
     task.weaviate_import(
         ingest.import_upsert_data,
         weaviate_conn_id=_WEAVIATE_CONN_ID,
-        retries=10,
-        retry_delay=30,
-    ).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs])
+    ).partial(
+        class_name=WEAVIATE_CLASS, primary_key="docLink"
+    ).expand(dfs=[split_md_docs])
 
 
 ask_astro_load_stackoverflow()
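Every DAG ends by handing the split documents to task.weaviate_import with primary_key="docLink", relying on the decorator's upsert semantics: an updated document is removed and re-added rather than duplicated. The decorator's internals are not part of this diff; as a rough illustration only, delete-then-insert keyed on a primary key looks like this (a plain dict stands in for the vector store):

def upsert(store: dict, rows: list[dict], primary_key: str = "docLink") -> None:
    # Hypothetical sketch, not the provider's implementation: drop any stale
    # version of each document, then re-add the current one.
    for row in rows:
        key = row[primary_key]
        store.pop(key, None)
        store[key] = row


store: dict = {}
upsert(store, [{"docLink": "a", "content": "v1"}])
upsert(store, [{"docLink": "a", "content": "v2"}])
print(store["a"]["content"])  # "v2" -- updated, not duplicated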