Skip to content

Commit

Permalink
feat(api): add possibility to run study simulations (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
salemsd authored Dec 16, 2024
1 parent a4217ef commit 3ca68f4
Show file tree
Hide file tree
Showing 9 changed files with 432 additions and 6 deletions.
36 changes: 36 additions & 0 deletions src/antares/exceptions/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,39 @@ class ConfigurationError(Exception):
def __init__(self, message: str = "Error") -> None:
self.message = "Unsupported configuration type" + f" {message}" if message != "Error" else ""
super().__init__(self.message)


class AntaresSimulationRunningError(Exception):
def __init__(self, study_id: str, message: str) -> None:
self.message = f"Could not run the simulation for study {study_id}: " + message
super().__init__(self.message)


class SimulationTimeOutError(Exception):
def __init__(self, job_id: str, time_out: int) -> None:
self.message = f"Job {job_id} exceeded timeout of {time_out} seconds"
super().__init__(self.message)


class TaskTimeOutError(Exception):
def __init__(self, task_id: str, time_out: int) -> None:
self.message = f"Task {task_id} exceeded timeout of {time_out} seconds"
super().__init__(self.message)


class TaskFailedError(Exception):
def __init__(self, task_id: str) -> None:
self.message = f"Task {task_id} failed"
super().__init__(self.message)


class AntaresSimulationUnzipError(Exception):
def __init__(self, study_id: str, job_id: str, message: str) -> None:
self.message = f"Could not unzip simulation for study {study_id} and job {job_id}: " + message
super().__init__(self.message)


class SimulationFailedError(Exception):
def __init__(self, study_id: str, job_id: str) -> None:
self.message = f"Simulation failed for {study_id} and job {job_id}"
super().__init__(self.message)
65 changes: 65 additions & 0 deletions src/antares/model/simulation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright (c) 2024, RTE (https://www.rte-france.com)
#
# See AUTHORS.txt
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# SPDX-License-Identifier: MPL-2.0
#
# This file is part of the Antares project.

from enum import Enum
from typing import Any, Optional

from pydantic import BaseModel, Field


class Solver(Enum):
COIN = "coin"
XPRESS = "xpress"
SIRIUS = "sirius"


class AntaresSimulationParameters(BaseModel):
solver: Solver = Solver.SIRIUS
nb_cpu: Optional[int] = None
unzip_output: bool = Field(alias="auto_unzip", default=True)
output_suffix: Optional[str] = None
presolve: bool = False

@property
def other_options(self) -> str:
options = []
if self.presolve:
options.append("presolve")
if self.solver != Solver.SIRIUS:
options.append(self.solver.name)
return " ".join(options)

def to_api(self) -> dict[str, Any]:
data = self.model_dump(by_alias=True)
if self.other_options:
data["other_options"] = self.other_options
data.pop("solver", None)
data.pop("presolve", None)
return data


class JobStatus(Enum):
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"

@staticmethod
def from_str(input: str) -> "JobStatus":
return JobStatus.__getitem__(input.upper())


class Job(BaseModel):
job_id: str
status: JobStatus
output_id: Optional[str] = None
parameters: AntaresSimulationParameters
24 changes: 24 additions & 0 deletions src/antares/model/study.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from antares.model.link import Link, LinkProperties, LinkUi
from antares.model.settings.study_settings import DefaultStudySettings, StudySettings, StudySettingsLocal
from antares.model.settings.time_series import correlation_defaults
from antares.model.simulation import AntaresSimulationParameters, Job
from antares.service.api_services.study_api import _returns_study_settings
from antares.service.base_services import BaseStudyService
from antares.service.service_factory import ServiceFactory
Expand Down Expand Up @@ -215,6 +216,7 @@ def __init__(
self._study_service = service_factory.create_study_service()
self._area_service = service_factory.create_area_service()
self._link_service = service_factory.create_link_service()
self._run_service = service_factory.create_run_service()
self._binding_constraints_service = service_factory.create_binding_constraints_service()
self._settings = DefaultStudySettings.model_validate(settings if settings is not None else StudySettings())
self._areas: Dict[str, Area] = dict()
Expand Down Expand Up @@ -343,6 +345,28 @@ def create_variant(self, variant_name: str) -> "Study":
"""
return self._study_service.create_variant(variant_name)

def run_antares_simulation(self, parameters: Optional[AntaresSimulationParameters] = None) -> Job:
"""
Runs the Antares simulation.
This method starts an antares simulation with the given parameters
Returns: A job representing the simulation task
"""
return self._run_service.run_antares_simulation(parameters)

def wait_job_completion(self, job: Job, time_out: int = 172800) -> None:
"""
Waits for the completion of a job
Args:
job: The job to wait for
time_out: Time limit for waiting (seconds), default: 172800s
Raises: SimulationTimeOutError if exceeded timeout
"""
self._run_service.wait_job_completion(job, time_out)


def _verify_study_already_exists(study_directory: Path) -> None:
if study_directory.exists():
Expand Down
123 changes: 123 additions & 0 deletions src/antares/service/api_services/run_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Copyright (c) 2024, RTE (https://www.rte-france.com)
#
# See AUTHORS.txt
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# SPDX-License-Identifier: MPL-2.0
#
# This file is part of the Antares project.
import time

from typing import Any, Optional

from antares.api_conf.api_conf import APIconf
from antares.api_conf.request_wrapper import RequestWrapper
from antares.exceptions.exceptions import (
AntaresSimulationRunningError,
AntaresSimulationUnzipError,
APIError,
SimulationFailedError,
SimulationTimeOutError,
TaskFailedError,
TaskTimeOutError,
)
from antares.model.simulation import AntaresSimulationParameters, Job, JobStatus
from antares.service.base_services import BaseRunService


class RunApiService(BaseRunService):
def __init__(self, config: APIconf, study_id: str):
super().__init__()
self.config = config
self.study_id = study_id
self._base_url = f"{self.config.get_host()}/api/v1"
self._wrapper = RequestWrapper(self.config.set_up_api_conf())

def run_antares_simulation(self, parameters: Optional[AntaresSimulationParameters] = None) -> Job:
url = f"{self._base_url}/launcher/run/{self.study_id}"
try:
if parameters is not None:
payload = parameters.to_api()
response = self._wrapper.post(url, json=payload)
else:
parameters = AntaresSimulationParameters()
response = self._wrapper.post(url)
job_id = response.json()["job_id"]
return self._get_job_from_id(job_id, parameters)
except APIError as e:
raise AntaresSimulationRunningError(self.study_id, e.message) from e

def _get_job_from_id(self, job_id: str, parameters: AntaresSimulationParameters) -> Job:
url = f"{self._base_url}/launcher/jobs/{job_id}"
response = self._wrapper.get(url)
job_info = response.json()
status = JobStatus.from_str(job_info["status"])
output_id = job_info.get("output_id")
return Job(job_id=job_id, status=status, parameters=parameters, output_id=output_id)

def wait_job_completion(self, job: Job, time_out: int) -> None:
start_time = time.time()
repeat_interval = 5
if job.status == JobStatus.SUCCESS:
self._update_job(job)

while job.status in (JobStatus.RUNNING, JobStatus.PENDING):
if time.time() - start_time > time_out:
raise SimulationTimeOutError(job.job_id, time_out)
time.sleep(repeat_interval)
self._update_job(job)

if job.status == JobStatus.FAILED or not job.output_id:
raise SimulationFailedError(self.study_id, job.job_id)

if job.parameters.unzip_output:
try:
self._wait_unzip_output(self.study_id, job, time_out)
except AntaresSimulationUnzipError as e:
raise SimulationFailedError(self.study_id, job.job_id) from e

return None

def _update_job(self, job: Job) -> None:
updated_job = self._get_job_from_id(job.job_id, job.parameters)
job.status = updated_job.status
job.output_id = updated_job.output_id

def _wait_unzip_output(self, ref_id: str, job: Job, time_out: int) -> None:
url = f"{self._base_url}/tasks"
repeat_interval = 2
payload = {"type": ["UNARCHIVE"], "ref_id": ref_id}
try:
response = self._wrapper.post(url, json=payload)
tasks = response.json()
task_id = self._get_unarchiving_task_id(job, tasks)
self._wait_task_completion(task_id, repeat_interval, time_out)
except (APIError, TaskFailedError) as e:
raise AntaresSimulationUnzipError(self.study_id, job.job_id, e.message) from e

def _get_unarchiving_task_id(self, job: Job, tasks: list[dict[str, Any]]) -> str:
for task in tasks:
task_name = task["name"]
output_id = task_name.split("/")[-1].split(" ")[0]
if output_id == job.output_id:
return task["id"]
raise AntaresSimulationUnzipError(self.study_id, job.job_id, "Could not find task for unarchiving job")

def _wait_task_completion(self, task_id: str, repeat_interval: int, time_out: int) -> None:
url = f"{self._base_url}/tasks/{task_id}"

start_time = time.time()
task_result = None
while not task_result:
if time.time() - start_time > time_out:
raise TaskTimeOutError(task_id, time_out)
response = self._wrapper.get(url)
task = response.json()
task_result = task["result"]
time.sleep(repeat_interval)

if not task_result["success"]:
raise TaskFailedError(task_id)
27 changes: 27 additions & 0 deletions src/antares/service/base_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from antares.model.link import Link, LinkProperties, LinkUi
from antares.model.renewable import RenewableCluster, RenewableClusterProperties
from antares.model.settings.study_settings import StudySettings
from antares.model.simulation import AntaresSimulationParameters, Job
from antares.model.st_storage import STStorage, STStorageProperties
from antares.model.thermal import ThermalCluster, ThermalClusterMatrixName, ThermalClusterProperties

Expand Down Expand Up @@ -599,3 +600,29 @@ def update_st_storage_properties(
@abstractmethod
def read_st_storages(self, area_id: str) -> List[STStorage]:
pass


class BaseRunService(ABC):
@abstractmethod
def run_antares_simulation(self, parameters: Optional[AntaresSimulationParameters] = None) -> Job:
"""
Runs the Antares simulation.
This method starts an antares simulation for the current study config and params
Returns: A job representing the simulation task
"""
pass

@abstractmethod
def wait_job_completion(self, job: Job, time_out: int) -> None:
"""
Waits for the completion of a job
Args:
job: The job to wait for
time_out: Time limit for waiting (seconds)
Raises: SimulationTimeOutError if exceeded timeout
"""
pass
29 changes: 29 additions & 0 deletions src/antares/service/local_services/run_local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) 2024, RTE (https://www.rte-france.com)
#
# See AUTHORS.txt
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# SPDX-License-Identifier: MPL-2.0
#
# This file is part of the Antares project.
from typing import Any, Optional

from antares.config.local_configuration import LocalConfiguration
from antares.model.simulation import AntaresSimulationParameters, Job
from antares.service.base_services import BaseRunService


class RunLocalService(BaseRunService):
def __init__(self, config: LocalConfiguration, study_name: str, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.config = config
self.study_name = study_name

def run_antares_simulation(self, parameters: Optional[AntaresSimulationParameters] = None) -> Job:
raise NotImplementedError

def wait_job_completion(self, job: Job, time_out: int) -> None:
raise NotImplementedError
12 changes: 12 additions & 0 deletions src/antares/service/service_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from antares.service.api_services.binding_constraint_api import BindingConstraintApiService
from antares.service.api_services.link_api import LinkApiService
from antares.service.api_services.renewable_api import RenewableApiService
from antares.service.api_services.run_api import RunApiService
from antares.service.api_services.st_storage_api import ShortTermStorageApiService
from antares.service.api_services.study_api import StudyApiService
from antares.service.api_services.thermal_api import ThermalApiService
Expand All @@ -25,6 +26,7 @@
BaseBindingConstraintService,
BaseLinkService,
BaseRenewableService,
BaseRunService,
BaseShortTermStorageService,
BaseStudyService,
BaseThermalService,
Expand All @@ -33,6 +35,7 @@
from antares.service.local_services.binding_constraint_local import BindingConstraintLocalService
from antares.service.local_services.link_local import LinkLocalService
from antares.service.local_services.renewable_local import RenewableLocalService
from antares.service.local_services.run_local import RunLocalService
from antares.service.local_services.st_storage_local import ShortTermStorageLocalService
from antares.service.local_services.study_local import StudyLocalService
from antares.service.local_services.thermal_local import ThermalLocalService
Expand Down Expand Up @@ -124,3 +127,12 @@ def create_st_storage_service(self) -> BaseShortTermStorageService:
else:
raise TypeError(f"{ERROR_MESSAGE}{repr(self.config)}")
return short_term_storage_service

def create_run_service(self) -> BaseRunService:
if isinstance(self.config, APIconf):
run_service: BaseRunService = RunApiService(self.config, self.study_id)
elif isinstance(self.config, LocalConfiguration):
run_service = RunLocalService(self.config, self.study_name)
else:
raise TypeError(f"{ERROR_MESSAGE}{repr(self.config)}")
return run_service
Loading

0 comments on commit 3ca68f4

Please sign in to comment.