Skip to content

Commit

Permalink
Merge pull request #1640 from Avaiga/feature/#1544-add-job-execution-…
Browse files Browse the repository at this point in the history
…duration

Feature/#1640 - Add execution duration as a information field in Job and Submission entities
  • Loading branch information
trgiangdo authored Aug 8, 2024
2 parents 6fa0903 + fd3cb48 commit a46d9fa
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import datetime
from typing import Optional

from ...job.job import Job
Expand Down Expand Up @@ -44,5 +45,7 @@ def _dispatch(self, job: Job):
Parameters:
job (Job^): The job to submit on an executor with an available worker.
"""
job.execution_started_at = datetime.datetime.now()
rs = _TaskFunctionWrapper(job.id, job.task).execute()
self._update_job_status(job, rs)
job.execution_ended_at = datetime.datetime.now()
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import datetime
import multiprocessing as mp
from concurrent.futures import Executor, ProcessPoolExecutor
from functools import partial
Expand Down Expand Up @@ -59,6 +60,8 @@ def _dispatch(self, job: Job):
self._nb_available_workers -= 1
self._logger.debug(f"Setting nb_available_workers to {self._nb_available_workers} in the dispatch method.")
config_as_string = _TomlSerializer()._serialize(Config._applied_config) # type: ignore[attr-defined]

job.execution_started_at = datetime.datetime.now()
future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string)
future.add_done_callback(partial(self._update_job_status_from_future, job))

Expand All @@ -67,3 +70,4 @@ def _update_job_status_from_future(self, job: Job, ft):
self._nb_available_workers += 1
self._logger.debug(f"Setting nb_available_workers to {self._nb_available_workers} in the callback method.")
self._update_job_status(job, ft.result())
job.execution_ended_at = datetime.datetime.now()
6 changes: 6 additions & 0 deletions taipy/core/job/_job_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def _entity_to_model(cls, job: Job) -> _JobModel:
job.submit_id,
job.submit_entity_id,
job._creation_date.isoformat(),
job._execution_started_at.isoformat() if job._execution_started_at else None,
job._execution_ended_at.isoformat() if job._execution_ended_at else None,
cls.__serialize_subscribers(job._subscribers),
job._stacktrace,
version=job._version,
Expand All @@ -52,6 +54,10 @@ def _model_to_entity(cls, model: _JobModel) -> Job:
job._status = model.status # type: ignore
job._force = model.force # type: ignore
job._creation_date = datetime.fromisoformat(model.creation_date) # type: ignore
job._execution_started_at = (
datetime.fromisoformat(model.execution_started_at) if model.execution_started_at else None
)
job._execution_ended_at = datetime.fromisoformat(model.execution_ended_at) if model.execution_ended_at else None
for it in model.subscribers:
try:
fct_module, fct_name = it.get("fct_module"), it.get("fct_name")
Expand Down
8 changes: 7 additions & 1 deletion taipy/core/job/_job_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# specific language governing permissions and limitations under the License.

from dataclasses import dataclass
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from .._repository._base_taipy_model import _BaseModel
from .job_id import JobId
Expand All @@ -26,6 +26,8 @@ class _JobModel(_BaseModel):
submit_id: str
submit_entity_id: str
creation_date: str
execution_started_at: Optional[str]
execution_ended_at: Optional[str]
subscribers: List[Dict]
stacktrace: List[str]
version: str
Expand All @@ -40,6 +42,8 @@ def from_dict(data: Dict[str, Any]):
submit_id=data["submit_id"],
submit_entity_id=data["submit_entity_id"],
creation_date=data["creation_date"],
execution_started_at=data["execution_started_at"],
execution_ended_at=data["execution_ended_at"],
subscribers=_BaseModel._deserialize_attribute(data["subscribers"]),
stacktrace=_BaseModel._deserialize_attribute(data["stacktrace"]),
version=data["version"],
Expand All @@ -54,6 +58,8 @@ def to_list(self):
self.submit_id,
self.submit_entity_id,
self.creation_date,
self.execution_started_at,
self.execution_ended_at,
_BaseModel._serialize_attribute(self.subscribers),
_BaseModel._serialize_attribute(self.stacktrace),
self.version,
Expand Down
35 changes: 35 additions & 0 deletions taipy/core/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ def __init__(self, id: JobId, task: "Task", submit_id: str, submit_entity_id: st
self._creation_date = datetime.now()
self._submit_id: str = submit_id
self._submit_entity_id: str = submit_entity_id
self._execution_started_at: Optional[datetime] = None
self._execution_ended_at: Optional[datetime] = None
self._subscribers: List[Callable] = []
self._stacktrace: List[str] = []
self.__logger = _TaipyLogger._get_logger()
Expand Down Expand Up @@ -144,6 +146,39 @@ def creation_date(self):
def creation_date(self, val):
self._creation_date = val

@property
@_self_reload(_MANAGER_NAME)
def execution_started_at(self) -> Optional[datetime]:
return self._execution_started_at

@execution_started_at.setter
@_self_setter(_MANAGER_NAME)
def execution_started_at(self, val):
self._execution_started_at = val

@property
@_self_reload(_MANAGER_NAME)
def execution_ended_at(self) -> Optional[datetime]:
return self._execution_ended_at

@execution_ended_at.setter
@_self_setter(_MANAGER_NAME)
def execution_ended_at(self, val):
self._execution_ended_at = val

@property
@_self_reload(_MANAGER_NAME)
def execution_duration(self) -> Optional[float]:
"""Get the duration of the job execution in seconds.
Returns:
Optional[float]: The duration of the job execution in seconds. If the job is not
completed, None is returned.
"""
if self._execution_started_at and self._execution_ended_at:
return (self._execution_ended_at - self._execution_started_at).total_seconds()
return None

@property # type: ignore
@_self_reload(_MANAGER_NAME)
def stacktrace(self) -> List[str]:
Expand Down
27 changes: 27 additions & 0 deletions taipy/core/submission/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,33 @@ def properties(self):
def creation_date(self):
return self._creation_date

@property
@_self_reload(_MANAGER_NAME)
def execution_started_at(self) -> Optional[datetime]:
if all(job.execution_started_at is not None for job in self.jobs):
return min(job.execution_started_at for job in self.jobs)
return None

@property
@_self_reload(_MANAGER_NAME)
def execution_ended_at(self) -> Optional[datetime]:
if all(job.execution_ended_at is not None for job in self.jobs):
return max(job.execution_ended_at for job in self.jobs)
return None

@property
@_self_reload(_MANAGER_NAME)
def execution_duration(self) -> Optional[float]:
"""Get the duration of the submission in seconds.
Returns:
Optional[float]: The duration of the submission in seconds. If the job is not
completed, None is returned.
"""
if self.execution_started_at and self.execution_ended_at:
return (self.execution_ended_at - self.execution_started_at).total_seconds()
return None

def get_label(self) -> str:
"""Returns the submission simple label prefixed by its owner label.
Expand Down
75 changes: 74 additions & 1 deletion tests/core/_orchestrator/test_orchestrator__submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
# specific language governing permissions and limitations under the License.

