Skip to content

Commit

Permalink
correctly await config.load_kube_config (PrefectHQ#15655)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Oct 10, 2024
1 parent 585afd2 commit e8cd03b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ async def get_client(
config.load_incluster_config(client_configuration=client_configuration)
except config.ConfigException:
# If in-cluster config fails, load the local kubeconfig
config.load_kube_config(
await config.load_kube_config(
client_configuration=client_configuration,
)
async with ApiClient(configuration=client_configuration) as api_client:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import yaml
from kubernetes_asyncio.client.models import V1DeleteOptions, V1Job, V1JobList, V1Status
from pydantic import Field
from typing_extensions import Self
from typing_extensions import Self, TypeAlias

from prefect import task
from prefect.blocks.abstract import JobBlock, JobRun
Expand All @@ -16,7 +16,7 @@
from prefect_kubernetes.exceptions import KubernetesJobTimeoutError
from prefect_kubernetes.pods import list_namespaced_pod, read_namespaced_pod_log

KubernetesManifest = Union[Dict, Path, str]
KubernetesManifest: TypeAlias = Union[Dict, Path, str]


@task
Expand Down Expand Up @@ -502,7 +502,8 @@ class KubernetesJob(JobBlock):
examples=[{"pretty": "true"}],
)
credentials: KubernetesCredentials = Field(
default=..., description="The credentials to configure a client from."
default_factory=KubernetesCredentials,
description="The credentials to configure a client from.",
)
delete_after_completion: bool = Field(
default=True,
Expand Down
6 changes: 6 additions & 0 deletions src/integrations/prefect-kubernetes/tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,9 @@ async def test_job_block_wait_never_called_raises(
ValueError, match="The Kubernetes Job run is not in a completed state"
):
await job_run.fetch_result()


async def test_job_block_generates_default_credentials():
job_block = KubernetesJob(v1_job=dict(metadata=dict(name="test-job")))
assert job_block.credentials is not None
assert job_block.credentials.cluster_config is None

0 comments on commit e8cd03b

Please sign in to comment.