Skip to content

Commit

Permalink
Merge pull request #1277 from Avaiga/feature/#1197-add-job-to-on-subm…
Browse files Browse the repository at this point in the history
…ission-change-event

feature/#1197 pass job as metadata for event
  • Loading branch information
toan-quach authored May 28, 2024
2 parents 27f4a9a + e7bd8bf commit d490ef1
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 82 deletions.
4 changes: 1 addition & 3 deletions taipy/core/_entity/_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,4 @@ def _get_manager(manager: str) -> _Manager:
"job": _JobManagerFactory._build_manager(),
"task": _TaskManagerFactory._build_manager(),
"submission": _SubmissionManagerFactory._build_manager(),
}[
manager
] # type: ignore
}[manager] # type: ignore
7 changes: 4 additions & 3 deletions taipy/core/_orchestrator/_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,11 @@ def _lock_dn_output_and_create_job(

@classmethod
def _update_submission_status(cls, job: Job):
if submission := _SubmissionManagerFactory._build_manager()._get(job.submit_id):
submission._update_submission_status(job)
submission_manager = _SubmissionManagerFactory._build_manager()
if submission := submission_manager._get(job.submit_id):
submission_manager._update_submission_status(submission, job)
else:
submissions = _SubmissionManagerFactory._build_manager()._get_all()
submissions = submission_manager._get_all()
cls.__logger.error(f"Submission {job.submit_id} not found.")
msg = "\n--------------------------------------------------------------------------------\n"
msg += f"Submission {job.submit_id} not found.\n"
Expand Down
1 change: 1 addition & 0 deletions taipy/core/notification/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def register(
<li>TASK</li>
<li>DATA_NODE</li>
<li>JOB</li>
<li>SUBMISSION</li>
</ul>
entity_id (Optional[str]): If provided, the listener will be notified
for all events related to this entity. Otherwise, the listener
Expand Down
88 changes: 88 additions & 0 deletions taipy/core/submission/_submission_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

from threading import Lock
from typing import List, Optional, Union

from taipy.logger._taipy_logger import _TaipyLogger

from .._entity._entity_ids import _EntityIds
from .._manager._manager import _Manager
from .._repository._abstract_repository import _AbstractRepository
from .._version._version_mixin import _VersionMixin
from ..exceptions.exceptions import SubmissionNotDeletedException
from ..job.job import Job, Status
from ..notification import EventEntityType, EventOperation, Notifier, _make_event
from ..scenario.scenario import Scenario
from ..sequence.sequence import Sequence
Expand All @@ -27,6 +31,8 @@ class _SubmissionManager(_Manager[Submission], _VersionMixin):
_ENTITY_NAME = Submission.__name__
_repository: _AbstractRepository
_EVENT_ENTITY_TYPE = EventEntityType.SUBMISSION
__lock = Lock()
__logger = _TaipyLogger._get_logger()

@classmethod
def _get_all(cls, version_number: Optional[str] = None) -> List[Submission]:
Expand All @@ -47,6 +53,88 @@ def _create(cls, entity_id: str, entity_type: str, entity_config: Optional[str],

return submission

@classmethod
def _update_submission_status(cls, submission: Submission, job: Job):
with cls.__lock:
submission = cls._get(submission)

if submission._submission_status == SubmissionStatus.FAILED:
return

job_status = job.status
if job_status == Status.FAILED:
submission._submission_status = SubmissionStatus.FAILED
cls._set(submission)
cls.__logger.debug(
f"{job.id} status is {job_status}. Submission status set to `{submission._submission_status}`."
)
return
if job_status == Status.CANCELED:
submission._is_canceled = True
elif job_status == Status.BLOCKED:
submission._blocked_jobs.add(job.id)
submission._pending_jobs.discard(job.id)
elif job_status == Status.PENDING or job_status == Status.SUBMITTED:
submission._pending_jobs.add(job.id)
submission._blocked_jobs.discard(job.id)
elif job_status == Status.RUNNING:
submission._running_jobs.add(job.id)
submission._pending_jobs.discard(job.id)
elif job_status == Status.COMPLETED or job_status == Status.SKIPPED:
submission._is_completed = True # type: ignore
submission._blocked_jobs.discard(job.id)
submission._pending_jobs.discard(job.id)
submission._running_jobs.discard(job.id)
elif job_status == Status.ABANDONED:
submission._is_abandoned = True # type: ignore
submission._running_jobs.discard(job.id)
submission._blocked_jobs.discard(job.id)
submission._pending_jobs.discard(job.id)
cls._set(submission)

# The submission_status is set later to make sure notification for updating
# the submission_status attribute is triggered
if submission._is_canceled:
cls._set_submission_status(submission, SubmissionStatus.CANCELED, job)
elif submission._is_abandoned:
cls._set_submission_status(submission, SubmissionStatus.UNDEFINED, job)
elif submission._running_jobs:
cls._set_submission_status(submission, SubmissionStatus.RUNNING, job)
elif submission._pending_jobs:
cls._set_submission_status(submission, SubmissionStatus.PENDING, job)
elif submission._blocked_jobs:
cls._set_submission_status(submission, SubmissionStatus.BLOCKED, job)
elif submission._is_completed:
cls._set_submission_status(submission, SubmissionStatus.COMPLETED, job)
else:
cls._set_submission_status(submission, SubmissionStatus.UNDEFINED, job)
cls.__logger.debug(
f"{job.id} status is {job_status}. Submission status set to `{submission._submission_status}`"
)

@classmethod
def _set_submission_status(cls, submission: Submission, new_submission_status: SubmissionStatus, job: Job):
if not submission._is_in_context:
submission = cls._get(submission)
_current_submission_status = submission._submission_status
submission._submission_status = new_submission_status

cls._set(submission)

if _current_submission_status != submission._submission_status:
event = _make_event(
submission,
EventOperation.UPDATE,
"submission_status",
submission._submission_status,
job_triggered_submission_status_changed=job.id,
)

if not submission._is_in_context:
Notifier.publish(event)
else:
submission._in_context_attributes_changed_collector.append(event)

@classmethod
def _get_latest(cls, entity: Union[Scenario, Sequence, Task]) -> Optional[Submission]:
entity_id = entity.id if not isinstance(entity, str) else entity
Expand Down
67 changes: 2 additions & 65 deletions taipy/core/submission/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
from datetime import datetime
from typing import Any, Dict, List, Optional, Set, Union

from taipy.logger._taipy_logger import _TaipyLogger

from .._entity._entity import _Entity
from .._entity._labeled import _Labeled
from .._entity._properties import _Properties
from .._entity._reload import _Reloader, _self_reload, _self_setter
from .._version._version_manager_factory import _VersionManagerFactory
from ..job.job import Job, JobId, Status
from ..notification.event import Event, EventEntityType, EventOperation, _make_event
from ..job.job import Job, JobId
from ..notification import Event, EventEntityType, EventOperation, _make_event
from .submission_id import SubmissionId
from .submission_status import SubmissionStatus

Expand All @@ -45,7 +43,6 @@ class Submission(_Entity, _Labeled):
_MANAGER_NAME = "submission"
__SEPARATOR = "_"
lock = threading.Lock()
__logger = _TaipyLogger._get_logger()

def __init__(
self,
Expand Down Expand Up @@ -192,66 +189,6 @@ def __gt__(self, other):
def __ge__(self, other):
return self.creation_date.timestamp() >= other.creation_date.timestamp()

def _update_submission_status(self, job: Job):
from ._submission_manager_factory import _SubmissionManagerFactory

with self.lock:
submission_manager = _SubmissionManagerFactory._build_manager()
submission = submission_manager._get(self)
if submission._submission_status == SubmissionStatus.FAILED:
return

job_status = job.status
if job_status == Status.FAILED:
submission._submission_status = SubmissionStatus.FAILED
submission_manager._set(submission)
self.__logger.debug(
f"{job.id} status is {job_status}. Submission status set to " f"{submission._submission_status}"
)
return
if job_status == Status.CANCELED:
submission._is_canceled = True
elif job_status == Status.BLOCKED:
submission._blocked_jobs.add(job.id)
submission._pending_jobs.discard(job.id)
elif job_status == Status.PENDING or job_status == Status.SUBMITTED:
submission._pending_jobs.add(job.id)
submission._blocked_jobs.discard(job.id)
elif job_status == Status.RUNNING:
submission._running_jobs.add(job.id)
submission._pending_jobs.discard(job.id)
elif job_status == Status.COMPLETED or job_status == Status.SKIPPED:
submission._is_completed = True # type: ignore
submission._blocked_jobs.discard(job.id)
submission._pending_jobs.discard(job.id)
submission._running_jobs.discard(job.id)
elif job_status == Status.ABANDONED:
submission._is_abandoned = True # type: ignore
submission._running_jobs.discard(job.id)
submission._blocked_jobs.discard(job.id)
submission._pending_jobs.discard(job.id)
submission_manager._set(submission)

# The submission_status is set later to make sure notification for updating
# the submission_status attribute is triggered
if submission._is_canceled:
submission.submission_status = SubmissionStatus.CANCELED # type: ignore
elif submission._is_abandoned:
submission.submission_status = SubmissionStatus.UNDEFINED # type: ignore
elif submission._running_jobs:
submission.submission_status = SubmissionStatus.RUNNING # type: ignore
elif submission._pending_jobs:
submission.submission_status = SubmissionStatus.PENDING # type: ignore
elif submission._blocked_jobs:
submission.submission_status = SubmissionStatus.BLOCKED # type: ignore
elif submission._is_completed:
submission.submission_status = SubmissionStatus.COMPLETED # type: ignore
else:
submission.submission_status = SubmissionStatus.UNDEFINED # type: ignore
self.__logger.debug(
f"{job.id} status is {job_status}. Submission status set to " f"{submission._submission_status}"
)

def is_finished(self) -> bool:
"""Indicate if the submission is finished.
Expand Down
13 changes: 10 additions & 3 deletions taipy/gui_core/_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def process_event(self, event: Event):
with self.lock:
self.jobs_list = None
elif event.entity_type == EventEntityType.SUBMISSION:
self.submission_status_callback(event.entity_id)
self.submission_status_callback(event.entity_id, event)
elif event.entity_type == EventEntityType.DATA_NODE:
with self.lock:
self.data_nodes_by_owner = None
Expand All @@ -146,7 +146,7 @@ def scenario_refresh(self, scenario_id: t.Optional[str]):
{"scenario": scenario_id or True},
)

def submission_status_callback(self, submission_id: t.Optional[str]):
def submission_status_callback(self, submission_id: t.Optional[str] = None, event: t.Optional[Event] = None):
if not submission_id or not is_readable(t.cast(SubmissionId, submission_id)):
return
try:
Expand Down Expand Up @@ -182,7 +182,14 @@ def submission_status_callback(self, submission_id: t.Optional[str]):
self.gui._call_user_callback(
client_id,
submission_name,
[core_get(submission.entity_id), {"submission_status": new_status.name}],
[
core_get(submission.id),
{
"submission_status": new_status.name,
"submittable_entity": core_get(submission.entity_id),
**(event.metadata if event else {}),
},
],
submission.properties.get("module_context"),
)

Expand Down
6 changes: 3 additions & 3 deletions taipy/gui_core/viselements.json
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,15 @@
{
"name": "on_submission_change",
"type": "Callback",
"doc": "The name of the function that is triggered when a submission status is changed.<br/><br/>All the parameters of that function are optional:\n<ul>\n<li>state (<code>State^</code>): the state instance.</li>\n<li>submittable (Submittable): the entity (usually a Scenario) that was submitted.</li>\n<li>details (dict): the details on this callback's invocation.<br/>\nThis dictionary has the following keys:\n<ul>\n<li>submission_status (str): the new status of the submission (possible values: SUBMITTED, COMPLETED, CANCELED, FAILED, BLOCKED, WAITING, RUNNING).</li>\n<li>job: the Job (if any) that is at the origin of the submission status change.</li>\n</ul>",
"doc": "The name of the function that is triggered when a submission status is changed.<br/><br/>All the parameters of that function are optional:\n<ul>\n<li>state (<code>State^</code>): the state instance.</li>\n<li>submission (Submission): the submission entity containing submission information.</li>\n<li>details (dict): the details on this callback's invocation.<br/>\nThis dictionary has the following keys:\n<ul>\n<li>submission_status (str): the new status of the submission (possible values: SUBMITTED, COMPLETED, CANCELED, FAILED, BLOCKED, WAITING, RUNNING).</li>\n<li>job: the Job (if any) that is at the origin of the submission status change.</li>\n<li>submittable_entity: submittable (Submittable): the entity (usually a Scenario) that was submitted.</li>\n</ul>",
"signature": [
[
"state",
"State"
],
[
"submittable",
"Submittable"
"submission",
"Submission"
],
[
"details",
Expand Down
16 changes: 16 additions & 0 deletions tests/core/notification/test_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from taipy.core.notification.event import Event
from taipy.core.notification.notifier import Notifier
from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
from taipy.core.submission.submission_status import SubmissionStatus


def test_register():
Expand Down Expand Up @@ -742,6 +743,21 @@ def test_publish_submission_event():
and event.attribute_name == expected_attribute_names[i]
for i, event in enumerate(published_events)
)
assert "job_triggered_submission_status_changed" in published_events[4].metadata
assert published_events[4].metadata["job_triggered_submission_status_changed"] == job.id

# Test updating submission_status manually will not add the job_triggered_submission_status_changed
# to the metadata as no job was used to update the submission_status
submission.submission_status = SubmissionStatus.CANCELED

assert registration_queue.qsize() == 1
published_event = registration_queue.get()

assert published_event.entity_type == EventEntityType.SUBMISSION
assert published_event.entity_id == submission.id
assert published_event.operation == EventOperation.UPDATE
assert published_event.attribute_name == "submission_status"
assert "job_triggered_submission_status_changed" not in published_event.metadata


def test_publish_deletion_event():
Expand Down
14 changes: 9 additions & 5 deletions tests/core/submission/test_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def __test_update_submission_status(job_ids, expected_submission_status):
submission.jobs = [jobs[job_id] for job_id in job_ids]
for job_id in job_ids:
job = jobs[job_id]
submission._update_submission_status(job)
_SubmissionManagerFactory._build_manager()._update_submission_status(submission, job)
assert submission.submission_status == expected_submission_status


Expand Down Expand Up @@ -470,29 +470,33 @@ def test_auto_set_and_reload_properties():
],
)
def test_update_submission_status_with_single_job_completed(job_statuses, expected_submission_statuses):
submission_manager = _SubmissionManagerFactory._build_manager()

job = MockJob("job_id", Status.SUBMITTED)
submission = Submission("submission_id", "ENTITY_TYPE", "entity_config_id")
_SubmissionManagerFactory._build_manager()._set(submission)
submission_manager._set(submission)

assert submission.submission_status == SubmissionStatus.SUBMITTED

for job_status, submission_status in zip(job_statuses, expected_submission_statuses):
job.status = job_status
submission._update_submission_status(job)
submission_manager._update_submission_status(submission, job)
assert submission.submission_status == submission_status


def __test_update_submission_status_with_two_jobs(job_ids, job_statuses, expected_submission_statuses):
submission_manager = _SubmissionManagerFactory._build_manager()

jobs = {job_id: MockJob(job_id, Status.SUBMITTED) for job_id in job_ids}
submission = Submission("submission_id", "ENTITY_TYPE", "entity_config_id")
_SubmissionManagerFactory._build_manager()._set(submission)
submission_manager._set(submission)

assert submission.submission_status == SubmissionStatus.SUBMITTED

for (job_id, job_status), submission_status in zip(job_statuses, expected_submission_statuses):
job = jobs[job_id]
job.status = job_status
submission._update_submission_status(job)
submission_manager._update_submission_status(submission, job)
assert submission.submission_status == submission_status


Expand Down

0 comments on commit d490ef1

Please sign in to comment.