diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index 9e5a5a6..9b76443 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -1,4 +1,5 @@ import json +from functools import partial from concurrent.futures import ThreadPoolExecutor from enum import Enum from pathlib import Path @@ -142,11 +143,11 @@ def _resume_postjob_actions(self, df: pd.DataFrame): if row.status == "postprocessing": _log.info(f"Resuming postprocessing of job {row.id}, queueing on_job_finished...") future = self._executor.submit(self.on_job_done, job, row) - future.add_done_callback(done_callback(future, df, idx)) + future.add_done_callback(partial(done_callback, df=df, idx=idx)) else: _log.info(f"Resuming postprocessing of job {row.id}, queueing on_job_error...") future = self._executor.submit(self.on_job_error, job, row) - future.add_done_callback(done_callback(future, df, idx)) + future.add_done_callback(partial(done_callback, df=df, idx=idx)) self._futures.append(future) def _restart_failed_jobs(self, df: pd.DataFrame): @@ -193,7 +194,7 @@ def _update_statuses(self, df: pd.DataFrame): job_status = "postprocessing" future = self._executor.submit(self.on_job_done, job, row) # Future will setup the status to finished when the job is done - future.add_done_callback(lambda future: done_callback(future, df, idx)) + future.add_done_callback(partial(done_callback, df=df, idx=idx)) self._futures.append(future) df.loc[idx, "costs"] = job_metadata["costs"] @@ -203,7 +204,7 @@ def _update_statuses(self, df: pd.DataFrame): job_status = "postprocessing-error" future = self._executor.submit(self.on_job_error, job, row) # Future will setup the status to error when the job is done - future.add_done_callback(lambda future: done_callback(future, df, idx)) + future.add_done_callback(partial(done_callback, df=df, idx=idx)) self._futures.append(future) df.loc[idx, "costs"] = job_metadata["costs"]