from datetime import datetime, timedelta
from time import sleep
from unittest import mock

import freezegun
import pytest

from taipy import Scenario, Scope, Task
from taipy.config import Config
from taipy.core import taipy
from taipy.core import Core, taipy
from taipy.core._orchestrator._orchestrator import _Orchestrator
from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
from taipy.core.config import JobConfig
Expand All @@ -27,6 +28,7 @@
from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
from taipy.core.submission.submission_status import SubmissionStatus
from taipy.core.task._task_manager import _TaskManager
from tests.core.utils import assert_true_after_time


def nothing(*args, **kwargs):
Expand All @@ -53,6 +55,7 @@ def test_submit_scenario_development_mode():
scenario = create_scenario()
scenario.dn_0.write(0) # input data is made ready
orchestrator = _OrchestratorFactory._build_orchestrator()
_OrchestratorFactory._build_dispatcher()

submit_time = datetime.now() + timedelta(seconds=1) # +1 to ensure the edit time of dn_0 is before the submit time
with freezegun.freeze_time(submit_time):
Expand Down Expand Up @@ -505,3 +508,73 @@ def test_submit_submittable_generate_unique_submit_id():
assert jobs_1[0].submit_id == jobs_1[1].submit_id
assert jobs_2[0].submit_id == jobs_2[1].submit_id
assert jobs_1[0].submit_id != jobs_2[0].submit_id


def task_sleep_1():
sleep(1)


def task_sleep_2():
sleep(2)
return


