Merge branch 'pr596-jobmanager-cancel'
soxofaan committed Sep 3, 2024
2 parents 29ae888 + d47de26 commit 712a6fa
Showing 3 changed files with 167 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- `load_stac`/`metadata_from_stac`: add support for extracting actual temporal dimension metadata ([#567](https://github.com/Open-EO/openeo-python-client/issues/567))
- `MultiBackendJobManager`: add `cancel_running_job_after` option to automatically cancel jobs that are running for too long ([#590](https://github.com/Open-EO/openeo-python-client/issues/590))

### Changed

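A minimal usage sketch of the `cancel_running_job_after` option described in the CHANGELOG entry above. The backend URL, collection id and `start_job` callback are hypothetical, and the `add_backend`/`run_jobs` calls follow the pre-existing `MultiBackendJobManager` API (including the `output_file` argument), which this commit does not change:

```python
import pandas as pd

import openeo
from openeo.extra.job_management import MultiBackendJobManager

# Cancel jobs that stay in "running" state for more than 2 hours
# (cancel_running_job_after is expressed in seconds).
manager = MultiBackendJobManager(poll_sleep=60, cancel_running_job_after=2 * 60 * 60)

# Hypothetical backend URL, purely for illustration.
connection = openeo.connect("https://openeo.example.org")
manager.add_backend("example", connection=connection, parallel_jobs=2)


def start_job(row: pd.Series, connection: openeo.Connection, **kwargs) -> openeo.BatchJob:
    # Hypothetical job factory: build a batch job for one DataFrame row.
    cube = connection.load_collection(row["collection_id"])
    return cube.create_job(title=f"Job for {row['collection_id']}")


df = pd.DataFrame({"collection_id": ["SENTINEL2_L2A"]})
manager.run_jobs(df=df, start_job=start_job, output_file="jobs.csv")
```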
103 changes: 75 additions & 28 deletions openeo/extra/job_management.py
@@ -16,11 +16,10 @@

from openeo import BatchJob, Connection
from openeo.rest import OpenEoApiError
from openeo.util import deep_get
from openeo.util import deep_get, rfc3339

_log = logging.getLogger(__name__)


class _Backend(NamedTuple):
"""Container for backend info/settings"""

@@ -112,7 +111,11 @@ def start_job(
"""

def __init__(
self, poll_sleep: int = 60, root_dir: Optional[Union[str, Path]] = "."
self,
poll_sleep: int = 60,
root_dir: Optional[Union[str, Path]] = ".",
*,
cancel_running_job_after: Optional[int] = None,
):
"""Create a MultiBackendJobManager.
@@ -129,6 +132,13 @@ def __init__(
- get_job_dir
- get_error_log_path
- get_job_metadata_path
:param cancel_running_job_after [seconds]:
Optional temporal limit (in seconds) after which running jobs should be canceled
by the job manager.
.. versionchanged:: 0.32.0
Added `cancel_running_job_after` parameter.
"""
self.backends: Dict[str, _Backend] = {}
self.poll_sleep = poll_sleep
@@ -137,6 +147,10 @@ def __init__(
# An explicit None or "" should also default to "."
self._root_dir = Path(root_dir or ".")

self._cancel_running_job_after = (
datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None
)

def add_backend(
self,
name: str,
@@ -161,9 +175,7 @@ def add_backend(
c = connection
connection = lambda: c
assert callable(connection)
self.backends[name] = _Backend(
get_connection=connection, parallel_jobs=parallel_jobs
)
self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs)

def _get_connection(self, backend_name: str, resilient: bool = True) -> Connection:
"""Get a connection for the backend and optionally make it resilient (adds retry behavior)
@@ -226,9 +238,10 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
("status", "not_started"),
("id", None),
("start_time", None),
("running_start_time", None),
# TODO: columns "cpu", "memory", "duration" are not referenced directly
# within MultiBackendJobManager making it confusing to claim they are required.
# However, they are through assumptions about job "usage" metadata in `_update_statuses`.
# However, they are through assumptions about job "usage" metadata in `_track_statuses`.
("cpu", None),
("memory", None),
("duration", None),
@@ -336,30 +349,26 @@ def run_jobs(
& (df.status != "skipped")
& (df.status != "start_failed")
& (df.status != "error")
& (df.status != "canceled")
].size
> 0
):

with ignore_connection_errors(context="get statuses"):
self._update_statuses(df)
self._track_statuses(df)
status_histogram = df.groupby("status").size().to_dict()
_log.info(f"Status histogram: {status_histogram}")
job_db.persist(df)

if len(df[df.status == "not_started"]) > 0:
# Check number of jobs running at each backend
running = df[
(df.status == "created")
| (df.status == "queued")
| (df.status == "running")
]
running = df[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
for backend_name in self.backends:
backend_load = per_backend.get(backend_name, 0)
if backend_load < self.backends[backend_name].parallel_jobs:
to_add = (
self.backends[backend_name].parallel_jobs - backend_load
)
to_add = self.backends[backend_name].parallel_jobs - backend_load
to_launch = df[df.status == "not_started"].iloc[0:to_add]
for i in to_launch.index:
self._launch_job(start_job, df, i, backend_name)
@@ -407,7 +416,7 @@ def _launch_job(self, start_job, df, i, backend_name):
_log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True)
df.loc[i, "status"] = "start_failed"
else:
df.loc[i, "start_time"] = datetime.datetime.now().isoformat()
df.loc[i, "start_time"] = rfc3339.utcnow()
if job:
df.loc[i, "id"] = job.job_id
with ignore_connection_errors(context="get status"):
@@ -463,6 +472,30 @@ def on_job_error(self, job: BatchJob, row):
self.ensure_job_dir_exists(job.job_id)
error_log_path.write_text(json.dumps(error_logs, indent=2))

def on_job_cancel(self, job: BatchJob, row):
"""
Handle a job that was cancelled. Can be overridden to provide custom behaviour.
Default implementation does not do anything.
:param job: The job that was canceled.
:param row: DataFrame row containing the job's metadata.
"""
pass

def _cancel_prolonged_job(self, job: BatchJob, row):
"""Cancel the job if it has been running for too long."""
job_running_start_time = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True)
elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - job_running_start_time
if elapsed > self._cancel_running_job_after:
try:
_log.info(
f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
)
job.stop()
except OpenEoApiError as e:
_log.error(f"Failed to cancel long-running job {job.job_id}: {e}")

def get_job_dir(self, job_id: str) -> Path:
"""Path to directory where job metadata, results and error logs are be saved."""
return self._root_dir / f"job_{job_id}"
@@ -481,30 +514,44 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
if not job_dir.exists():
job_dir.mkdir(parents=True)

def _update_statuses(self, df: pd.DataFrame):
"""Update status (and stats) of running jobs (in place)."""
active = df.loc[
(df.status == "created")
| (df.status == "queued")
| (df.status == "running")
]
def _track_statuses(self, df: pd.DataFrame):
"""
Tracks status (and stats) of running jobs (in place).
Optionally cancels jobs when running too long.
"""
active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
for i in active.index:
job_id = df.loc[i, "id"]
backend_name = df.loc[i, "backend_name"]
previous_status = df.loc[i, "status"]

try:
con = self._get_connection(backend_name)
the_job = con.job(job_id)
job_metadata = the_job.describe()
new_status = job_metadata["status"]

_log.info(
f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}"
f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})"
)
if job_metadata["status"] == "finished":

if new_status == "finished":
self.on_job_done(the_job, df.loc[i])
if df.loc[i, "status"] != "error" and job_metadata["status"] == "error":

if previous_status != "error" and new_status == "error":
self.on_job_error(the_job, df.loc[i])

df.loc[i, "status"] = job_metadata["status"]
if previous_status in {"created", "queued"} and new_status == "running":
df.loc[i, "running_start_time"] = rfc3339.utcnow()

if new_status == "canceled":
self.on_job_cancel(the_job, df.loc[i])

if self._cancel_running_job_after and new_status == "running":
self._cancel_prolonged_job(the_job, df.loc[i])

df.loc[i, "status"] = new_status

# TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df`
for key in job_metadata.get("usage", {}).keys():
df.loc[i, key] = _format_usage_stat(job_metadata, key)
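The new `on_job_cancel` hook above is a no-op by default; like the existing `on_job_done` and `on_job_error` hooks, it is meant to be overridden in a subclass. A minimal sketch (the logging behaviour is illustrative, not part of this commit):

```python
import logging

from openeo import BatchJob
from openeo.extra.job_management import MultiBackendJobManager

_log = logging.getLogger(__name__)


class MyJobManager(MultiBackendJobManager):
    def on_job_cancel(self, job: BatchJob, row):
        # Illustrative override: just log which job ended up canceled.
        _log.warning(f"Job {job.job_id} on backend {row['backend_name']!r} was canceled")
```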
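Putting the two new pieces together: `_track_statuses` records `running_start_time` (via `rfc3339.utcnow()`) when a job first reaches the "running" status, and `_cancel_prolonged_job` compares the elapsed time against the threshold derived from `cancel_running_job_after`. Shown standalone with an illustrative timestamp and threshold:

```python
import datetime

from openeo.util import rfc3339

# Threshold derived from cancel_running_job_after=2 * 60 * 60 (seconds).
threshold = datetime.timedelta(seconds=2 * 60 * 60)

# "running_start_time" as recorded at the queued -> running transition (illustrative value).
running_start_time = rfc3339.parse_datetime("2024-09-03T08:00:00Z", with_timezone=True)

elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - running_start_time
if elapsed > threshold:
    print(f"Job would be canceled: running for {elapsed}, longer than {threshold}")
```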
