From 3d89e1111dfff416deae8dc4bddc395ede969141 Mon Sep 17 00:00:00 2001 From: neel-astro Date: Wed, 18 Sep 2024 15:58:52 +0530 Subject: [PATCH] add temporary astro project at root level --- .astro/config.yaml | 2 + .astro/dag_integrity_exceptions.txt | 1 + .astro/test_dag_integrity_default.py | 141 +++++++++++++++++++++++++++ Dockerfile | 1 + 4 files changed, 145 insertions(+) create mode 100644 .astro/config.yaml create mode 100644 .astro/dag_integrity_exceptions.txt create mode 100644 .astro/test_dag_integrity_default.py create mode 100644 Dockerfile diff --git a/.astro/config.yaml b/.astro/config.yaml new file mode 100644 index 0000000..72cf0fd --- /dev/null +++ b/.astro/config.yaml @@ -0,0 +1,2 @@ +project: + name: astro-project diff --git a/.astro/dag_integrity_exceptions.txt b/.astro/dag_integrity_exceptions.txt new file mode 100644 index 0000000..c9a2a63 --- /dev/null +++ b/.astro/dag_integrity_exceptions.txt @@ -0,0 +1 @@ +# Add dag files to exempt from parse test below. ex: dags/ \ No newline at end of file diff --git a/.astro/test_dag_integrity_default.py b/.astro/test_dag_integrity_default.py new file mode 100644 index 0000000..e433703 --- /dev/null +++ b/.astro/test_dag_integrity_default.py @@ -0,0 +1,141 @@ +"""Test the validity of all DAGs. **USED BY DEV PARSE COMMAND DO NOT EDIT**""" + +from contextlib import contextmanager +import logging +import os + +import pytest + +from airflow.models import DagBag, Variable, Connection +from airflow.hooks.base import BaseHook +from airflow.utils.db import initdb + +# init airflow database +initdb() + +# The following code patches errors caused by missing OS Variables, Airflow Connections, and Airflow Variables + + +# =========== MONKEYPATCH BaseHook.get_connection() =========== +def basehook_get_connection_monkeypatch(key: str, *args, **kwargs): + print( + f"Attempted to fetch connection during parse returning an empty Connection object for {key}" + ) + return Connection(key) + + +BaseHook.get_connection = basehook_get_connection_monkeypatch +# # =========== /MONKEYPATCH BASEHOOK.GET_CONNECTION() =========== + + +# =========== MONKEYPATCH OS.GETENV() =========== +def os_getenv_monkeypatch(key: str, *args, **kwargs): + default = None + if args: + default = args[0] # os.getenv should get at most 1 arg after the key + if kwargs: + default = kwargs.get( + "default", None + ) # and sometimes kwarg if people are using the sig + + env_value = os.environ.get(key, None) + + if env_value: + return env_value # if the env_value is set, return it + if ( + key == "JENKINS_HOME" and default is None + ): # fix https://github.com/astronomer/astro-cli/issues/601 + return None + if default: + return default # otherwise return whatever default has been passed + return f"MOCKED_{key.upper()}_VALUE" # if absolutely nothing has been passed - return the mocked value + + +os.getenv = os_getenv_monkeypatch +# # =========== /MONKEYPATCH OS.GETENV() =========== + +# =========== MONKEYPATCH VARIABLE.GET() =========== + + +class magic_dict(dict): + def __init__(self, *args, **kwargs): + self.update(*args, **kwargs) + + def __getitem__(self, key): + return {}.get(key, "MOCKED_KEY_VALUE") + + +_no_default = object() # allow falsey defaults + + +def variable_get_monkeypatch(key: str, default_var=_no_default, deserialize_json=False): + print( + f"Attempted to get Variable value during parse, returning a mocked value for {key}" + ) + + if default_var is not _no_default: + return default_var + if deserialize_json: + return magic_dict() + return "NON_DEFAULT_MOCKED_VARIABLE_VALUE" + + +Variable.get = variable_get_monkeypatch +# # =========== /MONKEYPATCH VARIABLE.GET() =========== + + +@contextmanager +def suppress_logging(namespace): + """ + Suppress logging within a specific namespace to keep tests "clean" during build + """ + logger = logging.getLogger(namespace) + old_value = logger.disabled + logger.disabled = True + try: + yield + finally: + logger.disabled = old_value + + +def get_import_errors(): + """ + Generate a tuple for import errors in the dag bag, and include DAGs without errors. + """ + with suppress_logging("airflow"): + dag_bag = DagBag(include_examples=False) + + def strip_path_prefix(path): + return os.path.relpath(path, os.environ.get("AIRFLOW_HOME")) + + # Initialize an empty list to store the tuples + result = [] + + # Iterate over the items in import_errors + for k, v in dag_bag.import_errors.items(): + result.append((strip_path_prefix(k), v.strip())) + + # Check if there are DAGs without errors + for file_path in dag_bag.dags: + # Check if the file_path is not in import_errors, meaning no errors + if file_path not in dag_bag.import_errors: + result.append((strip_path_prefix(file_path), "No import errors")) + + return result + + +@pytest.mark.parametrize( + "rel_path, rv", get_import_errors(), ids=[x[0] for x in get_import_errors()] +) +def test_file_imports(rel_path, rv): + """Test for import errors on a file""" + if os.path.exists(".astro/dag_integrity_exceptions.txt"): + with open(".astro/dag_integrity_exceptions.txt", "r") as f: + exceptions = f.readlines() + print(f"Exceptions: {exceptions}") + if (rv != "No import errors") and rel_path not in exceptions: + # If rv is not "No import errors," consider it a failed test + raise Exception(f"{rel_path} failed to import with message \n {rv}") + else: + # If rv is "No import errors," consider it a passed test + print(f"{rel_path} passed the import test") diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..3ee9cb3 --- /dev/null +++ b/Dockerfile @@ -0,0 +1 @@ +FROM quay.io/astronomer/astro-runtime:12.1.0