def test_submit_duration_development_mode():
core = Core()
core.run()

task_1 = Task("task_config_id_1", {}, task_sleep_1, [], [])
task_2 = Task("task_config_id_2", {}, task_sleep_2, [], [])

_TaskManager._set(task_1)
_TaskManager._set(task_2)

scenario = Scenario("scenario", {task_1, task_2}, {})
_ScenarioManager._set(scenario)
submission = taipy.submit(scenario)
jobs = submission.jobs
core.stop()

assert all(isinstance(job.execution_started_at, datetime) for job in jobs)
assert all(isinstance(job.execution_ended_at, datetime) for job in jobs)
jobs_1s = jobs[0] if jobs[0].task.config_id == "task_config_id_1" else jobs[1]
jobs_2s = jobs[0] if jobs[0].task.config_id == "task_config_id_2" else jobs[1]
assert jobs_1s.execution_duration >= 1
assert jobs_2s.execution_duration >= 2

assert submission.execution_duration >= 3
assert submission.execution_started_at == min(jobs_1s.execution_started_at, jobs_2s.execution_started_at)
assert submission.execution_ended_at == max(jobs_1s.execution_ended_at, jobs_2s.execution_ended_at)


@pytest.mark.standalone
def test_submit_duration_standalone_mode():
Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE)
core = Core()
core.run()

task_1 = Task("task_config_id_1", {}, task_sleep_1, [], [])
task_2 = Task("task_config_id_2", {}, task_sleep_2, [], [])

_TaskManager._set(task_1)
_TaskManager._set(task_2)

scenario = Scenario("scenario", {task_1, task_2}, {})
_ScenarioManager._set(scenario)
submission = taipy.submit(scenario)
jobs = submission.jobs

assert_true_after_time(jobs[1].is_completed)

core.stop()

assert all(isinstance(job.execution_started_at, datetime) for job in jobs)
assert all(isinstance(job.execution_ended_at, datetime) for job in jobs)
jobs_1s = jobs[0] if jobs[0].task.config_id == "task_config_id_1" else jobs[1]
jobs_2s = jobs[0] if jobs[0].task.config_id == "task_config_id_2" else jobs[1]
assert jobs_1s.execution_duration >= 1
assert jobs_2s.execution_duration >= 2

assert submission.execution_duration >= 2 # Both tasks are executed in parallel so the duration may smaller than 3
assert submission.execution_started_at == min(jobs_1s.execution_started_at, jobs_2s.execution_started_at)
assert submission.execution_ended_at == max(jobs_1s.execution_ended_at, jobs_2s.execution_ended_at)
28 changes: 28 additions & 0 deletions tests/core/submission/test_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -903,3 +903,31 @@ def test_is_finished():
submission.submission_status = SubmissionStatus.COMPLETED
assert submission.submission_status == SubmissionStatus.COMPLETED
assert submission.is_finished()


def test_execution_duration():
task = Task(config_id="task_1", properties={}, function=print, id=TaskId("task_1"))
submission = Submission(task.id, task._ID_PREFIX, task.config_id, properties={})
job_1 = Job("job_1", task, submission.id, submission.entity_id)
job_2 = Job("job_2", task, submission.id, submission.entity_id)

_TaskManagerFactory._build_manager()._set(task)
_SubmissionManagerFactory._build_manager()._set(submission)
_JobManagerFactory._build_manager()._set(job_1)
_JobManagerFactory._build_manager()._set(job_2)

submission.jobs = [job_1, job_2]
_SubmissionManagerFactory._build_manager()._set(submission)

job_1.execution_started_at = datetime(2024, 1, 1, 0, 0, 0)
job_1.execution_ended_at = datetime(2024, 1, 1, 0, 0, 10)
job_2.execution_started_at = datetime(2024, 1, 1, 0, 1, 0)
job_2.execution_ended_at = datetime(2024, 1, 1, 0, 2, 30)
assert submission.execution_started_at == job_1.execution_started_at
assert submission.execution_ended_at == job_2.execution_ended_at
assert submission.execution_duration == 150

job_2.execution_ended_at = None # job_2 is still running
assert submission.execution_started_at == job_1.execution_started_at
assert submission.execution_ended_at is None
assert submission.execution_duration is None

0 comments on commit a46d9fa

Please sign in to comment.