feat(api): add integration tests, fix some inconsistencies with run_api
salemsd committed Dec 16, 2024
1 parent 5ccf8c7 commit c2a7aef
Showing 4 changed files with 64 additions and 26 deletions.
4 changes: 4 additions & 0 deletions src/antares/exceptions/exceptions.py
@@ -299,6 +299,10 @@ def __init__(self, job_id: str, time_out: int) -> None:
         self.message = f"Job {job_id} exceeded timeout of {time_out} seconds"
         super().__init__(self.message)

+class TaskTimeOutError(Exception):
+    def __init__(self, task_id: str, time_out: int) -> None:
+        self.message = f"Task {task_id} exceeded timeout of {time_out} seconds"
+        super().__init__(self.message)

 class AntaresSimulationUnzipError(Exception):
     def __init__(self, study_id: str, message: str) -> None:
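The new `TaskTimeOutError` mirrors the existing `SimulationTimeOutError` but covers the post-run unarchiving task rather than the launcher job itself. A minimal caller-side sketch (not part of this commit; `handle_timeouts` and the `study`/`job` objects are illustrative) of how the two exceptions can be told apart:

```python
# Sketch only: distinguish the two timeout causes when waiting on a job.
from antares.exceptions.exceptions import SimulationTimeOutError, TaskTimeOutError


def handle_timeouts(study, job, time_out: int = 600) -> None:
    try:
        study.wait_job_completion(job, time_out=time_out)
    except SimulationTimeOutError:
        # The launcher job itself did not reach a terminal state in time.
        print(f"simulation job {job.job_id} timed out after {time_out}s")
        raise
    except TaskTimeOutError:
        # The job finished, but the UNARCHIVE task on its output did not.
        print(f"unzip task for output {job.output_id} timed out after {time_out}s")
        raise
```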
51 changes: 32 additions & 19 deletions src/antares/service/api_services/run_api.py
@@ -21,7 +21,7 @@
     AntaresSimulationUnzipError,
     APIError,
     SimulationFailedError,
-    SimulationTimeOutError,
+    SimulationTimeOutError, TaskTimeOutError,
 )
 from antares.model.simulation import AntaresSimulationParameters, Job, JobStatus
 from antares.service.base_services import BaseRunService
@@ -43,7 +43,7 @@ def run_antares_simulation(self, parameters: Optional[AntaresSimulationParameter
                 response = self._wrapper.post(url, json=payload)
             else:
                 response = self._wrapper.post(url)
-            job_id = response.json()["id"]
+            job_id = response.json()["job_id"]
             return self._get_job_from_id(job_id)
         except APIError as e:
             raise AntaresSimulationRunningError(self.study_id, e.message) from e
@@ -53,54 +53,67 @@ def _get_job_from_id(self, job_id: str) -> Job:
         response = self._wrapper.get(url)
         job_info = response.json()
         status = JobStatus.from_str(job_info["status"])
-        output_id = job_info["output_id"] if status == JobStatus.SUCCESS else None
         launcher_params = json.loads(job_info["launcher_params"])
+        output_id = job_info["output_id"] if status == JobStatus.SUCCESS else None
         return Job(job_id=job_id, status=status, unzip_output=launcher_params["auto_unzip"], output_id=output_id)

     def wait_job_completion(self, job: Job, time_out: int) -> None:
         start_time = time.time()
         repeat_interval = 5
+        if job.status == JobStatus.SUCCESS:
+            self._update_job(job)
+
         while job.status in (JobStatus.RUNNING, JobStatus.PENDING):
             if time.time() - start_time > time_out:
                 raise SimulationTimeOutError(job.job_id, time_out)
             time.sleep(repeat_interval)
-            updated_job = self._get_job_from_id(job.job_id)
-            job.status = updated_job.status
-            job.output_id = updated_job.output_id
+            self._update_job(job)

-        if job.status == JobStatus.FAILED:
+        if job.status == JobStatus.FAILED or not job.output_id:
             raise SimulationFailedError(self.study_id)

-        if job.unzip_output and job.output_id:
-            self._wait_unzip_output(self.study_id, ["UNARCHIVE"], job.output_id)
+        if job.unzip_output:
+            self._wait_unzip_output(self.study_id, job.output_id, time_out)

         return None

-    def _wait_unzip_output(self, ref_id: str, type: list[str], job_output_id: str) -> None:
+    def _update_job(self, job):
+        updated_job = self._get_job_from_id(job.job_id)
+        job.status = updated_job.status
+        job.output_id = updated_job.output_id
+
+    def _wait_unzip_output(self, ref_id: str, job_output_id: str, time_out: int) -> None:
         url = f"{self._base_url}/tasks"
         repeat_interval = 2
-        payload = {"type": type, "ref_id": ref_id}
+        payload = {"type": ["UNARCHIVE"], "ref_id": ref_id}
         try:
             response = self._wrapper.post(url, json=payload)
             tasks = response.json()
-            task_id = self._get_task_id(job_output_id, tasks)
-            url += f"/{task_id}"
-            self._get_task_until_success(url, repeat_interval)
+            task_id = self._get_unarchiving_task_id(job_output_id, tasks)
+            self._wait_task_completion(task_id, repeat_interval, time_out)
         except APIError as e:
             raise AntaresSimulationUnzipError(self.study_id, e.message) from e

-    def _get_task_id(self, job_output_id: str, tasks: list[dict[str, Any]]) -> str:
+    def _get_unarchiving_task_id(self, job_output_id: str, tasks: list[dict[str, Any]]) -> str:
         for task in tasks:
             task_name = task["name"]
             output_id = task_name.split("/")[-1].split(" ")[0]
             if output_id == job_output_id:
                 return task["id"]
         raise AntaresSimulationUnzipError(self.study_id, "Could not find task for unarchiving job")

-    def _get_task_until_success(self, url: str, repeat_interval: int) -> None:
-        task_success = False
-        while not task_success:
+    def _wait_task_completion(self, task_id: str, repeat_interval: int, time_out: int) -> None:
+        url = f"{self._base_url}/tasks/{task_id}"
+
+        start_time = time.time()
+        task_result = None
+        while not task_result:
+            if time.time() - start_time > time_out:
+                raise TaskTimeOutError(task_id, time_out)
             response = self._wrapper.get(url)
             task = response.json()
-            task_success = task["result"]["success"]
+            task_result = task["result"]
             time.sleep(repeat_interval)
+
+        if not task_result["success"]:
+            raise SimulationFailedError(self.study_id)
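Taken together, these changes make `wait_job_completion` delegate each poll to `_update_job` and bound the unarchive wait with the same `time_out`. A usage sketch, assuming a `study` object exposing the run service and borrowing the parameter values from the integration test below (the helper name is illustrative only):

```python
# Sketch only: end-to-end run + wait with the revised API service.
from antares.model.simulation import AntaresSimulationParameters, JobStatus


def run_and_fetch_output(study, time_out: int = 600) -> str:
    parameters = AntaresSimulationParameters(nb_cpu=4, unzip_output=True)
    job = study.run_antares_simulation(parameters)  # POST /launcher/run/{study_id}, reads "job_id"
    # Polls /launcher/jobs/{job_id} every 5 s; with unzip_output=True it then
    # waits (polling every 2 s, same time_out) for the matching UNARCHIVE task.
    study.wait_job_completion(job, time_out=time_out)
    assert job.status == JobStatus.SUCCESS
    return job.output_id
```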
15 changes: 11 additions & 4 deletions tests/antares/services/api_services/test_study_api.py
@@ -343,34 +343,37 @@ def test_run_and_wait_antares_simulation(self):
         with requests_mock.Mocker() as mocker, patch("time.sleep", return_value=None):
             run_url = f"https://antares.com/api/v1/launcher/run/{self.study_id}"
             job_id = "1234-g6z17"
-            mocker.post(run_url, json={"id": job_id}, status_code=200)
+            mocker.post(run_url, json={"job_id": job_id}, status_code=200)

             job_url = f"https://antares.com/api/v1/launcher/jobs/{job_id}"
             output_id = "2024abcdefg-output"

+            # ========== SUCCESS TEST ============
+
+            parameters_as_api = dumps(parameters.to_api())
             # mock can take a list of responses that it maps one by one to each request, since we're doing multiple ones
             response_list = [
                 {
                     "json": {
                         "id": job_id,
                         "status": "pending",
-                        "launcher_params": dumps(parameters.to_api()),
+                        "launcher_params": parameters_as_api,
                     },
                     "status_code": 200,
                 },
                 {
                     "json": {
                         "id": job_id,
                         "status": "running",
-                        "launcher_params": dumps(parameters.to_api()),
+                        "launcher_params": parameters_as_api,
                     },
                     "status_code": 200,
                 },
                 {
                     "json": {
                         "id": job_id,
                         "status": "success",
-                        "launcher_params": dumps(parameters.to_api()),
+                        "launcher_params": parameters_as_api,
                         "output_id": output_id,
                     },
                     "status_code": 200,
Expand All @@ -394,6 +397,8 @@ def test_run_and_wait_antares_simulation(self):

assert job.status == JobStatus.SUCCESS

# ========= TIMEOUT TEST ==========

response_list.pop()
mocker.get(job_url, response_list)
mocker.post(tasks_url, status_code=200)
@@ -402,6 +407,8 @@ def test_run_and_wait_antares_simulation(self):
             with pytest.raises(SimulationTimeOutError):
                 self.study.wait_job_completion(job, time_out=2)

+            # =========== FAILED TEST ===========
+
             response_list.append(
                 {
                     "json": {
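The comment in the success test refers to a requests_mock feature: registering a list of response dicts for one URL, consumed one per request. A standalone illustration (independent of this commit; the URL and payloads are placeholders):

```python
# Sketch only: successive GETs to one mocked URL walk through the list in order.
import requests
import requests_mock

job_url = "https://antares.com/api/v1/launcher/jobs/1234-g6z17"
response_list = [
    {"json": {"status": "pending"}, "status_code": 200},
    {"json": {"status": "running"}, "status_code": 200},
    {"json": {"status": "success"}, "status_code": 200},
]

with requests_mock.Mocker() as mocker:
    mocker.get(job_url, response_list)
    statuses = [requests.get(job_url).json()["status"] for _ in range(3)]

assert statuses == ["pending", "running", "success"]
```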
20 changes: 17 additions & 3 deletions tests/integration/test_web_client.py
@@ -29,6 +29,7 @@
 from antares.model.settings.advanced_parameters import AdvancedParameters, UnitCommitmentMode
 from antares.model.settings.general import GeneralParameters, Mode
 from antares.model.settings.study_settings import PlaylistParameters, StudySettings
+from antares.model.simulation import AntaresSimulationParameters, JobStatus, Job
 from antares.model.st_storage import STStorageGroup, STStorageMatrixName, STStorageProperties
 from antares.model.study import create_study_api, create_variant_api, read_study_api
 from antares.model.thermal import ThermalClusterGroup, ThermalClusterProperties
@@ -74,7 +75,7 @@ def test_creation_lifecycle(self, antares_web: AntaresWebDesktop):
         assert area_fr.get_load_matrix().equals(load_matrix)

         # asserts solar and wind matrices can be created and read.
-        ts_matrix = pd.DataFrame(data=np.ones((4, 4)))
+        ts_matrix = pd.DataFrame(data=np.ones((8760, 4)))

         area_fr.create_solar(ts_matrix)
         assert area_fr.get_solar_matrix().equals(ts_matrix)
@@ -440,10 +441,10 @@ def test_creation_lifecycle(self, antares_web: AntaresWebDesktop):
             f"to be deleted, because it is referenced in "
             f"the following binding constraints:\n1- 'bc_2'.",
         ):
-            study.delete_area(area_fr.id)
+            study.delete_area(area_fr)

         # tests area deletion success
-        study.delete_area(area_de.id)
+        study.delete_area(area_de)
         assert area_de.id not in study.get_areas()

         # test study creation with settings
@@ -497,3 +498,16 @@ def test_creation_lifecycle(self, antares_web: AntaresWebDesktop):
         assert list(variant_from_api.get_binding_constraints().keys()) == list(
             new_study.get_binding_constraints().keys()
         )
+
+        # ===== test run study simulation and wait job completion ======
+        # parameters = AntaresSimulationParameters(nb_cpu=4, unzip_output=True)
+        #
+        # job = study.run_antares_simulation(parameters)
+        # assert isinstance(job, Job)
+        # assert job.status == JobStatus.PENDING
+        #
+        # study.wait_job_completion(job, time_out=60)
+        # assert job.status == JobStatus.SUCCESS
+        #
+        # assert job.output_id is not None
+        # assert job.unzip_output is True
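The simulation block above is committed commented-out, presumably to keep the desktop-backed integration run fast. One hedged way to keep it exercisable on demand (a sketch; the RUN_SIMULATION_TESTS variable name is an assumption, not part of the project):

```python
# Sketch only: opt-in gate for the commented-out simulation checks above.
import os

import pytest

run_simulations = pytest.mark.skipif(
    not os.getenv("RUN_SIMULATION_TESTS"),
    reason="set RUN_SIMULATION_TESTS=1 to run the Antares solver in this test",
)
```

The commented assertions could then live in a test decorated with `@run_simulations` instead of being disabled outright.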
