
Merge branch 'main' into banner-alert
davidgxue committed Feb 28, 2024
2 parents b50bff6 + daf6de8 commit cca7041
Showing 32 changed files with 305 additions and 260 deletions.
2 changes: 1 addition & 1 deletion CODEOWNERS
@@ -1 +1 @@
-* @Lee-W @pankajastro @sunank200 @davidgxue
+* @Lee-W @pankajastro @sunank200 @davidgxue @jlaneve
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-forum-load.py
@@ -44,7 +44,7 @@ def ask_astro_load_astro_forum():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
-batch_config_params={"batch_size": 1000},
+batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
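The same batch_config_params change ({"batch_size": 1000} -> {"batch_size": 7, "dynamic": False}) is repeated across the ingestion DAGs below. These parameters are presumably forwarded to the Weaviate client's batch configuration; as a minimal sketch of what they mean, assuming the v3 weaviate-client that the Airflow provider wrapped at the time (the endpoint URL is a placeholder):

import weaviate  # assumption: weaviate-client v3.x

client = weaviate.Client("http://localhost:8080")  # placeholder endpoint

# batch_size=7, dynamic=False: flush after every 7 objects and do not let the
# client grow or shrink the batch based on observed import latency.
client.batch.configure(batch_size=7, dynamic=False)

A small, fixed batch trades import throughput for more predictable memory use and finer-grained retries.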
25 changes: 6 additions & 19 deletions airflow/dags/ingestion/ask-astro-load-airflow-docs.py
@@ -1,7 +1,6 @@
import os
from datetime import datetime

-import pandas as pd
from include.utils.slack import send_failure_notification

from airflow.decorators import dag, task
@@ -20,21 +19,6 @@
schedule_interval = os.environ.get("INGESTION_SCHEDULE", "0 5 * * 2") if ask_astro_env == "prod" else None


-@task
-def split_docs(urls: str, chunk_size: int = 100) -> list[list[pd.DataFrame]]:
-    """
-    Split the URLs in chunk and get dataframe for the content
-    param urls: List for HTTP URL
-    param chunk_size: Max number of document in split chunk
-    """
-    from include.tasks import split
-    from include.tasks.extract.utils.html_utils import urls_to_dataframe
-
-    chunked_urls = split.split_list(list(urls), chunk_size=chunk_size)
-    return [[urls_to_dataframe(chunk_url)] for chunk_url in chunked_urls]


@dag(
schedule_interval=schedule_interval,
start_date=datetime(2023, 9, 27),
@@ -51,19 +35,22 @@ def ask_astro_load_airflow_docs():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
+from include.tasks import split
from include.tasks.extract import airflow_docs

-extracted_airflow_docs = task(airflow_docs.extract_airflow_docs)(docs_base_url=airflow_docs_base_url)
+extracted_airflow_docs = task(split.split_html).expand(
+    dfs=[airflow_docs.extract_airflow_docs(docs_base_url=airflow_docs_base_url)]
+)

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
-batch_config_params={"batch_size": 1000},
+batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
-).expand(input_data=split_docs(extracted_airflow_docs, chunk_size=100))
+).expand(input_data=[extracted_airflow_docs])


ask_astro_load_airflow_docs()
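The rewritten DAG drops the custom split_docs chunking task in favour of Airflow dynamic task mapping: split.split_html is mapped over the extracted documents, and the ingest operator is mapped over the split output. A minimal, self-contained sketch of the same partial()/expand() mechanics (toy task names and values, not the Ask Astro code; requires a recent Airflow, 2.4+):

from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator


@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def mapping_demo():
    @task
    def extract() -> list[str]:
        # Stand-in for an extraction step; one mapped task instance is
        # created downstream for each element of this list.
        return ["echo a", "echo b", "echo c"]

    # partial() pins the static arguments; expand() maps over the XCom list.
    BashOperator.partial(task_id="ingest").expand(bash_command=extract())


mapping_demo()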
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-load-astro-cli.py
@@ -42,7 +42,7 @@ def ask_astro_load_astro_cli_docs():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
-batch_config_params={"batch_size": 1000},
+batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-load-astro-sdk.py
@@ -41,7 +41,7 @@ def ask_astro_load_astro_sdk():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
-batch_config_params={"batch_size": 1000},
+batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
6 changes: 3 additions & 3 deletions airflow/dags/ingestion/ask-astro-load-astronomer-docs.py
@@ -36,17 +36,17 @@ def ask_astro_load_astronomer_docs():

astro_docs = task(extract_astro_docs)()

-split_md_docs = task(split.split_markdown).expand(dfs=[astro_docs])
+split_html_docs = task(split.split_html).expand(dfs=[astro_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
-batch_config_params={"batch_size": 1000},
+batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
-).expand(input_data=[split_md_docs])
+).expand(input_data=[split_html_docs])


ask_astro_load_astronomer_docs()
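For the Astronomer docs, the markdown splitter is swapped for the HTML splitter, matching how those pages are now extracted. The repository's split.split_html is not shown in this diff; purely as an illustration of the general technique (hypothetical helper and "content" column, BeautifulSoup-based), HTML pages are reduced to text and cut into bounded chunks that keep the docLink column the ingest operator expects:

import pandas as pd
from bs4 import BeautifulSoup


def split_html_sketch(df: pd.DataFrame, chunk_size: int = 4000) -> pd.DataFrame:
    # Hypothetical stand-in, not the Ask Astro implementation: strip tags and
    # emit fixed-size text chunks, preserving docLink for each chunk.
    rows = []
    for _, row in df.iterrows():
        text = BeautifulSoup(row["content"], "html.parser").get_text(" ", strip=True)
        for start in range(0, len(text), chunk_size):
            rows.append({"docLink": row["docLink"], "content": text[start : start + chunk_size]})
    return pd.DataFrame(rows)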
@@ -47,7 +47,7 @@ def ask_astro_load_astronomer_providers():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
-batch_config_params={"batch_size": 1000},
+batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-load-blogs.py
@@ -45,7 +45,7 @@ def ask_astro_load_blogs():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
-batch_config_params={"batch_size": 1000},
+batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
51 changes: 51 additions & 0 deletions airflow/dags/ingestion/ask-astro-load-cosmos-docs.py
@@ -0,0 +1,51 @@
import os
from datetime import datetime

from include.tasks.extract.cosmos_docs import extract_cosmos_docs
from include.utils.slack import send_failure_notification

from airflow.decorators import dag, task
from airflow.providers.weaviate.operators.weaviate import WeaviateDocumentIngestOperator

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")


schedule_interval = os.environ.get("INGESTION_SCHEDULE", "0 5 * * 2") if ask_astro_env == "prod" else None


@dag(
    schedule=schedule_interval,
    start_date=datetime(2023, 9, 27),
    catchup=False,
    is_paused_upon_creation=True,
    default_args={"retries": 3, "retry_delay": 30},
    on_failure_callback=send_failure_notification(
        dag_id="{{ dag.dag_id }}", execution_date="{{ dag_run.execution_date }}"
    ),
)
def ask_astro_load_cosmos_docs():
    """
    This DAG performs incremental load for any new Cosmos docs. Initial load via ask_astro_load_bulk imported
    data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
    any existing documents that have been updated will be removed and re-added.
    """

    from include.tasks import split

    split_docs = task(split.split_html).expand(dfs=[extract_cosmos_docs()])

    _import_data = WeaviateDocumentIngestOperator.partial(
        class_name=WEAVIATE_CLASS,
        existing="replace",
        document_column="docLink",
        batch_config_params={"batch_size": 1000},
        verbose=True,
        conn_id=_WEAVIATE_CONN_ID,
        task_id="load_cosmos_docs_to_weaviate",
    ).expand(input_data=[split_docs])


ask_astro_load_cosmos_docs()
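Because this is a brand-new DAG file, a quick parse check catches import errors before deployment. A minimal sketch, assuming a local Airflow installation and that the dag_id matches the decorated function name above:

from airflow.models import DagBag

# Load only the ingestion DAG folder and fail on any import error.
dag_bag = DagBag(dag_folder="airflow/dags/ingestion", include_examples=False)
assert not dag_bag.import_errors, dag_bag.import_errors
assert dag_bag.get_dag("ask_astro_load_cosmos_docs") is not None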
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-load-github.py
@@ -64,7 +64,7 @@ def ask_astro_load_github():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
-batch_config_params={"batch_size": 1000},
+batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-load-registry.py
@@ -47,7 +47,7 @@ def ask_astro_load_registry():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
-batch_config_params={"batch_size": 1000},
+batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-load-slack.py
@@ -53,7 +53,7 @@ def ask_astro_load_slack():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
-batch_config_params={"batch_size": 1000},
+batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-load-stackoverflow.py
@@ -53,7 +53,7 @@ def ask_astro_load_stackoverflow():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
-batch_config_params={"batch_size": 1000},
+batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
(Diffs for the remaining changed files were not loaded on this page.)
