Skip to content

Commit

Permalink
Merge branch 'develop' into feature/#428-selector-dropdown-filter
Browse files Browse the repository at this point in the history
  • Loading branch information
FredLL-Avaiga authored May 31, 2024
2 parents 0190e84 + ed8fa60 commit 853294d
Show file tree
Hide file tree
Showing 22 changed files with 240 additions and 159 deletions.
6 changes: 3 additions & 3 deletions taipy/core/_entity/_ready_to_run_property.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

from typing import TYPE_CHECKING, Dict, Set, Union

from ..common.reason import Reason
from ..notification import EventOperation, Notifier, _make_event
from ..reason.reason import Reasons

if TYPE_CHECKING:
from ..data.data_node import DataNode, DataNodeId
Expand All @@ -29,7 +29,7 @@ class _ReadyToRunProperty:

# A nested dictionary of the submittable entities (Scenario, Sequence, Task) and
# the data nodes that make it not ready_to_run with the reason(s)
_submittable_id_datanodes: Dict[Union["ScenarioId", "SequenceId", "TaskId"], Reason] = {}
_submittable_id_datanodes: Dict[Union["ScenarioId", "SequenceId", "TaskId"], Reasons] = {}

@classmethod
def _add(cls, dn: "DataNode", reason: str) -> None:
Expand Down Expand Up @@ -81,7 +81,7 @@ def __add(cls, submittable: Union["Scenario", "Sequence", "Task"], datanode: "Da
cls.__publish_submittable_property_event(submittable, False)

if submittable.id not in cls._submittable_id_datanodes:
cls._submittable_id_datanodes[submittable.id] = Reason(submittable.id)
cls._submittable_id_datanodes[submittable.id] = Reasons(submittable.id)
cls._submittable_id_datanodes[submittable.id]._add_reason(datanode.id, reason)

@staticmethod
Expand Down
11 changes: 6 additions & 5 deletions taipy/core/_entity/submittable.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

from ..common._listattributes import _ListAttributes
from ..common._utils import _Subscriber
from ..common.reason import Reason
from ..data.data_node import DataNode
from ..job.job import Job
from ..reason._reason_factory import _build_data_node_is_being_edited_reason, _build_data_node_is_not_written
from ..reason.reason import Reasons
from ..submission.submission import Submission
from ..task.task import Task
from ._dag import _DAG
Expand Down Expand Up @@ -82,20 +83,20 @@ def get_intermediate(self) -> Set[DataNode]:
all_data_nodes_in_dag = {node for node in dag.nodes if isinstance(node, DataNode)}
return all_data_nodes_in_dag - self.__get_inputs(dag) - self.__get_outputs(dag)

def is_ready_to_run(self) -> Reason:
def is_ready_to_run(self) -> Reasons:
"""Indicate if the entity is ready to be run.
Returns:
A Reason object that can function as a Boolean value.
which is True if the given entity is ready to be run or there is no reason to be blocked, False otherwise.
"""
reason = Reason(self._submittable_id)
reason = Reasons(self._submittable_id)

for node in self.get_inputs():
if node._edit_in_progress:
reason._add_reason(node.id, node._build_edit_in_progress_reason())
reason._add_reason(node.id, _build_data_node_is_being_edited_reason(node.id))
if not node._last_edit_date:
reason._add_reason(node.id, node._build_not_written_reason())
reason._add_reason(node.id, _build_data_node_is_not_written(node.id))

return reason

Expand Down
4 changes: 0 additions & 4 deletions taipy/core/_manager/_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ class _Manager(Generic[EntityType]):
_logger = _TaipyLogger._get_logger()
_ENTITY_NAME: str = "Entity"

@classmethod
def _build_not_submittable_entity_reason(cls, entity_id: str) -> str:
return f"Entity {entity_id} is not a submittable entity"

@classmethod
def _delete_all(cls):
"""
Expand Down
28 changes: 14 additions & 14 deletions taipy/core/_orchestrator/_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def _lock_dn_output_and_create_job(
)

@classmethod
def _update_submission_status(cls, job: Job):
def _update_submission_status(cls, job: Job) -> None:
submission_manager = _SubmissionManagerFactory._build_manager()
if submission := submission_manager._get(job.submit_id):
submission_manager._update_submission_status(submission, job)
Expand All @@ -182,7 +182,7 @@ def _update_submission_status(cls, job: Job):
cls.__logger.error(f"Job {job.id} status: {job.status}")

@classmethod
def _orchestrate_job_to_run_or_block(cls, jobs: List[Job]):
def _orchestrate_job_to_run_or_block(cls, jobs: List[Job]) -> None:
blocked_jobs = []
pending_jobs = []

Expand All @@ -199,7 +199,7 @@ def _orchestrate_job_to_run_or_block(cls, jobs: List[Job]):
cls.jobs_to_run.put(job)

@classmethod
def _wait_until_job_finished(cls, jobs: Union[List[Job], Job], timeout: float = 0):
def _wait_until_job_finished(cls, jobs: Union[List[Job], Job], timeout: float = 0) -> None:
# Note: this method should be prefixed by two underscores, but it has only one, so it can be mocked in tests.
def __check_if_timeout(st, to):
return (datetime.now() - st).seconds < to
Expand Down Expand Up @@ -231,21 +231,21 @@ def _is_blocked(cls, obj: Union[Task, Job]) -> bool:
return any(not data_manager._get(dn.id).is_ready_for_reading for dn in input_data_nodes)

@staticmethod
def _unlock_edit_on_jobs_outputs(jobs: Union[Job, List[Job], Set[Job]]):
def _unlock_edit_on_jobs_outputs(jobs: Union[Job, List[Job], Set[Job]]) -> None:
jobs = [jobs] if isinstance(jobs, Job) else jobs
for job in jobs:
job._unlock_edit_on_outputs()

@classmethod
def _on_status_change(cls, job: Job):
def _on_status_change(cls, job: Job) -> None:
if job.is_completed() or job.is_skipped():
cls.__logger.debug(f"{job.id} has been completed or skipped. Unblocking jobs.")
cls.__unblock_jobs()
elif job.is_failed():
cls._fail_subsequent_jobs(job)

@classmethod
def __unblock_jobs(cls):
def __unblock_jobs(cls) -> None:
with cls.lock:
cls.__logger.debug("Acquiring lock to unblock jobs.")
for job in cls.blocked_jobs:
Expand All @@ -258,14 +258,14 @@ def __unblock_jobs(cls):
cls.jobs_to_run.put(job)

@classmethod
def __remove_blocked_job(cls, job):
def __remove_blocked_job(cls, job: Job) -> None:
try: # In case the job has been removed from the list of blocked_jobs.
cls.blocked_jobs.remove(job)
except Exception:
cls.__logger.warning(f"{job.id} is not in the blocked list anymore.")

@classmethod
def cancel_job(cls, job: Job):
def cancel_job(cls, job: Job) -> None:
if job.is_canceled():
cls.__logger.info(f"{job.id} has already been canceled.")
elif job.is_abandoned():
Expand Down Expand Up @@ -298,21 +298,21 @@ def __find_subsequent_jobs(cls, submit_id, output_dn_config_ids: Set) -> Set[Job
return subsequent_jobs

@classmethod
def __remove_blocked_jobs(cls, jobs):
def __remove_blocked_jobs(cls, jobs: Set[Job]) -> None:
for job in jobs:
cls.__remove_blocked_job(job)

@classmethod
def __remove_jobs_to_run(cls, jobs):
new_jobs_to_run = Queue()
def __remove_jobs_to_run(cls, jobs: Set[Job]) -> None:
new_jobs_to_run: Queue = Queue()
while not cls.jobs_to_run.empty():
current_job = cls.jobs_to_run.get()
if current_job not in jobs:
new_jobs_to_run.put(current_job)
cls.jobs_to_run = new_jobs_to_run

@classmethod
def _fail_subsequent_jobs(cls, failed_job: Job):
def _fail_subsequent_jobs(cls, failed_job: Job) -> None:
with cls.lock:
cls.__logger.debug("Acquiring lock to fail subsequent jobs.")
to_fail_or_abandon_jobs = set()
Expand All @@ -327,7 +327,7 @@ def _fail_subsequent_jobs(cls, failed_job: Job):
cls._unlock_edit_on_jobs_outputs(to_fail_or_abandon_jobs)

@classmethod
def _cancel_jobs(cls, job_id_to_cancel: JobId, jobs: Set[Job]):
def _cancel_jobs(cls, job_id_to_cancel: JobId, jobs: Set[Job]) -> None:
for job in jobs:
if job.is_running():
cls.__logger.info(f"{job.id} is running and cannot be canceled.")
Expand All @@ -341,7 +341,7 @@ def _cancel_jobs(cls, job_id_to_cancel: JobId, jobs: Set[Job]):
job.abandoned()

@staticmethod
def _check_and_execute_jobs_if_development_mode():
def _check_and_execute_jobs_if_development_mode() -> None:
from ._orchestrator_factory import _OrchestratorFactory

if dispatcher := _OrchestratorFactory._dispatcher:
Expand Down
2 changes: 1 addition & 1 deletion taipy/core/_repository/_filesystem_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def _delete_by(self, attribute: str, value: str):
def _search(self, attribute: str, value: Any, filters: Optional[List[Dict]] = None) -> List[Entity]:
return list(self.__search(attribute, value, filters))

def _export(self, entity_id: str, folder_path: Union[str, pathlib.Path]):
def _export(self, entity_id: str, folder_path: Union[str, pathlib.Path]) -> None:
if isinstance(folder_path, str):
folder: pathlib.Path = pathlib.Path(folder_path)
else:
Expand Down
6 changes: 3 additions & 3 deletions taipy/core/cycle/_cycle_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class _CycleManager(_Manager[Cycle]):
@classmethod
def _create(
cls, frequency: Frequency, name: Optional[str] = None, creation_date: Optional[datetime] = None, **properties
):
) -> Cycle:
creation_date = creation_date if creation_date else datetime.now()
start_date = _CycleManager._get_start_date_of_cycle(frequency, creation_date)
end_date = _CycleManager._get_end_date_of_cycle(frequency, start_date)
Expand Down Expand Up @@ -63,7 +63,7 @@ def _get_or_create(
return cls._create(frequency=frequency, creation_date=creation_date, name=name)

@staticmethod
def _get_start_date_of_cycle(frequency: Frequency, creation_date: datetime):
def _get_start_date_of_cycle(frequency: Frequency, creation_date: datetime) -> datetime:
start_date = creation_date.date()
start_time = time()
if frequency == Frequency.DAILY:
Expand All @@ -77,7 +77,7 @@ def _get_start_date_of_cycle(frequency: Frequency, creation_date: datetime):
return datetime.combine(start_date, start_time)

@staticmethod
def _get_end_date_of_cycle(frequency: Frequency, start_date: datetime):
def _get_end_date_of_cycle(frequency: Frequency, start_date: datetime) -> datetime:
end_date = start_date
if frequency == Frequency.DAILY:
end_date = end_date + timedelta(days=1)
Expand Down
14 changes: 7 additions & 7 deletions taipy/core/data/_data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,25 +111,25 @@ def _get_all(cls, version_number: Optional[str] = None) -> List[DataNode]:
return cls._repository._load_all(filters)

@classmethod
def _clean_generated_file(cls, data_node: DataNode):
def _clean_generated_file(cls, data_node: DataNode) -> None:
if not isinstance(data_node, _FileDataNodeMixin):
return
if data_node.is_generated and os.path.exists(data_node.path):
os.remove(data_node.path)

@classmethod
def _clean_generated_files(cls, data_nodes: Iterable[DataNode]):
def _clean_generated_files(cls, data_nodes: Iterable[DataNode]) -> None:
for data_node in data_nodes:
cls._clean_generated_file(data_node)

@classmethod
def _delete(cls, data_node_id: DataNodeId):
def _delete(cls, data_node_id: DataNodeId) -> None:
if data_node := cls._get(data_node_id, None):
cls._clean_generated_file(data_node)
super()._delete(data_node_id)

@classmethod
def _delete_many(cls, data_node_ids: Iterable[DataNodeId]):
def _delete_many(cls, data_node_ids: Iterable[DataNodeId]) -> None:
data_nodes = []
for data_node_id in data_node_ids:
if data_node := cls._get(data_node_id):
Expand All @@ -138,13 +138,13 @@ def _delete_many(cls, data_node_ids: Iterable[DataNodeId]):
super()._delete_many(data_node_ids)

@classmethod
def _delete_all(cls):
def _delete_all(cls) -> None:
data_nodes = cls._get_all()
cls._clean_generated_files(data_nodes)
super()._delete_all()

@classmethod
def _delete_by_version(cls, version_number: str):
def _delete_by_version(cls, version_number: str) -> None:
data_nodes = cls._get_all(version_number)
cls._clean_generated_files(data_nodes)
cls._repository._delete_by(attribute="version", value=version_number)
Expand All @@ -165,7 +165,7 @@ def _get_by_config_id(cls, config_id: str, version_number: Optional[str] = None)
return cls._repository._load_all(filters)

@classmethod
def _export(cls, id: str, folder_path: Union[str, pathlib.Path], **kwargs):
def _export(cls, id: str, folder_path: Union[str, pathlib.Path], **kwargs) -> None:
cls._repository._export(id, folder_path)

if not kwargs.get("include_data"):
Expand Down
6 changes: 0 additions & 6 deletions taipy/core/data/data_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,6 @@ def last_edit_date(self):
def last_edit_date(self, val):
self._last_edit_date = val

def _build_not_written_reason(self) -> str:
return f"DataNode {self.id} is not written"

@property # type: ignore
@_self_reload(_MANAGER_NAME)
def scope(self):
Expand Down Expand Up @@ -297,9 +294,6 @@ def edit_in_progress(self):
def edit_in_progress(self, val):
self._edit_in_progress = val

def _build_edit_in_progress_reason(self) -> str:
return f"DataNode {self.id} is being edited"

@property # type: ignore
@_self_reload(_MANAGER_NAME)
def editor_id(self):
Expand Down
4 changes: 2 additions & 2 deletions taipy/core/job/_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _create(
return job

@classmethod
def _delete(cls, job: Union[Job, JobId], force=False):
def _delete(cls, job: Union[Job, JobId], force=False) -> None:
if isinstance(job, str):
job = cls._get(job)
if cls._is_deletable(job) or force:
Expand All @@ -69,7 +69,7 @@ def _delete(cls, job: Union[Job, JobId], force=False):
raise err

@classmethod
def _cancel(cls, job: Union[str, Job]):
def _cancel(cls, job: Union[str, Job]) -> None:
job = cls._get(job) if isinstance(job, str) else job

from .._orchestrator._orchestrator_factory import _OrchestratorFactory
Expand Down
12 changes: 12 additions & 0 deletions taipy/core/reason/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

from .reason import Reasons
24 changes: 24 additions & 0 deletions taipy/core/reason/_reason_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

from ..data.data_node import DataNodeId


def _build_data_node_is_being_edited_reason(dn_id: DataNodeId) -> str:
return f"DataNode {dn_id} is being edited"


def _build_data_node_is_not_written(dn_id: DataNodeId) -> str:
return f"DataNode {dn_id} is not written"


def _build_not_submittable_entity_reason(entity_id: str) -> str:
return f"Entity {entity_id} is not a submittable entity"
6 changes: 3 additions & 3 deletions taipy/core/common/reason.py → taipy/core/reason/reason.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@
from typing import Dict, Set


class Reason:
class Reasons:
def __init__(self, entity_id: str) -> None:
self.entity_id: str = entity_id
self._reasons: Dict[str, Set[str]] = {}

def _add_reason(self, entity_id: str, reason: str) -> "Reason":
def _add_reason(self, entity_id: str, reason: str) -> "Reasons":
if entity_id not in self._reasons:
self._reasons[entity_id] = set()
self._reasons[entity_id].add(reason)
return self

def _remove_reason(self, entity_id: str, reason: str) -> "Reason":
def _remove_reason(self, entity_id: str, reason: str) -> "Reasons":
if entity_id in self._reasons and reason in self._reasons[entity_id]:
self._reasons[entity_id].remove(reason)
if len(self._reasons[entity_id]) == 0:
Expand Down
Loading

0 comments on commit 853294d

Please sign in to comment.