diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py
index 799a22f95..f9ef4cd98 100644
--- a/openeo/extra/job_management.py
+++ b/openeo/extra/job_management.py
@@ -667,16 +667,26 @@ def on_job_cancel(self, job: BatchJob, row):
 
     def _cancel_prolonged_job(self, job: BatchJob, row):
         """Cancel the job if it has been running for too long."""
-        job_running_start_time = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True)
-        elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - job_running_start_time
-        if elapsed > self._cancel_running_job_after:
-            try:
-                _log.info(
-                    f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
-                )
-                job.stop()
-            except OpenEoApiError as e:
-                _log.error(f"Failed to cancel long-running job {job.job_id}: {e}")
+        try:
+            # Ensure running start time is valid
+            job_running_start_time = rfc3339.parse_datetime(row.get("running_start_time"), with_timezone=True)
+
+            # Parse the current time into a datetime object with timezone info
+            current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True)
+
+            # Calculate the elapsed time between job start and now
+            elapsed = current_time - job_running_start_time
+
+            if elapsed > self._cancel_running_job_after:
+                try:
+                    _log.info(
+                        f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
+                    )
+                    job.stop()
+                except OpenEoApiError as e:
+                    _log.error(f"Failed to cancel long-running job {job.job_id}: {e}")
+        except Exception as e:
+            _log.error(f"Unexpected error while handling job {job.job_id}: {e}")
 
     def get_job_dir(self, job_id: str) -> Path:
         """Path to directory where job metadata, results and error logs are be saved."""
@@ -737,6 +747,13 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
                     self.on_job_cancel(the_job, active.loc[i])
 
                 if self._cancel_running_job_after and new_status == "running":
+                    if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])):
+                        _log.warning(
+                            f"Unknown 'running_start_time' for running job {job_id}. Using current time as an approximation."
+                        )
+                        stats["job started running"] += 1
+                        active.loc[i, "running_start_time"] = rfc3339.utcnow()
+
                     self._cancel_prolonged_job(the_job, active.loc[i])
 
                 active.loc[i, "status"] = new_status
diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py
index 3419b46e1..ddf640cd1 100644
--- a/tests/extra/test_job_management.py
+++ b/tests/extra/test_job_management.py
@@ -7,6 +7,7 @@
 from time import sleep
 from typing import Callable, Union
 from unittest import mock
+import datetime
 
 import dirty_equals
 import geopandas
@@ -554,6 +555,7 @@ def start_job(row, connection_provider, connection, **kwargs):
                 12 * 60 * 60,
                 "finished",
             ),
+
         ],
     )
     def test_automatic_cancel_of_too_long_running_jobs(
@@ -645,6 +647,80 @@ def test_status_logging(self, tmp_path, job_manager, job_manager_root_dir, sleep
         assert needle.search(caplog.text)
 
 
+    @pytest.mark.parametrize(
+        ["create_time", "start_time", "running_start_time", "end_time", "end_status", "cancel_after_seconds"],
+        [
+            # Scenario 1: Missing running_start_time (None)
+            (
+                "2024-09-01T09:00:00Z",  # Job creation time
+                "2024-09-01T09:00:00Z",  # Job start time
+                None,  # Missing running_start_time
+                "2024-09-01T20:00:00Z",  # Job end time
+                "finished",  # Job final status
+                6 * 60 * 60,  # Cancel after 6 hours
+            ),
+            # Scenario 2: NaN running_start_time
+            (
+                "2024-09-01T09:00:00Z",
+                "2024-09-01T09:00:00Z",
+                float("nan"),  # NaN running_start_time
+                "2024-09-01T20:00:00Z",  # Job end time
+                "finished",  # Job final status
+                6 * 60 * 60,  # Cancel after 6 hours
+            ),
+        ]
+    )
+    def test_ensure_running_start_time_is_datetime(
+        self,
+        tmp_path,
+        time_machine,
+        create_time,
+        start_time,
+        running_start_time,
+        end_time,
+        end_status,
+        cancel_after_seconds,
+        dummy_backend_foo,
+        job_manager_root_dir,
+    ):
+
+        def get_status(job_id, current_status):
+            if rfc3339.utcnow() < start_time:
+                return "queued"
+            elif rfc3339.utcnow() < end_time:
+                return "running"
+            return end_status
+
+        # Set the job status updater function for the mock backend
+        dummy_backend_foo.job_status_updater = get_status
+
+        job_manager = MultiBackendJobManager(
+            root_dir=job_manager_root_dir, cancel_running_job_after=cancel_after_seconds
+        )
+        job_manager.add_backend("foo", connection=dummy_backend_foo.connection)
+
+        # Create a DataFrame representing the job database
+        df = pd.DataFrame({
+            "year": [2024],
+            "running_start_time": [running_start_time],  # Initial running_start_time
+        })
+
+        # Move the time machine to the job creation time
+        time_machine.move_to(create_time)
+
+        job_db_path = tmp_path / "jobs.csv"
+
+        # Mock sleep() to skip one hour at a time instead of actually sleeping
+        with mock.patch.object(openeo.extra.job_management.time, "sleep", new=lambda s: time_machine.shift(60 * 60)):
+            job_manager.run_jobs(df=df, start_job=self._create_year_job, job_db=job_db_path)
+
+        final_df = CsvJobDatabase(job_db_path).read()
+
+        # Validate running_start_time is a valid datetime object
+        filled_running_start_time = final_df.iloc[0]["running_start_time"]
+        assert isinstance(rfc3339.parse_datetime(filled_running_start_time), datetime.datetime)
+
+
 JOB_DB_DF_BASICS = pd.DataFrame(
     {
         "numbers": [3, 2, 1],