Skip to content

Commit

Permalink
Add astro cli project + kind Raycluster setup instruction (#83)
Browse files Browse the repository at this point in the history
This PR introduces several enhancements aimed at improving the setup and local testing

- **Documentation for RayCluster Setup:** Added comprehensive instruction documents to facilitate the setup of RayCluster using Kind.
- **Example Connection for Local Kind RayCluster:** Included an example configuration to demonstrate how to connect to a local RayCluster deployed with Kind.
- **Introduce Astro-CLI:**  Introduced the astro-cli Airflow project, allowing users to easily spin up Airflow and execute example DAGs from this repository.
- **Makefile:** Added a Makefile that wraps astro-cli and pip commands, simplifying the process for testing local changes.


**Docs generated HTML**
<img width="1104" alt="Screenshot 2024-10-26 at 1 03 09 AM" src="https://github.com/user-attachments/assets/f2d389db-4924-4820-ae7a-ca1c7197f5d7">

Created follow-up tickets: #89 and #88
  • Loading branch information
pankajastro authored Oct 30, 2024
1 parent 4a362a2 commit cf599b8
Show file tree
Hide file tree
Showing 27 changed files with 410 additions and 6 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,7 @@ dmypy.json

# Cython debug symbols
cython_debug/

# asro-cli
dev/include/*
dev/dags/config
27 changes: 27 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
.PHONY: help
help:
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'

.PHONY: setup-dev
setup-dev: ## Setup development environment
python3 -m venv venv
. venv/bin/activate && pip install .[tests]
@echo "To activate the virtual environment, run:"
@echo "source venv/bin/activate"

.PHONY: build-whl
build-whl: setup-dev ## Build installable whl file
cd dev
python3 -m build --outdir dev/include/

.PHONY: docker-run
docker-run: build-whl ## Runs local Airflow for testing
@if ! lsof -i :8080 | grep LISTEN > /dev/null; then \
cd dev && astro dev start; \
else \
cd dev && astro dev restart; \
fi

.PHONY: docker-stop
docker-stop: ## Stop Docker container
cd dev && astro dev stop
2 changes: 2 additions & 0 deletions dev/.astro/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
project:
name: dev
1 change: 1 addition & 0 deletions dev/.astro/dag_integrity_exceptions.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Add dag files to exempt from parse test below. ex: dags/<test-file>
130 changes: 130 additions & 0 deletions dev/.astro/test_dag_integrity_default.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""Test the validity of all DAGs. **USED BY DEV PARSE COMMAND DO NOT EDIT**"""

import logging
import os
from contextlib import contextmanager

import pytest
from airflow.hooks.base import BaseHook
from airflow.models import Connection, DagBag, Variable
from airflow.utils.db import initdb

# init airflow database
initdb()

# The following code patches errors caused by missing OS Variables, Airflow Connections, and Airflow Variables


# =========== MONKEYPATCH BaseHook.get_connection() ===========
def basehook_get_connection_monkeypatch(key: str, *args, **kwargs):
print(f"Attempted to fetch connection during parse returning an empty Connection object for {key}")
return Connection(key)


BaseHook.get_connection = basehook_get_connection_monkeypatch
# # =========== /MONKEYPATCH BASEHOOK.GET_CONNECTION() ===========


# =========== MONKEYPATCH OS.GETENV() ===========
def os_getenv_monkeypatch(key: str, *args, **kwargs):
default = None
if args:
default = args[0] # os.getenv should get at most 1 arg after the key
if kwargs:
default = kwargs.get("default", None) # and sometimes kwarg if people are using the sig

env_value = os.environ.get(key, None)

if env_value:
return env_value # if the env_value is set, return it
if key == "JENKINS_HOME" and default is None: # fix https://github.com/astronomer/astro-cli/issues/601
return None
if default:
return default # otherwise return whatever default has been passed
return f"MOCKED_{key.upper()}_VALUE" # if absolutely nothing has been passed - return the mocked value


os.getenv = os_getenv_monkeypatch
# # =========== /MONKEYPATCH OS.GETENV() ===========

# =========== MONKEYPATCH VARIABLE.GET() ===========


class magic_dict(dict):
def __init__(self, *args, **kwargs):
self.update(*args, **kwargs)

def __getitem__(self, key):
return {}.get(key, "MOCKED_KEY_VALUE")


_no_default = object() # allow falsey defaults


def variable_get_monkeypatch(key: str, default_var=_no_default, deserialize_json=False):
print(f"Attempted to get Variable value during parse, returning a mocked value for {key}")

if default_var is not _no_default:
return default_var
if deserialize_json:
return magic_dict()
return "NON_DEFAULT_MOCKED_VARIABLE_VALUE"


Variable.get = variable_get_monkeypatch
# # =========== /MONKEYPATCH VARIABLE.GET() ===========


@contextmanager
def suppress_logging(namespace):
"""
Suppress logging within a specific namespace to keep tests "clean" during build
"""
logger = logging.getLogger(namespace)
old_value = logger.disabled
logger.disabled = True
try:
yield
finally:
logger.disabled = old_value


def get_import_errors():
"""
Generate a tuple for import errors in the dag bag, and include DAGs without errors.
"""
with suppress_logging("airflow"):
dag_bag = DagBag(include_examples=False)

def strip_path_prefix(path):
return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))

# Initialize an empty list to store the tuples
result = []

# Iterate over the items in import_errors
for k, v in dag_bag.import_errors.items():
result.append((strip_path_prefix(k), v.strip()))

# Check if there are DAGs without errors
for file_path in dag_bag.dags:
# Check if the file_path is not in import_errors, meaning no errors
if file_path not in dag_bag.import_errors:
result.append((strip_path_prefix(file_path), "No import errors"))

return result


@pytest.mark.parametrize("rel_path, rv", get_import_errors(), ids=[x[0] for x in get_import_errors()])
def test_file_imports(rel_path, rv):
"""Test for import errors on a file"""
if os.path.exists(".astro/dag_integrity_exceptions.txt"):
with open(".astro/dag_integrity_exceptions.txt") as f:
exceptions = f.readlines()
print(f"Exceptions: {exceptions}")
if (rv != "No import errors") and rel_path not in exceptions:
# If rv is not "No import errors," consider it a failed test
raise Exception(f"{rel_path} failed to import with message \n {rv}")
else:
# If rv is "No import errors," consider it a passed test
print(f"{rel_path} passed the import test")
8 changes: 8 additions & 0 deletions dev/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
astro
.git
.env
airflow_settings.yaml
logs/
.venv
airflow.db
airflow.cfg
11 changes: 11 additions & 0 deletions dev/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
.git
.env
.DS_Store
airflow_settings.yaml
__pycache__/
astro
.venv
airflow-webserver.pid
webserver_config.py
airflow.cfg
airflow.db
11 changes: 11 additions & 0 deletions dev/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM quay.io/astronomer/astro-runtime:12.2.0

USER root

RUN curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 && \
chmod 700 get_helm.sh && \
./get_helm.sh

USER astro

RUN pip install /usr/local/airflow/include/*.whl
Empty file added dev/dags/.airflowignore
Empty file.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Empty file added dev/packages.txt
Empty file.
1 change: 1 addition & 0 deletions dev/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Astro Runtime includes the following pre-installed providers packages: https://www.astronomer.io/docs/astro/runtime-image-architecture#provider-packages
74 changes: 74 additions & 0 deletions dev/tests/dags/test_dag_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Example DAGs test. This test ensures that all Dags have tags, retries set to two, and no import errors. This is an example pytest and may not be fit the context of your DAGs. Feel free to add and remove tests."""

import logging
import os
from contextlib import contextmanager

import pytest
from airflow.models import DagBag


@contextmanager
def suppress_logging(namespace):
logger = logging.getLogger(namespace)
old_value = logger.disabled
logger.disabled = True
try:
yield
finally:
logger.disabled = old_value


def get_import_errors():
"""
Generate a tuple for import errors in the dag bag
"""
with suppress_logging("airflow"):
dag_bag = DagBag(include_examples=False)

def strip_path_prefix(path):
return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))

# prepend "(None,None)" to ensure that a test object is always created even if it's a no op.
return [(None, None)] + [(strip_path_prefix(k), v.strip()) for k, v in dag_bag.import_errors.items()]


def get_dags():
"""
Generate a tuple of dag_id, <DAG objects> in the DagBag
"""
with suppress_logging("airflow"):
dag_bag = DagBag(include_examples=False)

def strip_path_prefix(path):
return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))

return [(k, v, strip_path_prefix(v.fileloc)) for k, v in dag_bag.dags.items()]


@pytest.mark.parametrize("rel_path,rv", get_import_errors(), ids=[x[0] for x in get_import_errors()])
def test_file_imports(rel_path, rv):
"""Test for import errors on a file"""
if rel_path and rv:
raise Exception(f"{rel_path} failed to import with message \n {rv}")


APPROVED_TAGS = {}


@pytest.mark.parametrize("dag_id,dag,fileloc", get_dags(), ids=[x[2] for x in get_dags()])
def test_dag_tags(dag_id, dag, fileloc):
"""
test if a DAG is tagged and if those TAGs are in the approved list
"""
assert dag.tags, f"{dag_id} in {fileloc} has no tags"
if APPROVED_TAGS:
assert not set(dag.tags) - APPROVED_TAGS


@pytest.mark.parametrize("dag_id,dag, fileloc", get_dags(), ids=[x[2] for x in get_dags()])
def test_dag_retries(dag_id, dag, fileloc):
"""
test if a DAG has retries set
"""
assert dag.default_args.get("retries", None) >= 2, f"{dag_id} in {fileloc} must have task retries >= 2."
6 changes: 6 additions & 0 deletions docs/CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ Pre-requisites
pip install pytest
Set up RayCluster and Apache Airflow
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

For instructions on setting up RayCluster and Apache Airflow, please see the `Local Development Setup <https://github.com/astronomer/astro-provider-ray/blob/main/docs/getting_started/local_development_setup.rst>`_.

Run tests
~~~~~~~~~

Expand Down
Binary file added docs/_static/basic_local_kubernetes_conn.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
10 changes: 5 additions & 5 deletions docs/getting_started/code_samples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ In the example below (``ray_taskflow_example_existing_cluster.py``), the ``@ray.
.. important::
**Set the Ray Dashboard URL connection parameter or RAY_ADDRESS on your airflow worker to connect to your cluster**

.. literalinclude:: ../../example_dags/ray_taskflow_example_existing_cluster.py
.. literalinclude:: ../../dev/dags/ray_taskflow_example_existing_cluster.py
:language: python
:linenos:

Expand All @@ -30,7 +30,7 @@ Ray Cluster Sample Spec (YAML)

Save this file in a location accessible to your Airflow installation, and reference it in your DAG code.

.. literalinclude:: ../../example_dags/scripts/ray.yaml
.. literalinclude:: ../../dev/dags/scripts/ray.yaml
:language: yaml


Expand All @@ -41,7 +41,7 @@ The below example showcases how to use the ``@ray.task`` decorator to manage the

This approach is ideal for jobs that require a dedicated, short-lived cluster, optimizing resource usage by cleaning up after task completion.

.. literalinclude:: ../../example_dags/ray_taskflow_example.py
.. literalinclude:: ../../dev/dags/ray_taskflow_example.py
:language: python
:linenos:

Expand All @@ -53,7 +53,7 @@ This example demonstrates how to use the ``SubmitRayJob`` operator to manage the

This operator provides a more declarative way to define your Ray job within an Airflow DAG.

.. literalinclude:: ../../example_dags/ray_single_operator.py
.. literalinclude:: ../../dev/dags/ray_single_operator.py
:language: python
:linenos:

Expand All @@ -76,6 +76,6 @@ This method is ideal for scenarios where you need fine-grained control over the
.. important::
**The SubmitRayJob operator uses the xcom_task_key parameter "SetupRayCluster.dashboard" to retrieve the Ray dashboard URL. This URL, stored as an XCom variable by the SetupRayCluster task, is necessary for job submission.**

.. literalinclude:: ../../example_dags/setup-teardown.py
.. literalinclude:: ../../dev/dags/setup-teardown.py
:language: python
:linenos:
Loading

0 comments on commit cf599b8

Please sign in to comment.