From 0e37fb21cf57a2998410924e6032f4813c9ed818 Mon Sep 17 00:00:00 2001
From: Johan Schreurs
Date: Mon, 31 Jul 2023 09:58:28 +0200
Subject: [PATCH] fixup! Issue #432 Fix failing tests on Windows

---
 tests/extra/test_job_management.py | 147 -----------------------------
 1 file changed, 147 deletions(-)

diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py
index 00e4eb4e3..952b40ef6 100644
--- a/tests/extra/test_job_management.py
+++ b/tests/extra/test_job_management.py
@@ -119,13 +119,6 @@ def start_job(row, connection, **kwargs):
         metadata_path = manager.get_job_metadata_path(job_id="job-2022")
         assert metadata_path.exists()
 
-    @pytest.mark.skipif(
-        platform.system() == "Windows",
-        reason=(
-            "Windows support for multiprocessing is too different, the errors to"
-            "solve are too complicated: pickling certain local functions fails."
-        ),
-    )
     def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock):
         """Make sure the MultiBackendJobManager does not hang after all processes finish.
 
         Regression test for:
         https://github.com/Open-EO/openeo-python-client/issues/432
 
         Cause was that the run_jobs had an infinite loop when jobs ended with status error.
         """
@@ -223,146 +216,6 @@ def mock_job_status(job_id, succeeds: bool):
         )
         output_file = tmp_path / "jobs.csv"
 
-        def start_job(row, connection, **kwargs):
-            year = row["year"]
-            return BatchJob(job_id=f"job-{year}", connection=connection)
-
-        def start_worker_thread():
-            manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
-
-        # This should be finished almost immediately within a second.
-        # If it takes too long then we assume it will run forever.
-        # We will check that the exit code of this process == 0, i.e. it finished normally.
-        # If it gets killed it will have a different exit code
-        # (On Linux usually -9 SIGKILL)
-        proc = multiprocessing.Process(target=start_worker_thread, name="Worker process")
-
-        timeout_sec = 5.0
-        proc.start()
-        # We stop waiting for the process after the timeout.
-        # If that happens it is likely we detected that run_jobs will loop infinitely.
-        proc.join(timeout=timeout_sec)
-
-        if proc.is_alive:
-            # now forcibly kill the process, then have to join it again.
-            proc.terminate()
-            proc.join()
-
-        assert proc.exitcode == 0, (
-            "MultiBackendJobManager did not finish on its own and was killed. "
-            + "Infinite loop is probable. Expected exit code == 0, but found "
-            + f"proc.exitcode={proc.exitcode!r}, proc={proc!r}"
-        )
-
-        # Also check that we got sensible end results.
-        result = pd.read_csv(output_file)
-        assert len(result) == 5
-        assert set(result.status) == {"finished", "error"}
-        assert set(result.backend_name) == {"foo", "bar"}
-
-        # We expect that the job metadata was saved for a successful job,
-        # so verify that it exists.
-        # Checking it for one of the jobs is enough.
-        metadata_path = manager.get_job_metadata_path(job_id="job-2021")
-        assert metadata_path.exists()
-
-    def test_manager_must_exit_when_all_jobs_done_windows(self, tmp_path, requests_mock, sleep_mock):
-        """Make sure the MultiBackendJobManager does not hang after all processes finish.
-
-        Regression test for:
-        https://github.com/Open-EO/openeo-python-client/issues/432
-
-        Cause was that the run_jobs had an infinite loop when jobs ended with status error.
-        """
-
-        requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"})
-        requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"})
-
-        def mock_job_status(job_id, succeeds: bool):
-            """Mock job status polling sequence.
-            We set up one job with finishes with status error
-            """
-            response_list = sum(
-                [
-                    [
-                        {
-                            "json": {
-                                "id": job_id,
-                                "title": f"Job {job_id}",
-                                "status": "queued",
-                            }
-                        }
-                    ],
-                    [
-                        {
-                            "json": {
-                                "id": job_id,
-                                "title": f"Job {job_id}",
-                                "status": "running",
-                            }
-                        }
-                    ],
-                    [
-                        {
-                            "json": {
-                                "id": job_id,
-                                "title": f"Job {job_id}",
-                                "status": "finished" if succeeds else "error",
-                            }
-                        }
-                    ],
-                ],
-                [],
-            )
-            for backend in ["http://foo.test", "http://bar.test"]:
-                requests_mock.get(f"{backend}/jobs/{job_id}", response_list)
-                # It also needs job results endpoint for succesful jobs and the
-                # log endpoint for a failed job. Both are dummy implementations.
-                # When the job is finished the system tries to download the
-                # results or the logs and that is what needs these endpoints.
-                if succeeds:
-                    requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []})
-                else:
-                    response = {
-                        "level": "error",
-                        "logs": [
-                            {
-                                "id": "1",
-                                "code": "SampleError",
-                                "level": "error",
-                                "message": "Error for testing",
-                                "time": "2019-08-24T14:15:22Z",
-                                "data": None,
-                                "path": [],
-                                "usage": {},
-                                "links": [],
-                            }
-                        ],
-                        "links": [],
-                    }
-                    requests_mock.get(f"{backend}/jobs/{job_id}/logs?level=error", json=response)
-
-        mock_job_status("job-2018", succeeds=True)
-        mock_job_status("job-2019", succeeds=True)
-        mock_job_status("job-2020", succeeds=True)
-        mock_job_status("job-2021", succeeds=True)
-        mock_job_status("job-2022", succeeds=False)
-
-        root_dir = tmp_path / "job_mgr_root"
-        manager = MultiBackendJobManager(root_dir=root_dir)
-
-        manager.add_backend("foo", connection=openeo.connect("http://foo.test"))
-        manager.add_backend("bar", connection=openeo.connect("http://bar.test"))
-
-        df = pd.DataFrame(
-            {
-                "year": [2018, 2019, 2020, 2021, 2022],
-                # Use simple points in WKT format to test conversion to the geometry dtype
-                "geometry": ["POINT (1 2)"] * 5,
-            }
-        )
-        output_file = tmp_path / "jobs.csv"
-
         def start_job(row, connection, **kwargs):
             year = row["year"]
             return BatchJob(job_id=f"job-{year}", connection=connection)
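
Note (not part of the patch): the deleted duplicate test detects a hanging run_jobs by starting it in a separate multiprocessing.Process, joining with a timeout, and asserting on the exit code. Below is a minimal standalone sketch of that pattern, with run_workload and assert_finishes_within as illustrative stand-ins rather than names from the test suite. One detail visible in the removed lines: it checked "if proc.is_alive:" (a bound method, always truthy); the call "proc.is_alive()" is what the check needs, and the sketch uses the called form.

    import multiprocessing


    def run_workload():
        # Stand-in for the real workload; in the test this was
        # manager.run_jobs(df=df, start_job=start_job, output_file=output_file).
        pass


    def assert_finishes_within(timeout_sec: float = 5.0) -> None:
        # Run the workload in a child process so a hang cannot block the test runner.
        proc = multiprocessing.Process(target=run_workload, name="Worker process")
        proc.start()
        # Stop waiting after the timeout; a child that is still alive at this point
        # is treated as evidence of an infinite loop.
        proc.join(timeout=timeout_sec)
        if proc.is_alive():  # is_alive() must be called; the bare attribute is always truthy
            proc.terminate()
            proc.join()
        # A terminated child exits with a non-zero code (e.g. -SIGTERM on Linux),
        # so this assertion fails whenever the workload had to be killed.
        assert proc.exitcode == 0, f"worker did not exit cleanly: exitcode={proc.exitcode!r}"


    if __name__ == "__main__":
        assert_finishes_within()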
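
The status polling in mock_job_status relies on requests_mock response lists: registering a list of response dicts for one URL, as the removed test does with requests_mock.get(f"{backend}/jobs/{job_id}", response_list), makes successive GET requests receive those responses in order (queued, then running, then finished or error). A small self-contained illustration follows, using a made-up job URL rather than the fixtures from the test suite.

    import requests
    import requests_mock

    with requests_mock.Mocker() as m:
        # Each dict in the list answers one request to the registered URL, in order.
        m.get(
            "http://foo.test/jobs/job-2022",
            [
                {"json": {"id": "job-2022", "status": "queued"}},
                {"json": {"id": "job-2022", "status": "running"}},
                {"json": {"id": "job-2022", "status": "error"}},
            ],
        )
        for _ in range(3):
            print(requests.get("http://foo.test/jobs/job-2022").json()["status"])
        # prints: queued, running, error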