
Commit

Create TaskFlow API to use WeaviateHook and use taskflow API for ask-astro-load.py (#132)

- This implementation builds on the Airflow docs ingestion work, hence the commits from Michael.
- Create `AskAstroWeaviateHook`, which inherits from the OSS `WeaviateHook` (a rough sketch of its surface follows this list).
- Add `get_schema`, `check_schema`, `create_schema`, `handle_upsert_rollback`, and `ingest_data` methods.
- Use the TaskFlow API for `ask-astro-load.py`.
- Add Airflow docs as a source.
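
A rough sketch of the hook's surface as the DAG diffs below use it (import path, signatures, and method bodies are assumptions; the committed implementation lives under `airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py`, per the imports in the DAGs):

# Sketch only — not the committed implementation.
from airflow.providers.weaviate.hooks.weaviate import WeaviateHook  # assumed OSS import path


class AskAstroWeaviateHook(WeaviateHook):
    """Ask Astro-specific helpers layered on the OSS WeaviateHook."""

    def get_schema(self, schema_file: str) -> list:
        """Load the expected class definitions from a schema JSON file."""
        ...

    def check_schema(self, class_objects: list) -> bool:
        """Return True when every expected class already exists in Weaviate."""
        ...

    def create_schema(self, class_objects: list, existing: str = "ignore") -> None:
        """Create any missing classes; `existing` controls replace/ignore behavior."""
        ...

    def handle_upsert_rollback(self, objects_to_upsert: dict, class_name: str, verbose: bool) -> None:
        """Delete partially written objects so a failed upsert leaves no orphans."""
        ...

    def ingest_data(
        self,
        dfs,
        class_name: str,
        existing: str = "skip",
        doc_key: str | None = None,
        batch_params: dict | None = None,
        verbose: bool = False,
    ) -> None:
        """Batch-import dataframes of documents, upserting on doc_key when requested."""
        ...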


closes: #134
sunank200 authored Nov 23, 2023
1 parent 440a15c commit 1067794
Showing 29 changed files with 1,037 additions and 295 deletions.
1 change: 0 additions & 1 deletion airflow/.dockerignore
@@ -3,4 +3,3 @@ astro
.env
airflow_settings.yaml
logs/
dags/
6 changes: 1 addition & 5 deletions airflow/Dockerfile
@@ -1,5 +1 @@
# syntax=quay.io/astronomer/airflow-extensions:latest

FROM quay.io/astronomer/astro-runtime:9.5.0-base

COPY include/airflow_provider_weaviate-0.0.1-py3-none-any.whl /tmp
FROM quay.io/astronomer/astro-runtime:9.5.0
55 changes: 55 additions & 0 deletions airflow/dags/ingestion/ask-astro-load-airflow-docs.py
@@ -0,0 +1,55 @@
import os
from datetime import datetime

from include.tasks import split
from include.tasks.extract import airflow_docs
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")

ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

airflow_docs_base_url = "https://airflow.apache.org/docs/"

default_args = {"retries": 3, "retry_delay": 30}

schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None


@dag(
schedule_interval=schedule_interval,
start_date=datetime(2023, 9, 27),
catchup=False,
is_paused_upon_creation=True,
default_args=default_args,
)
def ask_astro_load_airflow_docs():
"""
This DAG performs incremental load for any new Airflow docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""

extracted_airflow_docs = task(airflow_docs.extract_airflow_docs)(docs_base_url=airflow_docs_base_url)

split_md_docs = task(split.split_html).expand(dfs=[extracted_airflow_docs])

_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_md_docs])
)


ask_astro_load_airflow_docs()
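
The `task(fn).partial(...).expand(...)` chain above is standard Airflow dynamic task mapping: `partial` pins the keyword arguments shared by every mapped task instance, and `expand` fans out one instance per element of the mapped argument. A self-contained toy sketch of the same mechanics (DAG and function names are invented for illustration):

from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2023, 9, 27), catchup=False)
def mapping_demo():
    @task
    def make_batches() -> list[list[int]]:
        # One mapped `load` instance is created per element of this list.
        return [[1, 2], [3, 4], [5, 6]]

    @task
    def load(batch: list[int], target: str) -> int:
        # `target` is pinned by .partial(); `batch` varies per mapped instance.
        print(f"loading {len(batch)} rows into {target}")
        return len(batch)

    load.partial(target="DocsDev").expand(batch=make_batches())


mapping_demo()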
47 changes: 32 additions & 15 deletions airflow/dags/ingestion/ask-astro-load-blogs.py
@@ -1,37 +1,54 @@
import datetime
import os
from datetime import datetime

from include.tasks import ingest, split
from include.tasks import split
from include.tasks.extract import blogs
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

blog_cutoff_date = datetime.strptime("2023-01-19", "%Y-%m-%d")
blog_cutoff_date = datetime.date(2023, 1, 19)

default_args = {"retries": 3, "retry_delay": 30}

@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None


@dag(
schedule_interval=schedule_interval,
start_date=datetime.datetime(2023, 9, 27),
catchup=False,
is_paused_upon_creation=True,
default_args=default_args,
)
def ask_astro_load_blogs():
"""
This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""

blogs_docs = task(blogs.extract_astro_blogs, retries=3)(blog_cutoff_date=blog_cutoff_date)
blogs_docs = task(blogs.extract_astro_blogs)(blog_cutoff_date=blog_cutoff_date)

split_md_docs = task(split.split_markdown).expand(dfs=[blogs_docs])

task.weaviate_import(
ingest.import_upsert_data,
weaviate_conn_id=_WEAVIATE_CONN_ID,
retries=10,
retry_delay=30,
).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs])
_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_md_docs])
)


ask_astro_load_blogs()
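
The `existing="upsert"` / `doc_key="docLink"` arguments replace the old `primary_key="docLink"` of `task.weaviate_import`: a document whose `docLink` already exists in the class is removed and re-added rather than duplicated. A toy model of that semantics, with a hypothetical `upsert_by_key` helper standing in for the hook:

import pandas as pd


def upsert_by_key(store: dict, df: pd.DataFrame, doc_key: str = "docLink") -> None:
    # Each row replaces any previously stored object sharing the same doc_key.
    for row in df.to_dict(orient="records"):
        store.pop(row[doc_key], None)  # drop the stale version, if present
        store[row[doc_key]] = row      # re-add the fresh version


store = {}
upsert_by_key(store, pd.DataFrame([{"docLink": "https://a", "content": "v1"}]))
upsert_by_key(store, pd.DataFrame([{"docLink": "https://a", "content": "v2"}]))
assert store["https://a"]["content"] == "v2"  # replaced, not duplicated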
70 changes: 36 additions & 34 deletions airflow/dags/ingestion/ask-astro-load-github.py
@@ -1,79 +1,81 @@
import datetime
import os
from datetime import datetime

from include.tasks import ingest, split
from include.tasks import split
from include.tasks.extract import github
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
_GITHUB_CONN_ID = "github_ro"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")

ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

markdown_docs_sources = [
{"doc_dir": "learn", "repo_base": "astronomer/docs"},
{"doc_dir": "astro", "repo_base": "astronomer/docs"},
{"doc_dir": "", "repo_base": "OpenLineage/docs"},
{"doc_dir": "", "repo_base": "OpenLineage/OpenLineage"},
]
rst_docs_sources = [
{"doc_dir": "docs", "repo_base": "apache/airflow", "exclude_docs": ["changelog.rst", "commits.rst"]},
]
code_samples_sources = [
{"doc_dir": "code-samples", "repo_base": "astronomer/docs"},
]
issues_docs_sources = [
"apache/airflow",
]

default_args = {"retries": 3, "retry_delay": 30}

schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None

@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)

@dag(
schedule_interval=schedule_interval,
start_date=datetime.datetime(2023, 9, 27),
catchup=False,
is_paused_upon_creation=True,
default_args=default_args,
)
def ask_astro_load_github():
"""
This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""

md_docs = (
task(github.extract_github_markdown, retries=3)
task(github.extract_github_markdown)
.partial(github_conn_id=_GITHUB_CONN_ID)
.expand(source=markdown_docs_sources)
)

rst_docs = (
task(github.extract_github_rst, retries=3)
.partial(github_conn_id=_GITHUB_CONN_ID)
.expand(source=rst_docs_sources)
)

issues_docs = (
task(github.extract_github_issues, retries=3)
.partial(github_conn_id=_GITHUB_CONN_ID)
.expand(repo_base=issues_docs_sources)
task(github.extract_github_issues).partial(github_conn_id=_GITHUB_CONN_ID).expand(repo_base=issues_docs_sources)
)

code_samples = (
task(github.extract_github_python, retries=3)
.partial(github_conn_id=_GITHUB_CONN_ID)
.expand(source=code_samples_sources)
task(github.extract_github_python).partial(github_conn_id=_GITHUB_CONN_ID).expand(source=code_samples_sources)
)

markdown_tasks = [md_docs, rst_docs, issues_docs]

split_md_docs = task(split.split_markdown).expand(dfs=markdown_tasks)
split_md_docs = task(split.split_markdown).expand(dfs=[md_docs, issues_docs])

split_code_docs = task(split.split_python).expand(dfs=[code_samples])

task.weaviate_import(
ingest.import_upsert_data,
weaviate_conn_id=_WEAVIATE_CONN_ID,
retries=10,
retry_delay=30,
).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs, split_code_docs])

issues_docs >> md_docs >> rst_docs >> code_samples
_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_md_docs, split_code_docs])
)


ask_astro_load_github()
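
The `batch_params={"batch_size": 1000}` argument is presumably forwarded to the Weaviate Python client's batch configuration; with the v3 client in use at the time, the direct equivalent would be roughly the following (URL, object, and class contents are placeholders):

import weaviate  # weaviate-client v3

client = weaviate.Client("http://localhost:8080")  # placeholder URL
client.batch.configure(batch_size=1000)  # the knob that batch_params={"batch_size": 1000} sets
with client.batch as batch:
    batch.add_data_object(
        {"docLink": "https://a", "content": "..."},  # placeholder object
        class_name="DocsDev",
    )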
46 changes: 32 additions & 14 deletions airflow/dags/ingestion/ask-astro-load-registry.py
@@ -1,39 +1,57 @@
import os
from datetime import datetime

from include.tasks import ingest, split
from include.tasks import split
from include.tasks.extract import registry
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")

ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
default_args = {"retries": 3, "retry_delay": 30}

schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None


@dag(
schedule_interval=schedule_interval,
start_date=datetime(2023, 9, 27),
catchup=False,
is_paused_upon_creation=True,
default_args=default_args,
)
def ask_astro_load_registry():
"""
This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""

registry_cells_docs = task(registry.extract_astro_registry_cell_types, retries=3)()
registry_cells_docs = task(registry.extract_astro_registry_cell_types)()

registry_dags_docs = task(registry.extract_astro_registry_dags, retries=3)()
registry_dags_docs = task(registry.extract_astro_registry_dags)()

split_md_docs = task(split.split_markdown).expand(dfs=[registry_cells_docs])

split_code_docs = task(split.split_python).expand(dfs=[registry_dags_docs])

task.weaviate_import(
ingest.import_upsert_data,
weaviate_conn_id=_WEAVIATE_CONN_ID,
retries=10,
retry_delay=30,
).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs, split_code_docs])
_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_md_docs, split_code_docs])
)


ask_astro_load_registry()
43 changes: 30 additions & 13 deletions airflow/dags/ingestion/ask-astro-load-slack.py
@@ -1,15 +1,17 @@
import os
from datetime import datetime

from include.tasks import ingest, split
from include.tasks import split
from include.tasks.extract import slack
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)
slack_channel_sources = [
{
"channel_name": "troubleshooting",
@@ -20,25 +22,40 @@
}
]

default_args = {"retries": 3, "retry_delay": 30}

@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None


@dag(
schedule_interval=schedule_interval,
start_date=datetime(2023, 9, 27),
catchup=False,
is_paused_upon_creation=True,
default_args=default_args,
)
def ask_astro_load_slack():
"""
This DAG performs incremental load for any new slack threads. The slack archive is a point-in-time capture. This
DAG should run nightly to capture threads between archive periods. By using the upsert logic of the
This DAG performs incremental load for any new slack threads. The slack archive is a point-in-time capture. This
DAG should run nightly to capture threads between archive periods. By using the upsert logic of the
weaviate_import decorator any existing documents that have been updated will be removed and re-added.
"""

slack_docs = task(slack.extract_slack, retries=3).expand(source=slack_channel_sources)
slack_docs = task(slack.extract_slack).expand(source=slack_channel_sources)

split_md_docs = task(split.split_markdown).expand(dfs=[slack_docs])

task.weaviate_import(
ingest.import_upsert_data,
weaviate_conn_id=_WEAVIATE_CONN_ID,
retries=10,
retry_delay=30,
).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs])
_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_md_docs])
)


ask_astro_load_slack()
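
All five ingestion DAGs share `default_args = {"retries": 3, "retry_delay": 30}`. Airflow coerces a bare number for `retry_delay` into seconds; the explicit equivalent is:

from datetime import timedelta

# Same behavior as {"retries": 3, "retry_delay": 30}; the timedelta form is explicit.
default_args = {"retries": 3, "retry_delay": timedelta(seconds=30)}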
