Skip to content

Commit

Permalink
Merge pull request #452 from Open-EO/issue432_multibackendjobmanager-…
Browse files Browse the repository at this point in the history
…stop-when-finished

Issue432 multibackendjobmanager stop when finished
  • Loading branch information
jdries authored Aug 1, 2023
2 parents 4afc159 + 0e37fb2 commit f65ab3b
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 0 deletions.
1 change: 1 addition & 0 deletions openeo/extra/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ def run_jobs(
(df.status != "finished")
& (df.status != "skipped")
& (df.status != "start_failed")
& (df.status != "error")
].size
> 0
):
Expand Down
135 changes: 135 additions & 0 deletions tests/extra/test_job_management.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import json
import multiprocessing
import platform
import threading
from unittest import mock

# TODO: can we avoid using httpretty?
Expand Down Expand Up @@ -116,6 +119,138 @@ def start_job(row, connection, **kwargs):
metadata_path = manager.get_job_metadata_path(job_id="job-2022")
assert metadata_path.exists()

def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock):
"""Make sure the MultiBackendJobManager does not hang after all processes finish.
Regression test for:
https://github.com/Open-EO/openeo-python-client/issues/432
Cause was that the run_jobs had an infinite loop when jobs ended with status error.
"""

requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"})
requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"})

def mock_job_status(job_id, succeeds: bool):
"""Mock job status polling sequence.
We set up one job with finishes with status error
"""
response_list = sum(
[
[
{
"json": {
"id": job_id,
"title": f"Job {job_id}",
"status": "queued",
}
}
],
[
{
"json": {
"id": job_id,
"title": f"Job {job_id}",
"status": "running",
}
}
],
[
{
"json": {
"id": job_id,
"title": f"Job {job_id}",
"status": "finished" if succeeds else "error",
}
}
],
],
[],
)
for backend in ["http://foo.test", "http://bar.test"]:
requests_mock.get(f"{backend}/jobs/{job_id}", response_list)
# It also needs job results endpoint for succesful jobs and the
# log endpoint for a failed job. Both are dummy implementations.
# When the job is finished the system tries to download the
# results or the logs and that is what needs these endpoints.
if succeeds:
requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []})
else:
response = {
"level": "error",
"logs": [
{
"id": "1",
"code": "SampleError",
"level": "error",
"message": "Error for testing",
"time": "2019-08-24T14:15:22Z",
"data": None,
"path": [],
"usage": {},
"links": [],
}
],
"links": [],
}
requests_mock.get(f"{backend}/jobs/{job_id}/logs?level=error", json=response)

mock_job_status("job-2018", succeeds=True)
mock_job_status("job-2019", succeeds=True)
mock_job_status("job-2020", succeeds=True)
mock_job_status("job-2021", succeeds=True)
mock_job_status("job-2022", succeeds=False)

root_dir = tmp_path / "job_mgr_root"
manager = MultiBackendJobManager(root_dir=root_dir)

manager.add_backend("foo", connection=openeo.connect("http://foo.test"))
manager.add_backend("bar", connection=openeo.connect("http://bar.test"))

df = pd.DataFrame(
{
"year": [2018, 2019, 2020, 2021, 2022],
# Use simple points in WKT format to test conversion to the geometry dtype
"geometry": ["POINT (1 2)"] * 5,
}
)
output_file = tmp_path / "jobs.csv"

def start_job(row, connection, **kwargs):
year = row["year"]
return BatchJob(job_id=f"job-{year}", connection=connection)

is_done_file = tmp_path / "is_done.txt"

def start_worker_thread():
manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
is_done_file.write_text("Done!")

thread = threading.Thread(target=start_worker_thread, name="Worker process", daemon=True)

timeout_sec = 5.0
thread.start()
# We stop waiting for the process after the timeout.
# If that happens it is likely we detected that run_jobs will loop infinitely.
thread.join(timeout=timeout_sec)

assert is_done_file.exists(), (
"MultiBackendJobManager did not finish on its own and was killed. " + "Infinite loop is probable."
)

# Also check that we got sensible end results.
result = pd.read_csv(output_file)
assert len(result) == 5
assert set(result.status) == {"finished", "error"}
assert set(result.backend_name) == {"foo", "bar"}

# We expect that the job metadata was saved for a successful job,
# so verify that it exists.
# Checking it for one of the jobs is enough.
metadata_path = manager.get_job_metadata_path(job_id="job-2021")
assert metadata_path.exists()


def test_on_error_log(self, tmp_path, requests_mock):
backend = "http://foo.test"
requests_mock.get(backend, json={"api_version": "1.1.0"})
Expand Down

0 comments on commit f65ab3b

Please sign in to comment.