diff --git a/metaflow/plugins/kubernetes/kubernetes_jobsets.py b/metaflow/plugins/kubernetes/kubernetes_jobsets.py index cf6c6affe2..c84d655fc4 100644 --- a/metaflow/plugins/kubernetes/kubernetes_jobsets.py +++ b/metaflow/plugins/kubernetes/kubernetes_jobsets.py @@ -4,7 +4,6 @@ import random import time from collections import namedtuple - from metaflow.exception import MetaflowException from metaflow.metaflow_config import KUBERNETES_JOBSET_GROUP, KUBERNETES_JOBSET_VERSION from metaflow.tracing import inject_tracing_vars @@ -320,33 +319,49 @@ def _fetch_pod(self): def kill(self): plural = "jobsets" client = self._client.get() - # Get the jobset - with client.ApiClient() as api_client: - api_instance = client.CustomObjectsApi(api_client) - try: - obj = api_instance.get_namespaced_custom_object( - group=self._group, - version=self._version, - namespace=self._namespace, - plural=plural, - name=self._name, - ) - - # Suspend the jobset - obj["spec"]["suspend"] = True - - api_instance.replace_namespaced_custom_object( - group=self._group, - version=self._version, - namespace=self._namespace, - plural=plural, - name=obj["metadata"]["name"], - body=obj, - ) - except Exception as e: - raise KubernetesJobsetException( - "Exception when suspending existing jobset: %s\n" % e - ) + try: + # Killing the control pod will trigger the jobset to mark everything as failed. + # Since jobsets have a successPolicy set to `All` which ensures that everything has + # to succeed for the jobset to succeed. + from kubernetes.stream import stream + + control_pod = self._fetch_pod() + stream( + client.CoreV1Api().connect_get_namespaced_pod_exec, + name=control_pod["metadata"]["name"], + namespace=control_pod["metadata"]["namespace"], + command=[ + "/bin/sh", + "-c", + "/sbin/killall5", + ], + stderr=True, + stdin=False, + stdout=True, + tty=False, + ) + except Exception as e: + with client.ApiClient() as api_client: + # If we are unable to kill the control pod then + # Delete the jobset to kill the subsequent pods. + # There are a few reasons for deleting a jobset to kill it : + # 1. Jobset has a `suspend` attribute to suspend it's execution, but this + # doesn't play nicely when jobsets are deployed with other components like kueue. + # 2. Jobset doesn't play nicely when we mutate status + # 3. Deletion is a gaurenteed way of removing any pods. + api_instance = client.CustomObjectsApi(api_client) + try: + api_instance.delete_namespaced_custom_object( + group=self._group, + version=self._version, + namespace=self._namespace, + plural=plural, + name=self._name, + ) + except Exception as e: + raise KubernetesJobsetException( + "Exception when deleting existing jobset: %s\n" % e + ) @property def id(self):