Fixing collection ingest DAG #261

Open · wants to merge 1 commit into base: dev
5 changes: 2 additions & 3 deletions dags/generate_dags.py
@@ -15,9 +15,8 @@ def generate_dags():

from pathlib import Path

airflow_vars = Variable.get("aws_dags_variables")
airflow_vars_json = json.loads(airflow_vars)
bucket = airflow_vars_json.get("EVENT_BUCKET")
airflow_vars = Variable.get("aws_dags_variables", default_var={}, deserialize_json=True)
bucket = airflow_vars.get("EVENT_BUCKET")

try:
client = boto3.client("s3")
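
For context, a brief sketch of the Variable access pattern this PR standardizes on across the DAGs (assuming the `aws_dags_variables` Airflow Variable holds a JSON object):

```python
from airflow.models.variable import Variable

# Previous pattern: fetch the raw string, then parse it by hand.
# airflow_vars = Variable.get("aws_dags_variables")
# airflow_vars_json = json.loads(airflow_vars)

# New pattern: Airflow deserializes the JSON itself; default_var={} avoids a
# KeyError when the Variable has not been created yet.
airflow_vars = Variable.get("aws_dags_variables", default_var={}, deserialize_json=True)
bucket = airflow_vars.get("EVENT_BUCKET")  # None when the key is missing
```
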
80 changes: 37 additions & 43 deletions dags/veda_data_pipeline/groups/collection_group.py
@@ -1,12 +1,10 @@
import requests
from airflow.models.variable import Variable
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task
from veda_data_pipeline.utils.collection_generation import GenerateCollection
from veda_data_pipeline.utils.submit_stac import submission_handler

generator = GenerateCollection()


def check_collection_exists(endpoint: str, collection_id: str):
"""
@@ -24,27 +22,7 @@ def check_collection_exists(endpoint: str, collection_id: str):
)


def ingest_collection_task(ti):
"""
Ingest a collection into the STAC catalog

Args:
dataset (Dict[str, Any]): dataset dictionary (JSON)
role_arn (str): role arn for Zarr collection generation
"""
import json
collection = ti.xcom_pull(task_ids='Collection.generate_collection')
airflow_vars = Variable.get("aws_dags_variables")
airflow_vars_json = json.loads(airflow_vars)
cognito_app_secret = airflow_vars_json.get("COGNITO_APP_SECRET")
stac_ingestor_api_url = airflow_vars_json.get("STAC_INGESTOR_API_URL")

return submission_handler(
event=collection,
endpoint="/collections",
cognito_app_secret=cognito_app_secret,
stac_ingestor_api_url=stac_ingestor_api_url
)


# NOTE unused, but useful for item ingests, since collections are a dependency for items
@@ -60,32 +38,48 @@ def check_collection_exists_task(ti):
)


def generate_collection_task(ti):
import json
config = ti.dag_run.conf
airflow_vars = Variable.get("aws_dags_variables")
airflow_vars_json = json.loads(airflow_vars)
role_arn = airflow_vars_json.get("ASSUME_ROLE_READ_ARN")

# TODO it would be ideal if this also works with complete collections where provided - this would make the collection ingest more re-usable
collection = generator.generate_stac(
dataset_config=config, role_arn=role_arn
)
return collection



group_kwgs = {"group_id": "Collection", "tooltip": "Collection"}


def collection_task_group():
with TaskGroup(**group_kwgs) as collection_task_grp:
generate_collection = PythonOperator(
task_id="generate_collection", python_callable=generate_collection_task
)
ingest_collection = PythonOperator(
task_id="ingest_collection", python_callable=ingest_collection_task
)
generate_collection >> ingest_collection
@task()
def generate_collection_task(ti):

config = ti.dag_run.conf
airflow_vars_json = Variable.get("aws_dags_variables", deserialize_json=True)
role_arn = airflow_vars_json.get("ASSUME_ROLE_READ_ARN")

# TODO it would be ideal if this also works with complete collections where provided - this would make the collection ingest more re-usable
generator = GenerateCollection()
collection = generator.generate_stac(
dataset_config=config, role_arn=role_arn
)
return collection

@task()
def ingest_collection_task(collection):
"""
Ingest a collection into the STAC catalog

Args:
collection (Dict[str, Any]): STAC collection dictionary (JSON) to ingest

"""
airflow_vars_json = Variable.get("aws_dags_variables", deserialize_json=True)
cognito_app_secret = airflow_vars_json.get("COGNITO_APP_SECRET")
stac_ingestor_api_url = airflow_vars_json.get("STAC_INGESTOR_API_URL")

return submission_handler(
event=collection,
endpoint="/collections",
cognito_app_secret=cognito_app_secret,
stac_ingestor_api_url=stac_ingestor_api_url
)

collection = generate_collection_task()
ingest_collection_task(collection)

return collection_task_grp
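
A note on the design choice in this rewrite: with the TaskFlow API, the collection returned by generate_collection_task is handed to ingest_collection_task directly and the XCom plumbing is implicit, whereas the removed PythonOperator version had to pull it explicitly:

```python
# Removed pattern: explicit XCom pull inside the PythonOperator callable.
collection = ti.xcom_pull(task_ids="Collection.generate_collection")

# New pattern: TaskFlow return values are passed (and stored as XComs) implicitly.
collection = generate_collection_task()
ingest_collection_task(collection)
```
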
4 changes: 2 additions & 2 deletions dags/veda_data_pipeline/utils/collection_generation.py
@@ -97,14 +97,14 @@ def create_cog_collection(self, dataset: Dict[str, Any]) -> dict:

# Override the extents if they exist
if spatial_extent := dataset.get("spatial_extent"):
collection_stac["extent"]["spatial"] = {"bbox": [list(spatial_extent.values())]},
collection_stac["extent"]["spatial"] = {"bbox": [list(spatial_extent.values())]}

if temporal_extent := dataset.get("temporal_extent"):
collection_stac["extent"]["temporal"] = {
"interval": [
# most of our data uses the Z suffix for UTC - isoformat() doesn't
[
datetime.fromisoformat(x).astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
x
if x else None
for x in list(temporal_extent.values())
]
4 changes: 2 additions & 2 deletions dags/veda_data_pipeline/utils/submit_stac.py
@@ -103,7 +103,7 @@ def submission_handler(
cognito_app_secret=None,
stac_ingestor_api_url=None,
context=None,
) -> None:
) -> [Dict[str, Any], None]:
if context is None:
context = {}

@@ -121,7 +121,7 @@
secret_id=cognito_app_secret,
base_url=stac_ingestor_api_url,
)
ingestor.submit(event=stac_item, endpoint=endpoint)
return ingestor.submit(event=stac_item, endpoint=endpoint)


if __name__ == "__main__":
18 changes: 10 additions & 8 deletions dags/veda_data_pipeline/veda_collection_pipeline.py
@@ -31,18 +31,20 @@
}

template_dag_run_conf = {
"collection": "<collection-id>",
"data_type": "cog",
"description": "<collection-description>",
"is_periodic": "<true|false>",
"license": "<collection-LICENSE>",
"time_density": "<time-density>",
"title": "<collection-title>"
"collection": "<collection-id>",
"data_type": "cog",
"description": "<collection-description>",
"is_periodic": "<true|false>",
"license": "<collection-LICENSE>",
"time_density": "<time-density>",
"title": "<collection-title>",
}

with DAG("veda_collection_pipeline", params=template_dag_run_conf, **dag_args) as dag:
start = EmptyOperator(task_id="start", dag=dag)
end = EmptyOperator(task_id="end", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, dag=dag)
end = EmptyOperator(
task_id="end", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, dag=dag
)

collection_grp = collection_task_group()

108 changes: 35 additions & 73 deletions dags/veda_data_pipeline/veda_dataset_pipeline.py
@@ -1,38 +1,36 @@
import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.dummy_operator import DummyOperator as EmptyOperator
from airflow.models.variable import Variable
import json
from airflow_multi_dagrun.operators import TriggerMultiDagRunOperator
from veda_data_pipeline.groups.collection_group import collection_task_group
from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_dataset_files_to_process
from veda_data_pipeline.groups.processing_tasks import submit_to_stac_ingestor_task

dag_doc_md = """
template_dag_run_conf = {
"collection": "<collection-id>",
"data_type": "cog",
"description": "<collection-description>",
"discovery_items": [
{
"bucket": "<bucket-name>",
"datetime_range": "<range>",
"discovery": "s3",
"filename_regex": "<regex>",
"prefix": "<example-prefix/>",
}
],
"is_periodic": "<true|false>",
"license": "<collection-LICENSE>",
"time_density": "<time-density>",
"title": "<collection-title>",
}

dag_doc_md = f"""
### Dataset Pipeline
Generates a collection and triggers the file discovery process
#### Notes
- This DAG can run with the following configuration <br>
```json
{
"collection": "collection-id",
"data_type": "cog",
"description": "collection description",
"discovery_items":
[
{
"bucket": "veda-data-store-staging",
"datetime_range": "year",
"discovery": "s3",
"filename_regex": "^(.*).tif$",
"prefix": "example-prefix/"
}
],
"is_periodic": true,
"license": "collection-LICENSE",
"time_density": "year",
"title": "collection-title"
}
{template_dag_run_conf}
```
"""

@@ -45,57 +43,21 @@
}


@task
def extract_discovery_items(**kwargs):
ti = kwargs.get("ti")
def trigger_discover_and_build_task(ti):
discovery_items = ti.dag_run.conf.get("discovery_items")
print(discovery_items)
return discovery_items


@task(max_active_tis_per_dag=3)
def build_stac_task(payload):
from veda_data_pipeline.utils.build_stac.handler import stac_handler
airflow_vars = Variable.get("aws_dags_variables")
airflow_vars_json = json.loads(airflow_vars)
event_bucket = airflow_vars_json.get("EVENT_BUCKET")
return stac_handler(payload_src=payload, bucket_output=event_bucket)
for discovery_item in discovery_items:
yield discovery_item


template_dag_run_conf = {
"collection": "<collection-id>",
"data_type": "cog",
"description": "<collection-description>",
"discovery_items":
[
{
"bucket": "<bucket-name>",
"datetime_range": "<range>",
"discovery": "s3",
"filename_regex": "<regex>",
"prefix": "<example-prefix/>"
}
],
"is_periodic": "<true|false>",
"license": "<collection-LICENSE>",
"time_density": "<time-density>",
"title": "<collection-title>"
}

with DAG("veda_dataset_pipeline", params=template_dag_run_conf, **dag_args) as dag:
# ECS dependency variable

start = EmptyOperator(task_id="start", dag=dag)
end = EmptyOperator(task_id="end", dag=dag)

collection_grp = collection_task_group()
discover = discover_from_s3_task.expand(event=extract_discovery_items())
discover.set_upstream(collection_grp) # do not discover until collection exists
get_files = get_dataset_files_to_process(payload=discover)
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")

build_stac = build_stac_task.expand(payload=get_files)
# .output is needed coming from a non-taskflow operator
submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac)
run_discover_build_and_push = TriggerMultiDagRunOperator(
task_id="trigger_discover_items_dag",
dag=dag,
trigger_dag_id="veda_discover",
python_callable=trigger_discover_and_build_task,
)
Comment on lines +56 to +61

Contributor: We should not use this operator, as it creates a disconnect between the original DAGRun event and subsequent DAG runs (i.e. a failure in an instance of veda-discover will not feed back to the original veda-dataset DAG using this operator). We could instead describe the discover pipeline in a TaskGroup and call expand() on it, which will map task instances to the correct event.

Contributor Author: I want to re-use the discover_build_ingest DAG instead of redefining the same tasks. But having a task group used in both DAGs is not a bad idea.


collection_grp.set_upstream(start)
submit_stac.set_downstream(end)
start >> collection_task_group() >> run_discover_build_and_push >> end
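
To make the review suggestion above concrete, here is a minimal, hypothetical sketch of the alternative: wrap the discover/build/ingest tasks in a TaskGroup inside this DAG, reusing extract_discovery_items and build_stac_task (which this PR removes) together with the existing discover/processing helpers. This is an illustration for discussion, not part of the PR:

```python
# Illustrative sketch only: keep discovery/build/ingest in the same DAG run
# by wrapping the mapped tasks in a TaskGroup instead of triggering a
# separate veda_discover run.
from airflow.decorators import task, task_group
from airflow.models.variable import Variable
from veda_data_pipeline.groups.discover_group import (
    discover_from_s3_task,
    get_dataset_files_to_process,
)
from veda_data_pipeline.groups.processing_tasks import submit_to_stac_ingestor_task


@task
def extract_discovery_items(**kwargs):
    # Pull the discovery items from the triggering DAG run's conf.
    return kwargs["dag_run"].conf.get("discovery_items", [])


@task(max_active_tis_per_dag=3)
def build_stac_task(payload):
    from veda_data_pipeline.utils.build_stac.handler import stac_handler

    airflow_vars = Variable.get("aws_dags_variables", default_var={}, deserialize_json=True)
    event_bucket = airflow_vars.get("EVENT_BUCKET")
    return stac_handler(payload_src=payload, bucket_output=event_bucket)


@task_group(group_id="discover_build_ingest")
def discover_build_ingest_group():
    # Each discovery item becomes a mapped task instance; failures surface in
    # this DAG run rather than in a detached, separately triggered DAG run.
    discovered = discover_from_s3_task.expand(event=extract_discovery_items())
    files = get_dataset_files_to_process(payload=discovered)
    built = build_stac_task.expand(payload=files)
    submit_to_stac_ingestor_task.expand(built_stac=built)


# In the DAG body (sketch):
# start >> collection_task_group() >> discover_build_ingest_group() >> end
```

With this wiring, failures in discovery or STAC building stay visible within the veda_dataset_pipeline run itself instead of a detached veda_discover run.
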
4 changes: 3 additions & 1 deletion sm2a/airflow_worker/Dockerfile
@@ -41,7 +41,9 @@ COPY --chown=airflow:airflow scripts "${AIRFLOW_HOME}/scripts"

RUN cp ${AIRFLOW_HOME}/configuration/airflow.cfg* ${AIRFLOW_HOME}/.

RUN pip install pypgstac==0.7.4
# Commented out because it downgrades pydantic to v1

Contributor: This is an artifact and we should not need pypgstac in airflow -- that should be entirely handled by the ingest API.

# I am not even sure this library is used
#RUN pip install pypgstac==0.7.4

# ENV
ENV AIRFLOW_HOME ${AIRFLOW_HOME}