diff --git a/provider/astronomer/providers/venv/decorators.py b/provider/astronomer/providers/venv/decorators.py index 1d38720..7ab1686 100644 --- a/provider/astronomer/providers/venv/decorators.py +++ b/provider/astronomer/providers/venv/decorators.py @@ -38,6 +38,46 @@ def get_python_source(self): res = remove_task_decorator(res, self.custom_operator_name) return res + # Override the version from superclass to be tollerant of astro specific post releases. + def _get_airflow_version_from_target_env(self) -> str | None: + import subprocess + + import airflow + from airflow.exceptions import AirflowConfigException + from packaging.version import Version + + airflow_version = Version(airflow.__version__) + try: + result = subprocess.check_output( + [self.python, "-c", "from airflow import __version__; print(__version__)"], + text=True, + # Avoid Airflow logs polluting stdout. + env={ + **os.environ, + "_AIRFLOW__AS_LIBRARY": "true", + "AIRFLOW__CORE__LAZY_LOAD_PROVIDERS": "True", + "AIRFLOW__CORE__LAZY_LOAD_PLUGINS": "True", + }, + ) + target_airflow_version = Version(result.strip()) + if target_airflow_version.base_version != airflow_version.base_version: + raise AirflowConfigException( + f"The version of Airflow installed for the {self.python}(" + f"{target_airflow_version}) is different than the runtime Airflow version: " + f"{airflow_version}. Make sure your environment has the same Airflow version " + f"installed as the Airflow runtime." + ) + return target_airflow_version + except Exception as e: + if self.expect_airflow: + self.log.warning("When checking for Airflow installed in virtual environment got %s", e) + self.log.warning( + f"This means that Airflow is not properly installed by " + f"{self.python}. Airflow context keys will not be available. " + f"Please Install Airflow {airflow_version.base_version} in your environment to access them." + ) + return None + def venv_task( venv: str,