Skip to content

Commit

Permalink
Fix issue about not publishing data node updates (#2316)
Browse files Browse the repository at this point in the history
* Fix linter

* update PR checklist fixing typos.

* Update .github/PULL_REQUEST_TEMPLATE.md

* rename _do_write as the name was already taken by children class

* rename _do_write as the name was already taken by children class

* fix failing tests

* Add unit tests

* Make ruff happy

* Update .github/PULL_REQUEST_TEMPLATE.md

Co-authored-by: Đỗ Trường Giang <[email protected]>

* use pytest.parametrize to factorize code.

---------

Co-authored-by: Đỗ Trường Giang <[email protected]>
  • Loading branch information
jrobinAV and trgiangdo authored Dec 13, 2024
1 parent 727d657 commit 31a6b96
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 15 deletions.
12 changes: 6 additions & 6 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ _Describe which projects this change will impact and that needs to be backported
## Checklist
_We encourage you to keep the code coverage percentage at 80% and above._

- [ ] Does this solution meet the acceptance criteria of the related issue?
- [ ] Is the related issue checklist completed?
- [ ] Does this PR adds unit tests for the developed code? If not, why?
- [ ] End-to-End tests have been added or updated?
- [ ] Was the documentation updated, or a dedicated issue for documentation created? (If applicable)
- [ ] Is the release notes updated? (If applicable)
- [ ] This solution meets the acceptance criteria of the related issue.
- [ ] The related issue checklist is completed.
- [ ] This PR adds unit tests for the developed code. If not, why?
- [ ] End-to-End tests have been added or updated. If not, why?
- [ ] The documentation has been updated, or a dedicated issue created. (If applicable)
- [ ] The release notes have been updated? (If applicable)
3 changes: 3 additions & 0 deletions taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,7 @@ def _update_job_status(job: Job, exceptions):
_TaipyLogger._get_logger().error(st)
_JobManagerFactory._build_manager()._set(job)
else:
for output in job.task.output.values():
output.track_edit(job_id=job.id)
output.unlock_edit()
job.completed()
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _write_data(self, outputs: List[DataNode], results, job_id: JobId):
for res, dn in zip(_results, outputs):
try:
data_node = data_manager._get(dn.id)
data_node.write(res, job_id=job_id)
data_node._write(res)
except Exception as e:
logger.error("Error during write", exc_info=1)
exceptions.append(DataNodeWritingError(f"Error writing in datanode id {dn.id}: {e}"))
Expand Down
4 changes: 2 additions & 2 deletions taipy/core/data/data_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,6 @@ def write(self,
**kwargs (Any): Extra information to attach to the edit document
corresponding to this write.
"""
from ._data_manager_factory import _DataManagerFactory

if (editor_id
and self.edit_in_progress
and self.editor_id != editor_id
Expand All @@ -471,6 +469,8 @@ def write(self,
self._write(data)
self.track_edit(job_id=job_id, editor_id=editor_id, comment=comment, **kwargs)
self.unlock_edit()
from ._data_manager_factory import _DataManagerFactory

_DataManagerFactory._build_manager()._set(self)

def track_edit(self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,25 @@
# specific language governing permissions and limitations under the License.
import traceback

from taipy import Job, JobId, Status, Task
from taipy import Job, JobId, Scope, Status, Task
from taipy.core._orchestrator._dispatcher import _JobDispatcher
from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
from taipy.core.data import InMemoryDataNode
from taipy.core.data.data_node_id import EDIT_JOB_ID_KEY, EDIT_TIMESTAMP_KEY
from taipy.core.job._job_manager_factory import _JobManagerFactory
from taipy.core.task._task_manager_factory import _TaskManagerFactory


def nothing(*args):
pass
return 42


def _error():
raise RuntimeError("Something bad has happened")

def test_update_job_status_no_exception():
task = Task("config_id", {}, nothing)
output = InMemoryDataNode("data_node", scope=Scope.SCENARIO)
task = Task("config_id", {}, nothing, output=[output])
_TaskManagerFactory._build_manager()._set(task)
job = Job(JobId("id"), task, "s_id", task.id)
_JobManagerFactory._build_manager()._set(job)
Expand All @@ -31,6 +37,14 @@ def test_update_job_status_no_exception():

assert job.status == Status.COMPLETED
assert job.stacktrace == []
assert len(output.edits) == 1
assert len(output.edits[0]) == 2
assert output.edits[0][EDIT_JOB_ID_KEY] == job.id
assert output.edits[0][EDIT_TIMESTAMP_KEY] is not None
assert output.last_edit_date is not None
assert output.editor_id is None
assert output.editor_expiration_date is None
assert not output.edit_in_progress


def test_update_job_status_with_one_exception():
Expand Down
3 changes: 3 additions & 0 deletions tests/core/job/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,12 @@ def job(task, job_id):
@pytest.fixture
def replace_in_memory_write_fct():
default_write = InMemoryDataNode.write
default__write = InMemoryDataNode._write
InMemoryDataNode.write = _error
InMemoryDataNode._write = _error
yield
InMemoryDataNode.write = default_write
InMemoryDataNode._write = default__write


def _foo():
Expand Down
15 changes: 12 additions & 3 deletions tests/core/notification/test_events_published.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from queue import SimpleQueue
from typing import Any, Dict, List

import pytest

from taipy import Orchestrator
from taipy.common.config import Config, Frequency
from taipy.core import taipy as tp
from taipy.core.job.status import Status
Expand Down Expand Up @@ -154,8 +157,10 @@ def test_events_published_for_writing_dn():
assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 5
all_evts.stop()


def test_events_published_for_scenario_submission():
@pytest.mark.parametrize("standalone", [False, True])
def test_events_published_for_scenario_submission(standalone):
if standalone:
Config.configure_job_executions(mode="standalone", max_nb_of_workers=2)
input_config = Config.configure_data_node("the_input")
output_config = Config.configure_data_node("the_output")
task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
Expand All @@ -176,7 +181,11 @@ def test_events_published_for_scenario_submission():
# 1 submission update event for jobs
# 3 submission update events (for status: PENDING, RUNNING and COMPLETED)
# 1 submission update event for is_completed
scenario.submit()
if standalone:
Orchestrator().run()
scenario.submit(wait=True)
else:
scenario.submit()
snapshot = all_evts.capture()
assert len(snapshot.collected_events) == 18
assert snapshot.entity_type_collected.get(EventEntityType.CYCLE, 0) == 0
Expand Down

0 comments on commit 31a6b96

Please sign in to comment.