From d4002261b57236ffdca9a5790097f295794965cf Mon Sep 17 00:00:00 2001 From: Dagang Wei Date: Thu, 26 Oct 2023 05:05:11 -0700 Subject: [PATCH] Replace blocking IO with async IO in AsyncKubernetesHook (#35162) * Replace blocking IO with async IO in AsyncKubernetesHook * Use aiofiles.NamedTemporaryFile with default mode * Minor update * Minor update --- airflow/providers/cncf/kubernetes/hooks/kubernetes.py | 7 ++++--- airflow/providers/cncf/kubernetes/provider.yaml | 1 + generated/provider_dependencies.json | 1 + setup.py | 1 + 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index d64e9d680ff93..83e7fc84771aa 100644 --- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -22,6 +22,7 @@ from functools import cached_property from typing import TYPE_CHECKING, Any, Generator +import aiofiles from asgiref.sync import sync_to_async from kubernetes import client, config, watch from kubernetes.config import ConfigException @@ -502,13 +503,13 @@ async def _load_config(self): return async_client.ApiClient() if kubeconfig is not None: - with tempfile.NamedTemporaryFile() as temp_config: + async with aiofiles.tempfile.NamedTemporaryFile() as temp_config: self.log.debug( "Reading kubernetes configuration file from connection " "object and writing temporary config file with its content", ) - temp_config.write(kubeconfig.encode()) - temp_config.flush() + await temp_config.write(kubeconfig.encode()) + await temp_config.flush() self._is_in_cluster = False await async_config.load_kube_config( config_file=temp_config.name, diff --git a/airflow/providers/cncf/kubernetes/provider.yaml b/airflow/providers/cncf/kubernetes/provider.yaml index 9d3998bf3b8d2..ecd430142f4a3 100644 --- a/airflow/providers/cncf/kubernetes/provider.yaml +++ b/airflow/providers/cncf/kubernetes/provider.yaml @@ -69,6 +69,7 @@ versions: - 1.0.0 dependencies: + - aiofiles>=23.2.0 - apache-airflow>=2.5.0 - asgiref>=3.5.2 - cryptography>=2.0.0 diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 3e4479921d8a8..de71df10f7c2d 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -258,6 +258,7 @@ }, "cncf.kubernetes": { "deps": [ + "aiofiles>=23.2.0", "apache-airflow>=2.5.0", "asgiref>=3.5.2", "cryptography>=2.0.0", diff --git a/setup.py b/setup.py index 84ca00c0c914e..b96364c6c257a 100644 --- a/setup.py +++ b/setup.py @@ -385,6 +385,7 @@ def write_version(filename: str = str(AIRFLOW_SOURCES_ROOT / "airflow" / "git_ve # Make sure to upgrade the mypy version in update-common-sql-api-stubs in .pre-commit-config.yaml # when you upgrade it here !!!! "mypy==1.2.0", + "types-aiofiles", "types-certifi", "types-croniter", "types-Deprecated",