From 7311ecc296944c2a868212c0b89e93e54630c781 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Thu, 8 Feb 2024 12:21:36 +0100 Subject: [PATCH 01/62] Attempt to fix a sqlite conflict on submission status. --- taipy/core/_repository/db/_sql_connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taipy/core/_repository/db/_sql_connection.py b/taipy/core/_repository/db/_sql_connection.py index 94a0ae42e2..9004964dbf 100644 --- a/taipy/core/_repository/db/_sql_connection.py +++ b/taipy/core/_repository/db/_sql_connection.py @@ -87,4 +87,4 @@ def _build_connection() -> Connection: @lru_cache def __build_connection(db_location: str): - return sqlite3.connect(db_location, check_same_thread=False) + return sqlite3.connect(db_location, check_same_thread=False, timeout=20) From abb1bce443b0af8cfda0cfa06b938685c9c4f1a4 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 4 Mar 2024 10:05:59 +0100 Subject: [PATCH 02/62] add status change traces --- taipy/core/_orchestrator/_orchestrator.py | 1 + taipy/core/submission/submission.py | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 94a8dc20ba..24d82c134f 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -224,6 +224,7 @@ def _unlock_edit_on_jobs_outputs(jobs: Union[Job, List[Job], Set[Job]]): @classmethod def _on_status_change(cls, job: Job): + cls.__logger.info(f"{job.id} status has changed to {job.status}.") if job.is_completed() or job.is_skipped(): cls.__unblock_jobs() elif job.is_failed(): diff --git a/taipy/core/submission/submission.py b/taipy/core/submission/submission.py index 5d69d8a86e..76e219babe 100644 --- a/taipy/core/submission/submission.py +++ b/taipy/core/submission/submission.py @@ -14,6 +14,8 @@ from datetime import datetime from typing import Any, Dict, List, Optional, Set, Union +from taipy.logger._taipy_logger import _TaipyLogger + from .._entity._entity import _Entity from .._entity._labeled import _Labeled from .._entity._properties import _Properties @@ -43,6 +45,7 @@ class Submission(_Entity, _Labeled): _MANAGER_NAME = "submission" __SEPARATOR = "_" lock = threading.Lock() + __logger = _TaipyLogger._get_logger() def __init__( self, @@ -206,6 +209,8 @@ def _update_submission_status(self, job: Job): if job_status == Status.FAILED: submission._submission_status = SubmissionStatus.FAILED _SubmissionManagerFactory._build_manager()._set(submission) + self.__logger.info(f"{job.id} status is {job_status}. Submission status set to " + f"{submission._submission_status}") return if job_status == Status.CANCELED: submission._is_canceled = True @@ -246,7 +251,8 @@ def _update_submission_status(self, job: Job): submission.submission_status = SubmissionStatus.COMPLETED # type: ignore else: submission.submission_status = SubmissionStatus.UNDEFINED # type: ignore - + self.__logger.info(f"{job.id} status is {job_status}. Submission status set to " + f"{submission._submission_status}") def is_finished(self) -> bool: """Indicate if the submission is finished. From f10da1907e77d73fd674f4e327c4293c62d14be6 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 4 Mar 2024 10:54:07 +0100 Subject: [PATCH 03/62] formatting --- taipy/core/submission/submission.py | 1 + 1 file changed, 1 insertion(+) diff --git a/taipy/core/submission/submission.py b/taipy/core/submission/submission.py index 76e219babe..8f99ec1ae3 100644 --- a/taipy/core/submission/submission.py +++ b/taipy/core/submission/submission.py @@ -253,6 +253,7 @@ def _update_submission_status(self, job: Job): submission.submission_status = SubmissionStatus.UNDEFINED # type: ignore self.__logger.info(f"{job.id} status is {job_status}. Submission status set to " f"{submission._submission_status}") + def is_finished(self) -> bool: """Indicate if the submission is finished. From 55a6da074b78b0aea7ea7f0e8f9426fcdcea567b Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 4 Mar 2024 12:15:00 +0100 Subject: [PATCH 04/62] adjust logger location --- taipy/core/_orchestrator/_orchestrator.py | 1 - taipy/core/job/job.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 24d82c134f..94a8dc20ba 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -224,7 +224,6 @@ def _unlock_edit_on_jobs_outputs(jobs: Union[Job, List[Job], Set[Job]]): @classmethod def _on_status_change(cls, job: Job): - cls.__logger.info(f"{job.id} status has changed to {job.status}.") if job.is_completed() or job.is_skipped(): cls.__unblock_jobs() elif job.is_failed(): diff --git a/taipy/core/job/job.py b/taipy/core/job/job.py index 3151905dae..b98e3aba03 100644 --- a/taipy/core/job/job.py +++ b/taipy/core/job/job.py @@ -33,6 +33,7 @@ def _run_callbacks(fn): def __run_callbacks(job): fn(job) + _TaipyLogger._get_logger().info(f"{job.id} status has changed to {job.status}.") for fct in job._subscribers: fct(job) From 6db1222af4694600b5ce33cff1f6b4fd0786301c Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 4 Mar 2024 13:12:12 +0100 Subject: [PATCH 05/62] adjust logger --- taipy/core/job/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taipy/core/job/job.py b/taipy/core/job/job.py index b98e3aba03..5092ad9bd3 100644 --- a/taipy/core/job/job.py +++ b/taipy/core/job/job.py @@ -323,7 +323,7 @@ def update_status(self, exceptions): self.__logger.error(st) else: self.completed() - self.__logger.info(f"job {self.id} is completed.") + # self.__logger.info(f"job {self.id} is completed.") def __hash__(self): return hash(self.id) From 2da9db072f8781ebe6bcae4b42a301a36c43e060 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 4 Mar 2024 14:12:15 +0100 Subject: [PATCH 06/62] Add traces --- taipy/core/submission/submission.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/taipy/core/submission/submission.py b/taipy/core/submission/submission.py index 8f99ec1ae3..498610a7f0 100644 --- a/taipy/core/submission/submission.py +++ b/taipy/core/submission/submission.py @@ -200,6 +200,7 @@ def __ge__(self, other): def _update_submission_status(self, job: Job): from ._submission_manager_factory import _SubmissionManagerFactory with self.lock: + self.__logger.info(f" -> Acquiring lock for {job.id} status update.") submission_manager = _SubmissionManagerFactory._build_manager() submission = submission_manager._get(self) if submission._submission_status == SubmissionStatus.FAILED: @@ -211,6 +212,7 @@ def _update_submission_status(self, job: Job): _SubmissionManagerFactory._build_manager()._set(submission) self.__logger.info(f"{job.id} status is {job_status}. Submission status set to " f"{submission._submission_status}") + self.__logger.info(f" -> Releasing lock for {job.id} status update.") return if job_status == Status.CANCELED: submission._is_canceled = True @@ -253,6 +255,7 @@ def _update_submission_status(self, job: Job): submission.submission_status = SubmissionStatus.UNDEFINED # type: ignore self.__logger.info(f"{job.id} status is {job_status}. Submission status set to " f"{submission._submission_status}") + self.__logger.info(f" -> Releasing lock for {job.id} status update.") def is_finished(self) -> bool: """Indicate if the submission is finished. From b745b9d1698866d6658c77dfe9885a905b2d2550 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 4 Mar 2024 14:37:19 +0100 Subject: [PATCH 07/62] Attempt to add traces --- taipy/core/submission/submission.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/taipy/core/submission/submission.py b/taipy/core/submission/submission.py index 498610a7f0..7b8c65bd6a 100644 --- a/taipy/core/submission/submission.py +++ b/taipy/core/submission/submission.py @@ -200,7 +200,7 @@ def __ge__(self, other): def _update_submission_status(self, job: Job): from ._submission_manager_factory import _SubmissionManagerFactory with self.lock: - self.__logger.info(f" -> Acquiring lock for {job.id} status update.") + self.__logger.info(f" -> Acquiring lock {str(self.lock)} for {self.id}. {job.id=} status update.") submission_manager = _SubmissionManagerFactory._build_manager() submission = submission_manager._get(self) if submission._submission_status == SubmissionStatus.FAILED: @@ -255,7 +255,7 @@ def _update_submission_status(self, job: Job): submission.submission_status = SubmissionStatus.UNDEFINED # type: ignore self.__logger.info(f"{job.id} status is {job_status}. Submission status set to " f"{submission._submission_status}") - self.__logger.info(f" -> Releasing lock for {job.id} status update.") + self.__logger.info(f" -> Releasing lock {str(self.lock)} for {self.id}. {job.id=} status update.") def is_finished(self) -> bool: """Indicate if the submission is finished. From f5f3996c251872c3c3fb7d94ac39f46b7aa457f5 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Wed, 6 Mar 2024 19:33:05 +0100 Subject: [PATCH 08/62] test --- .../_dispatcher/_standalone_job_dispatcher.py | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index 61f562be41..43eaf6a8ec 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -8,7 +8,10 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the # specific language governing permissions and limitations under the License. - +import os +import signal +import threading +import time from concurrent.futures import Executor, ProcessPoolExecutor from functools import partial from typing import Callable, Optional @@ -22,6 +25,20 @@ from ._task_function_wrapper import _TaskFunctionWrapper +def start_thread_to_terminate_when_parent_process_dies(ppid): + pid = os.getpid() + def f(): + while True: + try: + os.kill(ppid, 0) + except OSError: + os.kill(pid, signal.SIGTERM) + time.sleep(1) + + thread = threading.Thread(target=f, daemon=True) + thread.start() + + class _StandaloneJobDispatcher(_JobDispatcher): """Manages job dispatching (instances of `Job^` class) in an asynchronous way using a ProcessPoolExecutor.""" @@ -30,7 +47,8 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt max_workers = Config.job_config.max_nb_of_workers or 1 self._executor: Executor = ProcessPoolExecutor( max_workers=max_workers, - initializer=subproc_initializer, + initializer=start_thread_to_terminate_when_parent_process_dies, + initargs=(os.getpid(),) ) # type: ignore self._nb_available_workers = self._executor._max_workers # type: ignore From 2fe9f842255f8b9ed0af43618a7e8995d31d86a7 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Thu, 7 Mar 2024 10:53:14 +0100 Subject: [PATCH 09/62] Revert "test" This reverts commit f5f3996c251872c3c3fb7d94ac39f46b7aa457f5. --- .../_dispatcher/_standalone_job_dispatcher.py | 22 ++----------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index 43eaf6a8ec..61f562be41 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -8,10 +8,7 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the # specific language governing permissions and limitations under the License. -import os -import signal -import threading -import time + from concurrent.futures import Executor, ProcessPoolExecutor from functools import partial from typing import Callable, Optional @@ -25,20 +22,6 @@ from ._task_function_wrapper import _TaskFunctionWrapper -def start_thread_to_terminate_when_parent_process_dies(ppid): - pid = os.getpid() - def f(): - while True: - try: - os.kill(ppid, 0) - except OSError: - os.kill(pid, signal.SIGTERM) - time.sleep(1) - - thread = threading.Thread(target=f, daemon=True) - thread.start() - - class _StandaloneJobDispatcher(_JobDispatcher): """Manages job dispatching (instances of `Job^` class) in an asynchronous way using a ProcessPoolExecutor.""" @@ -47,8 +30,7 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt max_workers = Config.job_config.max_nb_of_workers or 1 self._executor: Executor = ProcessPoolExecutor( max_workers=max_workers, - initializer=start_thread_to_terminate_when_parent_process_dies, - initargs=(os.getpid(),) + initializer=subproc_initializer, ) # type: ignore self._nb_available_workers = self._executor._max_workers # type: ignore From cba70b737cb96484588112238c97c36c562b6995 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Fri, 8 Mar 2024 16:50:24 +0100 Subject: [PATCH 10/62] enlarge lock scope --- taipy/core/_orchestrator/_orchestrator.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 97e0bea369..8aa2890158 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -91,8 +91,8 @@ def submit( force=force, # type: ignore ) ) - submission.jobs = jobs # type: ignore - cls._orchestrate_job_to_run_or_block(jobs) + submission.jobs = jobs # type: ignore + cls._orchestrate_job_to_run_or_block(jobs) if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() else: @@ -137,9 +137,9 @@ def submit_task( itertools.chain([cls._update_submission_status], callbacks or []), force, ) - jobs = [job] - submission.jobs = jobs # type: ignore - cls._orchestrate_job_to_run_or_block(jobs) + jobs = [job] + submission.jobs = jobs # type: ignore + cls._orchestrate_job_to_run_or_block(jobs) if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() else: @@ -232,9 +232,9 @@ def _on_status_change(cls, job: Job): @classmethod def __unblock_jobs(cls): - for job in cls.blocked_jobs: - if not cls._is_blocked(job): - with cls.lock: + with cls.lock: + for job in cls.blocked_jobs: + if not cls._is_blocked(job): job.pending() cls.__remove_blocked_job(job) cls.jobs_to_run.put(job) From f68f2a91890626dc723af622abbc8d08113474f6 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Fri, 8 Mar 2024 17:06:51 +0100 Subject: [PATCH 11/62] Remove traces --- taipy/core/job/job.py | 4 ++-- taipy/core/submission/submission.py | 11 ++++------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/taipy/core/job/job.py b/taipy/core/job/job.py index 5092ad9bd3..95740d9fef 100644 --- a/taipy/core/job/job.py +++ b/taipy/core/job/job.py @@ -33,7 +33,7 @@ def _run_callbacks(fn): def __run_callbacks(job): fn(job) - _TaipyLogger._get_logger().info(f"{job.id} status has changed to {job.status}.") + _TaipyLogger._get_logger().debug(f"{job.id} status has changed to {job.status}.") for fct in job._subscribers: fct(job) @@ -201,6 +201,7 @@ def failed(self): def completed(self): """Set the status to _completed_ and notify subscribers.""" self.status = Status.COMPLETED + self.__logger.info(f"job {self.id} is completed.") @_run_callbacks def skipped(self): @@ -323,7 +324,6 @@ def update_status(self, exceptions): self.__logger.error(st) else: self.completed() - # self.__logger.info(f"job {self.id} is completed.") def __hash__(self): return hash(self.id) diff --git a/taipy/core/submission/submission.py b/taipy/core/submission/submission.py index 7b8c65bd6a..32fe2976a2 100644 --- a/taipy/core/submission/submission.py +++ b/taipy/core/submission/submission.py @@ -200,7 +200,6 @@ def __ge__(self, other): def _update_submission_status(self, job: Job): from ._submission_manager_factory import _SubmissionManagerFactory with self.lock: - self.__logger.info(f" -> Acquiring lock {str(self.lock)} for {self.id}. {job.id=} status update.") submission_manager = _SubmissionManagerFactory._build_manager() submission = submission_manager._get(self) if submission._submission_status == SubmissionStatus.FAILED: @@ -210,9 +209,8 @@ def _update_submission_status(self, job: Job): if job_status == Status.FAILED: submission._submission_status = SubmissionStatus.FAILED _SubmissionManagerFactory._build_manager()._set(submission) - self.__logger.info(f"{job.id} status is {job_status}. Submission status set to " - f"{submission._submission_status}") - self.__logger.info(f" -> Releasing lock for {job.id} status update.") + self.__logger.debug(f"{job.id} status is {job_status}. Submission status set to " + f"{submission._submission_status}") return if job_status == Status.CANCELED: submission._is_canceled = True @@ -253,9 +251,8 @@ def _update_submission_status(self, job: Job): submission.submission_status = SubmissionStatus.COMPLETED # type: ignore else: submission.submission_status = SubmissionStatus.UNDEFINED # type: ignore - self.__logger.info(f"{job.id} status is {job_status}. Submission status set to " - f"{submission._submission_status}") - self.__logger.info(f" -> Releasing lock {str(self.lock)} for {self.id}. {job.id=} status update.") + self.__logger.debug(f"{job.id} status is {job_status}. Submission status set to " + f"{submission._submission_status}") def is_finished(self) -> bool: """Indicate if the submission is finished. From b8d544520c602827acea64fd1427e5dfd9b8c9e7 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 11 Mar 2024 14:26:43 +0100 Subject: [PATCH 12/62] retry pattern on sql save method --- taipy/core/_repository/_sql_repository.py | 26 ++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/taipy/core/_repository/_sql_repository.py b/taipy/core/_repository/_sql_repository.py index c605ef5de3..6eed02697f 100644 --- a/taipy/core/_repository/_sql_repository.py +++ b/taipy/core/_repository/_sql_repository.py @@ -11,6 +11,8 @@ import json import pathlib +from sqlite3 import DatabaseError +from time import sleep from typing import Any, Dict, Iterable, List, Optional, Type, Union from sqlalchemy.dialects import sqlite @@ -44,12 +46,26 @@ def __init__(self, model_type: Type[ModelType], converter: Type[Converter]): ############################### # ## Inherited methods ## # ############################### - def _save(self, entity: Entity): + def _save(self, entity: Entity, retry: int = 3): obj = self.converter._entity_to_model(entity) if self._exists(entity.id): # type: ignore - self._update_entry(obj) - return - self.__insert_model(obj) + try: + self._update_entry(obj) + return + except DatabaseError as e: + if retry > 0: + sleep(0.1) + self._save(entity, retry - 1) + else: + raise e + try: + self.__insert_model(obj) + except DatabaseError as e: + if retry > 0: + sleep(0.1) + self._save(entity, retry - 1) + else: + raise e def _exists(self, entity_id: str): query = self.table.select().filter_by(id=entity_id) @@ -190,7 +206,7 @@ def __get_entities_by_config_and_owner( if versions: table_name = self.table.name - query = query + f" AND {table_name}.version IN ({','.join(['?']*len(versions))})" + query = query + f" AND {table_name}.version IN ({','.join(['?'] * len(versions))})" parameters.extend(versions) if entry := self.db.execute(query, parameters).fetchone(): From 16ea1e78d1f46d6163736f4339af409299c8b8f9 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 11 Mar 2024 14:34:20 +0100 Subject: [PATCH 13/62] Adding traces --- taipy/core/_repository/_sql_repository.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/taipy/core/_repository/_sql_repository.py b/taipy/core/_repository/_sql_repository.py index 6eed02697f..8f9e4814a1 100644 --- a/taipy/core/_repository/_sql_repository.py +++ b/taipy/core/_repository/_sql_repository.py @@ -22,9 +22,13 @@ from ..common.typing import Converter, Entity, ModelType from ..exceptions import ModelNotFound from .db._sql_connection import _SQLConnection +from ...logger._taipy_logger import _TaipyLogger class _SQLRepository(_AbstractRepository[ModelType, Entity]): + + _logger = _TaipyLogger._get_logger() + def __init__(self, model_type: Type[ModelType], converter: Type[Converter]): """ Holds common methods to be used and extended when the need for saving @@ -53,6 +57,8 @@ def _save(self, entity: Entity, retry: int = 3): self._update_entry(obj) return except DatabaseError as e: + self._logger.error(f"Error while updating {entity.id} in {self.table.name}. Retry nb: {4-retry}.") + self._logger.error(f"Error : {e}") if retry > 0: sleep(0.1) self._save(entity, retry - 1) @@ -61,6 +67,8 @@ def _save(self, entity: Entity, retry: int = 3): try: self.__insert_model(obj) except DatabaseError as e: + self._logger.error(f"Error while inserting {entity.id} into {self.table.name}. Retry nb: {4 - retry}.") + self._logger.error(f"Error : {e}") if retry > 0: sleep(0.1) self._save(entity, retry - 1) From 7c9c486d2cc4e785a97f53e8bf918169bd99592a Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 11 Mar 2024 14:41:07 +0100 Subject: [PATCH 14/62] Adding traces --- taipy/core/submission/submission.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/taipy/core/submission/submission.py b/taipy/core/submission/submission.py index 32fe2976a2..14a660fe08 100644 --- a/taipy/core/submission/submission.py +++ b/taipy/core/submission/submission.py @@ -209,7 +209,7 @@ def _update_submission_status(self, job: Job): if job_status == Status.FAILED: submission._submission_status = SubmissionStatus.FAILED _SubmissionManagerFactory._build_manager()._set(submission) - self.__logger.debug(f"{job.id} status is {job_status}. Submission status set to " + self.__logger.info(f"{job.id} status is {job_status}. Submission status set to " f"{submission._submission_status}") return if job_status == Status.CANCELED: @@ -251,7 +251,7 @@ def _update_submission_status(self, job: Job): submission.submission_status = SubmissionStatus.COMPLETED # type: ignore else: submission.submission_status = SubmissionStatus.UNDEFINED # type: ignore - self.__logger.debug(f"{job.id} status is {job_status}. Submission status set to " + self.__logger.info(f"{job.id} status is {job_status}. Submission status set to " f"{submission._submission_status}") def is_finished(self) -> bool: From 5bca172264e524f690187cc3a42fad8c6a4e1084 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 11 Mar 2024 15:00:09 +0100 Subject: [PATCH 15/62] linter --- taipy/core/_repository/_sql_repository.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/taipy/core/_repository/_sql_repository.py b/taipy/core/_repository/_sql_repository.py index 8f9e4814a1..96023c9446 100644 --- a/taipy/core/_repository/_sql_repository.py +++ b/taipy/core/_repository/_sql_repository.py @@ -57,7 +57,8 @@ def _save(self, entity: Entity, retry: int = 3): self._update_entry(obj) return except DatabaseError as e: - self._logger.error(f"Error while updating {entity.id} in {self.table.name}. Retry nb: {4-retry}.") + self._logger.error(f"Error while updating {entity.id} in {self.table.name}. " # type: ignore + f"Retry nb: {4-retry}.") self._logger.error(f"Error : {e}") if retry > 0: sleep(0.1) @@ -67,7 +68,8 @@ def _save(self, entity: Entity, retry: int = 3): try: self.__insert_model(obj) except DatabaseError as e: - self._logger.error(f"Error while inserting {entity.id} into {self.table.name}. Retry nb: {4 - retry}.") + self._logger.error(f"Error while inserting {entity.id} into {self.table.name}. " # type: ignore + f"Retry nb: {4 - retry}.") self._logger.error(f"Error : {e}") if retry > 0: sleep(0.1) From 786e4e014e98ace735d20dc03cbd1df43098e4b5 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 11 Mar 2024 15:02:39 +0100 Subject: [PATCH 16/62] linter --- taipy/core/_repository/_sql_repository.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taipy/core/_repository/_sql_repository.py b/taipy/core/_repository/_sql_repository.py index 96023c9446..e3ce0cf9b3 100644 --- a/taipy/core/_repository/_sql_repository.py +++ b/taipy/core/_repository/_sql_repository.py @@ -18,11 +18,11 @@ from sqlalchemy.dialects import sqlite from sqlalchemy.exc import NoResultFound +from ...logger._taipy_logger import _TaipyLogger from .._repository._abstract_repository import _AbstractRepository from ..common.typing import Converter, Entity, ModelType from ..exceptions import ModelNotFound from .db._sql_connection import _SQLConnection -from ...logger._taipy_logger import _TaipyLogger class _SQLRepository(_AbstractRepository[ModelType, Entity]): From 80161cf5a899728546beb70e192f7d314fd7053d Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 11 Mar 2024 16:45:32 +0100 Subject: [PATCH 17/62] update traces --- taipy/core/_orchestrator/_orchestrator.py | 8 +++++++- taipy/core/job/job.py | 2 +- taipy/core/submission/submission.py | 4 ++-- .../core/_orchestrator/test_orchestrator__cancel_jobs.py | 2 +- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 8aa2890158..0a07830f0f 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -161,7 +161,6 @@ def _lock_dn_output_and_create_job( job = _JobManagerFactory._build_manager()._create( task, itertools.chain([cls._on_status_change], callbacks or []), submit_id, submit_entity_id, force=force ) - return job @classmethod @@ -226,18 +225,25 @@ def _unlock_edit_on_jobs_outputs(jobs: Union[Job, List[Job], Set[Job]]): @classmethod def _on_status_change(cls, job: Job): if job.is_completed() or job.is_skipped(): + cls.__logger.info(f"{job.id} has been completed or skipped. Unblocking jobs.") cls.__unblock_jobs() elif job.is_failed(): cls._fail_subsequent_jobs(job) @classmethod def __unblock_jobs(cls): + cls.__logger.info("Entering __unblock_jobs.") with cls.lock: for job in cls.blocked_jobs: + cls.__logger.info(f"Unblocking {job.id} ?") if not cls._is_blocked(job): + cls.__logger.info(f"Unblocking {job.id} !") job.pending() + cls.__logger.info(f"Removing {job.id} from the blocked list.") cls.__remove_blocked_job(job) + cls.__logger.info(f"Adding {job.id} to the list of jobs to run.") cls.jobs_to_run.put(job) + cls.__logger.info("Exiting __unblock_jobs.") @classmethod def __remove_blocked_job(cls, job): diff --git a/taipy/core/job/job.py b/taipy/core/job/job.py index 95740d9fef..02349acb84 100644 --- a/taipy/core/job/job.py +++ b/taipy/core/job/job.py @@ -33,7 +33,7 @@ def _run_callbacks(fn): def __run_callbacks(job): fn(job) - _TaipyLogger._get_logger().debug(f"{job.id} status has changed to {job.status}.") + _TaipyLogger._get_logger().error(f"{job.id} status has changed to {job.status}.") for fct in job._subscribers: fct(job) diff --git a/taipy/core/submission/submission.py b/taipy/core/submission/submission.py index 14a660fe08..32fe2976a2 100644 --- a/taipy/core/submission/submission.py +++ b/taipy/core/submission/submission.py @@ -209,7 +209,7 @@ def _update_submission_status(self, job: Job): if job_status == Status.FAILED: submission._submission_status = SubmissionStatus.FAILED _SubmissionManagerFactory._build_manager()._set(submission) - self.__logger.info(f"{job.id} status is {job_status}. Submission status set to " + self.__logger.debug(f"{job.id} status is {job_status}. Submission status set to " f"{submission._submission_status}") return if job_status == Status.CANCELED: @@ -251,7 +251,7 @@ def _update_submission_status(self, job: Job): submission.submission_status = SubmissionStatus.COMPLETED # type: ignore else: submission.submission_status = SubmissionStatus.UNDEFINED # type: ignore - self.__logger.info(f"{job.id} status is {job_status}. Submission status set to " + self.__logger.debug(f"{job.id} status is {job_status}. Submission status set to " f"{submission._submission_status}") def is_finished(self) -> bool: diff --git a/tests/core/_orchestrator/test_orchestrator__cancel_jobs.py b/tests/core/_orchestrator/test_orchestrator__cancel_jobs.py index 620f3a6471..4203206f47 100644 --- a/tests/core/_orchestrator/test_orchestrator__cancel_jobs.py +++ b/tests/core/_orchestrator/test_orchestrator__cancel_jobs.py @@ -59,7 +59,7 @@ def test_cancel_job_no_subsequent_jobs(): def test_cancel_job_with_subsequent_blocked_jobs(): scenario = create_scenario() - orchestrator = _OrchestratorFactory._build_orchestrator() + orchestrator = cast(_Orchestrator, _OrchestratorFactory._build_orchestrator()) job1 = orchestrator._lock_dn_output_and_create_job(scenario.t1, "s_id", "e_id") job2 = orchestrator._lock_dn_output_and_create_job(scenario.t2, "s_id", "e_id") job3 = orchestrator._lock_dn_output_and_create_job(scenario.t3, "s_id", "e_id") From 023ec9a94628b58f823345aac46367394d56987e Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 11 Mar 2024 17:08:46 +0100 Subject: [PATCH 18/62] update traces --- taipy/core/_orchestrator/_orchestrator.py | 21 ++++++++++----------- taipy/core/job/job.py | 2 +- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 0a07830f0f..984255f010 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -58,7 +58,7 @@ def submit( """Submit the given `Scenario^` or `Sequence^` for an execution. Parameters: - submittable (Union[SCenario^, Sequence^]): The scenario or sequence to submit for execution. + submittable (Union[Scenario^, Sequence^]): The scenario or sequence to submit for execution. callbacks: The optional list of functions that should be executed on jobs status change. force (bool) : Enforce execution of the scenario's or sequence's tasks even if their output data nodes are cached. @@ -66,7 +66,7 @@ def submit( finished in asynchronous mode. timeout (Union[float, int]): The optional maximum number of seconds to wait for the jobs to be finished before returning. - **properties (dict[str, any]): A keyworded variable length list of user additional arguments + **properties (dict[str, any]): A key worded variable length list of user additional arguments that will be stored within the `Submission^`. It can be accessed via `Submission.properties^`. Returns: The created `Submission^` containing the information about the submission. @@ -120,7 +120,7 @@ def submit_task( in asynchronous mode. timeout (Union[float, int]): The optional maximum number of seconds to wait for the job to be finished before returning. - **properties (dict[str, any]): A keyworded variable length list of user additional arguments + **properties (dict[str, any]): A key worded variable length list of user additional arguments that will be stored within the `Submission^`. It can be accessed via `Submission.properties^`. Returns: The created `Submission^` containing the information about the submission. @@ -225,25 +225,25 @@ def _unlock_edit_on_jobs_outputs(jobs: Union[Job, List[Job], Set[Job]]): @classmethod def _on_status_change(cls, job: Job): if job.is_completed() or job.is_skipped(): - cls.__logger.info(f"{job.id} has been completed or skipped. Unblocking jobs.") + cls.__logger.error(f"{job.id} has been completed or skipped. Unblocking jobs.") cls.__unblock_jobs() elif job.is_failed(): cls._fail_subsequent_jobs(job) @classmethod def __unblock_jobs(cls): - cls.__logger.info("Entering __unblock_jobs.") + cls.__logger.error(" Entering __unblock_jobs.") with cls.lock: for job in cls.blocked_jobs: - cls.__logger.info(f"Unblocking {job.id} ?") + cls.__logger.error(f" Unblocking {job.id} ?") if not cls._is_blocked(job): - cls.__logger.info(f"Unblocking {job.id} !") + cls.__logger.error(f" Unblocking {job.id} !") job.pending() - cls.__logger.info(f"Removing {job.id} from the blocked list.") + cls.__logger.error(f" Removing {job.id} from the blocked list.") cls.__remove_blocked_job(job) - cls.__logger.info(f"Adding {job.id} to the list of jobs to run.") + cls.__logger.error(f" Adding {job.id} to the list of jobs to run.") cls.jobs_to_run.put(job) - cls.__logger.info("Exiting __unblock_jobs.") + cls.__logger.error(" Exiting __unblock_jobs.") @classmethod def __remove_blocked_job(cls, job): @@ -314,7 +314,6 @@ def _fail_subsequent_jobs(cls, failed_job: Job): @classmethod def _cancel_jobs(cls, job_id_to_cancel: JobId, jobs: Set[Job]): - for job in jobs: if job.is_running(): cls.__logger.info(f"{job.id} is running and cannot be canceled.") diff --git a/taipy/core/job/job.py b/taipy/core/job/job.py index 02349acb84..f50e98a2c4 100644 --- a/taipy/core/job/job.py +++ b/taipy/core/job/job.py @@ -289,7 +289,7 @@ def is_finished(self) -> bool: return self.is_completed() or self.is_failed() or self.is_canceled() or self.is_skipped() or self.is_abandoned() def _is_finished(self) -> bool: - """Indicate if the job is finished. This function will not triggered the persistency feature like is_finished(). + """Indicate if the job is finished. This function will not trigger the persistence feature like is_finished(). Returns: True if the job is finished. From 3c64a9a8cec360acee3c08fab1cae4f44a67c0d6 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 11 Mar 2024 17:59:37 +0100 Subject: [PATCH 19/62] Add traces around lock acquisition --- taipy/core/_orchestrator/_orchestrator.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 984255f010..41ab0bdb07 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -80,6 +80,7 @@ def submit( jobs = [] tasks = submittable._get_sorted_tasks() with cls.lock: + cls.__logger.error(f"-------------------------> Acquired lock to submit {submittable.id}.") # type: ignore for ts in tasks: for task in ts: jobs.append( @@ -93,6 +94,8 @@ def submit( ) submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) + cls.__logger.error(f"-------------------------> Released lock after submitting " + f"{submittable.id}.") # type: ignore if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() else: @@ -130,6 +133,7 @@ def submit_task( ) submit_id = submission.id with cls.lock: + cls.__logger.error("-------------------------> Acquired lock to submit task.") job = cls._lock_dn_output_and_create_job( task, submit_id, @@ -140,6 +144,7 @@ def submit_task( jobs = [job] submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) + cls.__logger.error(f"-------------------------> Released lock after submitting task {task.id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() else: @@ -234,6 +239,7 @@ def _on_status_change(cls, job: Job): def __unblock_jobs(cls): cls.__logger.error(" Entering __unblock_jobs.") with cls.lock: + cls.__logger.error(f"-------------------------> Acquired lock to unblock jobs.") for job in cls.blocked_jobs: cls.__logger.error(f" Unblocking {job.id} ?") if not cls._is_blocked(job): @@ -243,6 +249,7 @@ def __unblock_jobs(cls): cls.__remove_blocked_job(job) cls.__logger.error(f" Adding {job.id} to the list of jobs to run.") cls.jobs_to_run.put(job) + cls.__logger.error(f"-------------------------> Released lock after unblocking jobs.") cls.__logger.error(" Exiting __unblock_jobs.") @classmethod @@ -262,12 +269,14 @@ def cancel_job(cls, job: Job): cls.__logger.info(f"{job.id} has already failed and cannot be canceled.") else: with cls.lock: + cls.__logger.error(f"-------------------------> Acquired lock to cancel job {job.id}.") to_cancel_or_abandon_jobs = {job} to_cancel_or_abandon_jobs.update(cls.__find_subsequent_jobs(job.submit_id, set(job.task.output.keys()))) cls.__remove_blocked_jobs(to_cancel_or_abandon_jobs) cls.__remove_jobs_to_run(to_cancel_or_abandon_jobs) cls._cancel_jobs(job.id, to_cancel_or_abandon_jobs) cls._unlock_edit_on_jobs_outputs(to_cancel_or_abandon_jobs) + cls.__logger.error(f"-------------------------> Released lock after canceling {job.id}.") @classmethod def __find_subsequent_jobs(cls, submit_id, output_dn_config_ids: Set) -> Set[Job]: @@ -301,6 +310,7 @@ def __remove_jobs_to_run(cls, jobs): @classmethod def _fail_subsequent_jobs(cls, failed_job: Job): with cls.lock: + cls.__logger.error("-------------------------> Acquired lock to fail subsequent jobs.") to_fail_or_abandon_jobs = set() to_fail_or_abandon_jobs.update( cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys())) From bcaf8e14f398c436b5077cc51087db96f6739ea4 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 11 Mar 2024 18:04:14 +0100 Subject: [PATCH 20/62] linter --- taipy/core/_orchestrator/_orchestrator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 41ab0bdb07..1e1532c85f 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -94,8 +94,7 @@ def submit( ) submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) - cls.__logger.error(f"-------------------------> Released lock after submitting " - f"{submittable.id}.") # type: ignore + cls.__logger.error(f"-------------------------> Released lock after submitting {submission.id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() else: From d56208877a57c260ead89f9b06e67ada47897ebf Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 11 Mar 2024 18:08:27 +0100 Subject: [PATCH 21/62] linter --- taipy/core/_orchestrator/_orchestrator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 1e1532c85f..5784d22a9f 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -238,7 +238,7 @@ def _on_status_change(cls, job: Job): def __unblock_jobs(cls): cls.__logger.error(" Entering __unblock_jobs.") with cls.lock: - cls.__logger.error(f"-------------------------> Acquired lock to unblock jobs.") + cls.__logger.error("-------------------------> Acquired lock to unblock jobs.") for job in cls.blocked_jobs: cls.__logger.error(f" Unblocking {job.id} ?") if not cls._is_blocked(job): @@ -248,7 +248,7 @@ def __unblock_jobs(cls): cls.__remove_blocked_job(job) cls.__logger.error(f" Adding {job.id} to the list of jobs to run.") cls.jobs_to_run.put(job) - cls.__logger.error(f"-------------------------> Released lock after unblocking jobs.") + cls.__logger.error("-------------------------> Released lock after unblocking jobs.") cls.__logger.error(" Exiting __unblock_jobs.") @classmethod From f7486a0c32320377ec3b9bfa803d9ddbed320b24 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 11 Mar 2024 18:34:51 +0100 Subject: [PATCH 22/62] linter --- .../_orchestrator/_dispatcher/_job_dispatcher.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index 1d139710d5..88c676fd65 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -64,16 +64,23 @@ def run(self): while not self._STOP_FLAG: try: if self._can_execute(): - with self.lock: - if self._STOP_FLAG: - break - job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) + self.lock.acquire() + self._logger.error("-------------------------> Acquired lock to execute job.") + if self._STOP_FLAG: + self.lock.release() + break + job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) + self._logger.error(f"-------------------------> Got job to execute {job.id}.") self._execute_job(job) else: time.sleep(0.1) # We need to sleep to avoid busy waiting. except Empty: # In case the last job of the queue has been removed. + self._logger.error("-------------------------> Released lock to execute job.") + self.lock.release() pass except Exception as e: + self._logger.error("-------------------------> Released lock to execute job 2.") + self.lock.release() self._logger.exception(e) pass if self.stop_wait: From e6cbbb86cef3e8e76f2ab4840da2903083cc2e4d Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 11 Mar 2024 19:34:23 +0100 Subject: [PATCH 23/62] attempt to isolate lock on job removal --- .../_dispatcher/_job_dispatcher.py | 36 +++++++++---------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index 88c676fd65..6457e64bad 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -62,27 +62,23 @@ def stop(self, wait: bool = True, timeout: Optional[float] = None): def run(self): self._logger.debug("Job dispatcher started.") while not self._STOP_FLAG: - try: - if self._can_execute(): - self.lock.acquire() + if self._can_execute(): + with self.lock: self._logger.error("-------------------------> Acquired lock to execute job.") - if self._STOP_FLAG: - self.lock.release() - break - job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) - self._logger.error(f"-------------------------> Got job to execute {job.id}.") - self._execute_job(job) - else: - time.sleep(0.1) # We need to sleep to avoid busy waiting. - except Empty: # In case the last job of the queue has been removed. - self._logger.error("-------------------------> Released lock to execute job.") - self.lock.release() - pass - except Exception as e: - self._logger.error("-------------------------> Released lock to execute job 2.") - self.lock.release() - self._logger.exception(e) - pass + try: + job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) + self._logger.error(f"-------------------------> Got job to execute {job.id}.") + except Empty: # In case the last job of the queue has been removed. + self._logger.error("-------------------------> Released lock to execute job.") + pass + except Exception as e: + self._logger.error("-------------------------> Released lock to execute job 2.") + self._logger.exception(e) + pass + self._execute_job(job) + else: + time.sleep(0.1) # We need to sleep to avoid busy waiting. + if self.stop_wait: self._logger.debug("Waiting for the dispatcher thread to stop...") self.join(timeout=self.stop_timeout) From 4f2d7fb6df91a3245936222f49d40a2c8810cacb Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 11 Mar 2024 20:06:50 +0100 Subject: [PATCH 24/62] attempt 2 to isolate lock on job removal --- taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index 6457e64bad..2b2018ed76 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -65,9 +65,9 @@ def run(self): if self._can_execute(): with self.lock: self._logger.error("-------------------------> Acquired lock to execute job.") + job = None try: job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) - self._logger.error(f"-------------------------> Got job to execute {job.id}.") except Empty: # In case the last job of the queue has been removed. self._logger.error("-------------------------> Released lock to execute job.") pass @@ -75,7 +75,9 @@ def run(self): self._logger.error("-------------------------> Released lock to execute job 2.") self._logger.exception(e) pass - self._execute_job(job) + if job: + self._logger.error(f"-------------------------> Got job to execute {job.id}.") + self._execute_job(job) else: time.sleep(0.1) # We need to sleep to avoid busy waiting. From bbb2f828cf336f14f53c64a6968cd8a4888b6809 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 11 Mar 2024 20:21:17 +0100 Subject: [PATCH 25/62] attempt 3 to isolate lock on job removal --- taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index 2b2018ed76..b9434e010d 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -69,18 +69,16 @@ def run(self): try: job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) except Empty: # In case the last job of the queue has been removed. - self._logger.error("-------------------------> Released lock to execute job.") pass except Exception as e: - self._logger.error("-------------------------> Released lock to execute job 2.") self._logger.exception(e) pass + self._logger.error("-------------------------> Released lock to execute job.") if job: self._logger.error(f"-------------------------> Got job to execute {job.id}.") self._execute_job(job) else: time.sleep(0.1) # We need to sleep to avoid busy waiting. - if self.stop_wait: self._logger.debug("Waiting for the dispatcher thread to stop...") self.join(timeout=self.stop_timeout) From 9b891c5b7040ddd8661e831d054cebe53474deb9 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 12 Mar 2024 14:53:27 +0100 Subject: [PATCH 26/62] Add lock around nb_available_workers --- .../_dispatcher/_standalone_job_dispatcher.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index 37235f30c0..ff122f4280 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -11,6 +11,7 @@ from concurrent.futures import Executor, ProcessPoolExecutor from functools import partial +from threading import Lock from typing import Callable, Optional from taipy.config._serializer._toml_serializer import _TomlSerializer @@ -32,10 +33,12 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt max_workers=max_workers, initializer=subproc_initializer, ) # type: ignore + self.nb_available_lock = Lock() self._nb_available_workers = self._executor._max_workers # type: ignore def _can_execute(self) -> bool: """Returns True if the dispatcher have resources to dispatch a job.""" + self._logger.error(f"can execute a job ? {self._nb_available_workers}") return self._nb_available_workers > 0 def run(self): @@ -49,17 +52,15 @@ def _dispatch(self, job: Job): Parameters: job (Job^): The job to submit on an executor with an available worker. """ - - self._nb_available_workers -= 1 + with self.nb_available_lock: + self._nb_available_workers -= 1 + self._logger.error(f"Changing nb_available_workers to {self._nb_available_workers} from dispatch") config_as_string = _TomlSerializer()._serialize(Config._applied_config) # type: ignore[attr-defined] future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string) - - future.add_done_callback(self._release_worker) # We must release the worker before updating the job status - # so that the worker is available for another job as soon as possible. future.add_done_callback(partial(self._update_job_status_from_future, job)) - def _release_worker(self, _): - self._nb_available_workers += 1 - def _update_job_status_from_future(self, job: Job, ft): + with self.nb_available_lock: + self._nb_available_workers += 1 + self._logger.error(f"Changing nb_available_workers to {self._nb_available_workers} from callback") self._update_job_status(job, ft.result()) From fe43ba88078204ebceaa9512b31af14c95f1b13f Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 12 Mar 2024 15:09:43 +0100 Subject: [PATCH 27/62] fix UTs --- .../_dispatcher/_standalone_job_dispatcher.py | 6 +++--- .../_dispatcher/mock_standalone_dispatcher.py | 7 ++----- .../test_standalone_job_dispatcher.py | 16 ++-------------- 3 files changed, 7 insertions(+), 22 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index ff122f4280..30c3aaf8e3 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -33,7 +33,7 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt max_workers=max_workers, initializer=subproc_initializer, ) # type: ignore - self.nb_available_lock = Lock() + self._nb_available_workers_lock = Lock() self._nb_available_workers = self._executor._max_workers # type: ignore def _can_execute(self) -> bool: @@ -52,7 +52,7 @@ def _dispatch(self, job: Job): Parameters: job (Job^): The job to submit on an executor with an available worker. """ - with self.nb_available_lock: + with self._nb_available_workers_lock: self._nb_available_workers -= 1 self._logger.error(f"Changing nb_available_workers to {self._nb_available_workers} from dispatch") config_as_string = _TomlSerializer()._serialize(Config._applied_config) # type: ignore[attr-defined] @@ -60,7 +60,7 @@ def _dispatch(self, job: Job): future.add_done_callback(partial(self._update_job_status_from_future, job)) def _update_job_status_from_future(self, job: Job, ft): - with self.nb_available_lock: + with self._nb_available_workers_lock: self._nb_available_workers += 1 self._logger.error(f"Changing nb_available_workers to {self._nb_available_workers} from callback") self._update_job_status(job, ft.result()) diff --git a/tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py b/tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py index 7be6b2cfca..acad99bdca 100644 --- a/tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py +++ b/tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py @@ -10,6 +10,7 @@ # specific language governing permissions and limitations under the License. from concurrent.futures import Executor, Future +from threading import Lock from typing import List from taipy.core import Job @@ -39,9 +40,9 @@ def __init__(self, orchestrator: _AbstractOrchestrator): super(_StandaloneJobDispatcher, self).__init__(orchestrator) self._executor: Executor = MockProcessPoolExecutor() self._nb_available_workers = 1 + self._nb_available_workers_lock = Lock() self.dispatch_calls: List = [] - self.release_worker_calls: List = [] self.update_job_status_from_future_calls: List = [] def mock_exception_for_job(self, task_id, e: Exception): @@ -51,10 +52,6 @@ def _dispatch(self, job: Job): self.dispatch_calls.append(job) super()._dispatch(job) - def _release_worker(self, _): - self.release_worker_calls.append(None) - super()._release_worker(_) - def _update_job_status_from_future(self, job: Job, ft): self.update_job_status_from_future_calls.append((job, ft)) super()._update_job_status_from_future(job, ft) diff --git a/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py b/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py index 43a428e685..b962fe3840 100644 --- a/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py +++ b/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py @@ -70,9 +70,6 @@ def test_dispatch_job(): assert submit_first_call[1] == () assert submit_first_call[2]["config_as_string"] == _TomlSerializer()._serialize(Config._applied_config) - # test that the worker is released after the job is done - assert len(dispatcher.release_worker_calls) == 1 - # test that the job status is updated after execution on future assert len(dispatcher.update_job_status_from_future_calls) == 1 assert dispatcher.update_job_status_from_future_calls[0][0] == job @@ -90,17 +87,6 @@ def test_can_execute(): dispatcher._nb_available_workers = 1 assert dispatcher._can_execute() - -def test_release_worker(): - dispatcher = _StandaloneJobDispatcher(_OrchestratorFactory._orchestrator) - - assert dispatcher._nb_available_workers == 1 - dispatcher._release_worker(None) - assert dispatcher._nb_available_workers == 2 - dispatcher._release_worker(None) - assert dispatcher._nb_available_workers == 3 - - def test_update_job_status_from_future(): task = create_task() job = Job(JobId("job"), task, "s_id", task.id) @@ -108,7 +94,9 @@ def test_update_job_status_from_future(): dispatcher = _StandaloneJobDispatcher(orchestrator) ft = Future() ft.set_result(None) + assert dispatcher._nb_available_workers == 1 dispatcher._update_job_status_from_future(job, ft) + assert dispatcher._nb_available_workers == 2 assert job.is_completed() From b0ef819dd4e691bbf4b58dcf42b890106c4de6b6 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 12 Mar 2024 15:22:37 +0100 Subject: [PATCH 28/62] minor Cleanning --- .../_dispatcher/_job_dispatcher.py | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index b9434e010d..874ff3a5ef 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -62,23 +62,24 @@ def stop(self, wait: bool = True, timeout: Optional[float] = None): def run(self): self._logger.debug("Job dispatcher started.") while not self._STOP_FLAG: - if self._can_execute(): - with self.lock: - self._logger.error("-------------------------> Acquired lock to execute job.") - job = None - try: - job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) - except Empty: # In case the last job of the queue has been removed. - pass - except Exception as e: - self._logger.exception(e) - pass - self._logger.error("-------------------------> Released lock to execute job.") - if job: - self._logger.error(f"-------------------------> Got job to execute {job.id}.") - self._execute_job(job) - else: + if not self._can_execute(): time.sleep(0.1) # We need to sleep to avoid busy waiting. + continue + with self.lock: + self._logger.error("-------------------------> Acquired lock to execute job.") + job = None + try: + job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) + except Empty: # In case the last job of the queue has been removed. + pass + self._logger.error("-------------------------> Released lock to execute job.") + if job: + self._logger.error(f"-------------------------> Got job to execute {job.id}.") + try: + self._execute_job(job) + except Exception as e: + self._logger.exception(e) + if self.stop_wait: self._logger.debug("Waiting for the dispatcher thread to stop...") self.join(timeout=self.stop_timeout) From 74378956e3e824c19a687d8cc290a02fa39eff4e Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 12 Mar 2024 15:38:34 +0100 Subject: [PATCH 29/62] Remove logs --- .../_orchestrator/_dispatcher/_job_dispatcher.py | 3 --- .../_dispatcher/_standalone_job_dispatcher.py | 3 --- taipy/core/_orchestrator/_orchestrator.py | 16 ---------------- 3 files changed, 22 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index 874ff3a5ef..17018a7a30 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -66,15 +66,12 @@ def run(self): time.sleep(0.1) # We need to sleep to avoid busy waiting. continue with self.lock: - self._logger.error("-------------------------> Acquired lock to execute job.") job = None try: job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) except Empty: # In case the last job of the queue has been removed. pass - self._logger.error("-------------------------> Released lock to execute job.") if job: - self._logger.error(f"-------------------------> Got job to execute {job.id}.") try: self._execute_job(job) except Exception as e: diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index 30c3aaf8e3..cb592eb1a9 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -38,7 +38,6 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt def _can_execute(self) -> bool: """Returns True if the dispatcher have resources to dispatch a job.""" - self._logger.error(f"can execute a job ? {self._nb_available_workers}") return self._nb_available_workers > 0 def run(self): @@ -54,7 +53,6 @@ def _dispatch(self, job: Job): """ with self._nb_available_workers_lock: self._nb_available_workers -= 1 - self._logger.error(f"Changing nb_available_workers to {self._nb_available_workers} from dispatch") config_as_string = _TomlSerializer()._serialize(Config._applied_config) # type: ignore[attr-defined] future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string) future.add_done_callback(partial(self._update_job_status_from_future, job)) @@ -62,5 +60,4 @@ def _dispatch(self, job: Job): def _update_job_status_from_future(self, job: Job, ft): with self._nb_available_workers_lock: self._nb_available_workers += 1 - self._logger.error(f"Changing nb_available_workers to {self._nb_available_workers} from callback") self._update_job_status(job, ft.result()) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 5784d22a9f..d575f4a961 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -80,7 +80,6 @@ def submit( jobs = [] tasks = submittable._get_sorted_tasks() with cls.lock: - cls.__logger.error(f"-------------------------> Acquired lock to submit {submittable.id}.") # type: ignore for ts in tasks: for task in ts: jobs.append( @@ -94,7 +93,6 @@ def submit( ) submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) - cls.__logger.error(f"-------------------------> Released lock after submitting {submission.id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() else: @@ -132,7 +130,6 @@ def submit_task( ) submit_id = submission.id with cls.lock: - cls.__logger.error("-------------------------> Acquired lock to submit task.") job = cls._lock_dn_output_and_create_job( task, submit_id, @@ -143,7 +140,6 @@ def submit_task( jobs = [job] submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) - cls.__logger.error(f"-------------------------> Released lock after submitting task {task.id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() else: @@ -229,27 +225,18 @@ def _unlock_edit_on_jobs_outputs(jobs: Union[Job, List[Job], Set[Job]]): @classmethod def _on_status_change(cls, job: Job): if job.is_completed() or job.is_skipped(): - cls.__logger.error(f"{job.id} has been completed or skipped. Unblocking jobs.") cls.__unblock_jobs() elif job.is_failed(): cls._fail_subsequent_jobs(job) @classmethod def __unblock_jobs(cls): - cls.__logger.error(" Entering __unblock_jobs.") with cls.lock: - cls.__logger.error("-------------------------> Acquired lock to unblock jobs.") for job in cls.blocked_jobs: - cls.__logger.error(f" Unblocking {job.id} ?") if not cls._is_blocked(job): - cls.__logger.error(f" Unblocking {job.id} !") job.pending() - cls.__logger.error(f" Removing {job.id} from the blocked list.") cls.__remove_blocked_job(job) - cls.__logger.error(f" Adding {job.id} to the list of jobs to run.") cls.jobs_to_run.put(job) - cls.__logger.error("-------------------------> Released lock after unblocking jobs.") - cls.__logger.error(" Exiting __unblock_jobs.") @classmethod def __remove_blocked_job(cls, job): @@ -268,14 +255,12 @@ def cancel_job(cls, job: Job): cls.__logger.info(f"{job.id} has already failed and cannot be canceled.") else: with cls.lock: - cls.__logger.error(f"-------------------------> Acquired lock to cancel job {job.id}.") to_cancel_or_abandon_jobs = {job} to_cancel_or_abandon_jobs.update(cls.__find_subsequent_jobs(job.submit_id, set(job.task.output.keys()))) cls.__remove_blocked_jobs(to_cancel_or_abandon_jobs) cls.__remove_jobs_to_run(to_cancel_or_abandon_jobs) cls._cancel_jobs(job.id, to_cancel_or_abandon_jobs) cls._unlock_edit_on_jobs_outputs(to_cancel_or_abandon_jobs) - cls.__logger.error(f"-------------------------> Released lock after canceling {job.id}.") @classmethod def __find_subsequent_jobs(cls, submit_id, output_dn_config_ids: Set) -> Set[Job]: @@ -309,7 +294,6 @@ def __remove_jobs_to_run(cls, jobs): @classmethod def _fail_subsequent_jobs(cls, failed_job: Job): with cls.lock: - cls.__logger.error("-------------------------> Acquired lock to fail subsequent jobs.") to_fail_or_abandon_jobs = set() to_fail_or_abandon_jobs.update( cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys())) From 9cc19d9df308fa2fa3623b5a68c7d189d2904805 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 12 Mar 2024 15:39:27 +0100 Subject: [PATCH 30/62] Remove logs --- taipy/core/job/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taipy/core/job/job.py b/taipy/core/job/job.py index f50e98a2c4..0b383dd78b 100644 --- a/taipy/core/job/job.py +++ b/taipy/core/job/job.py @@ -33,7 +33,7 @@ def _run_callbacks(fn): def __run_callbacks(job): fn(job) - _TaipyLogger._get_logger().error(f"{job.id} status has changed to {job.status}.") + _TaipyLogger._get_logger().debug(f"{job.id} status has changed to {job.status}.") for fct in job._subscribers: fct(job) From b94d3c3c0fc55c6bd07c6c615051a14d22a21d19 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 12 Mar 2024 15:43:35 +0100 Subject: [PATCH 31/62] Cleaning --- taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index 17018a7a30..cab2439750 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -68,12 +68,16 @@ def run(self): with self.lock: job = None try: - job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) + if not self._STOP_FLAG: + job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) except Empty: # In case the last job of the queue has been removed. pass if job: try: - self._execute_job(job) + if not self._STOP_FLAG: + self._execute_job(job) + else: + self.orchestrator.jobs_to_run.put(job) except Exception as e: self._logger.exception(e) From 3cbcb4d6f62090444a364cb145fcc66506cb1a15 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 12 Mar 2024 15:47:10 +0100 Subject: [PATCH 32/62] Revert unecessary change --- taipy/core/_orchestrator/_orchestrator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index d575f4a961..5776643476 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -231,9 +231,9 @@ def _on_status_change(cls, job: Job): @classmethod def __unblock_jobs(cls): - with cls.lock: - for job in cls.blocked_jobs: - if not cls._is_blocked(job): + for job in cls.blocked_jobs: + if not cls._is_blocked(job): + with cls.lock: job.pending() cls.__remove_blocked_job(job) cls.jobs_to_run.put(job) From 8eeb9401ba31366874194f188a9a25cf6c9adb26 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 12 Mar 2024 16:00:24 +0100 Subject: [PATCH 33/62] Revert sql retry pattern --- taipy/core/_repository/_sql_repository.py | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/taipy/core/_repository/_sql_repository.py b/taipy/core/_repository/_sql_repository.py index e3ce0cf9b3..cf80311153 100644 --- a/taipy/core/_repository/_sql_repository.py +++ b/taipy/core/_repository/_sql_repository.py @@ -50,32 +50,22 @@ def __init__(self, model_type: Type[ModelType], converter: Type[Converter]): ############################### # ## Inherited methods ## # ############################### - def _save(self, entity: Entity, retry: int = 3): + def _save(self, entity: Entity): obj = self.converter._entity_to_model(entity) if self._exists(entity.id): # type: ignore try: self._update_entry(obj) return except DatabaseError as e: - self._logger.error(f"Error while updating {entity.id} in {self.table.name}. " # type: ignore - f"Retry nb: {4-retry}.") + self._logger.error(f"Error while updating {entity.id} in {self.table.name}. ") # type: ignore self._logger.error(f"Error : {e}") - if retry > 0: - sleep(0.1) - self._save(entity, retry - 1) - else: - raise e + raise e try: self.__insert_model(obj) except DatabaseError as e: - self._logger.error(f"Error while inserting {entity.id} into {self.table.name}. " # type: ignore - f"Retry nb: {4 - retry}.") + self._logger.error(f"Error while inserting {entity.id} into {self.table.name}. ") # type: ignore self._logger.error(f"Error : {e}") - if retry > 0: - sleep(0.1) - self._save(entity, retry - 1) - else: - raise e + raise e def _exists(self, entity_id: str): query = self.table.select().filter_by(id=entity_id) From 8a3b57be3be2130d8957cfcc684714e7d8c661e9 Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Tue, 12 Mar 2024 22:02:47 +0700 Subject: [PATCH 34/62] fix: reset everything after each test as well --- tests/core/conftest.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/core/conftest.py b/tests/core/conftest.py index 0583c0639a..6dfbbdde0a 100644 --- a/tests/core/conftest.py +++ b/tests/core/conftest.py @@ -331,6 +331,12 @@ def clean_repository(init_config, init_managers, init_orchestrator, init_notifie with patch("sys.argv", ["prog"]): yield + close_all_sessions() + init_orchestrator() + init_managers() + init_config() + init_notifier() + @pytest.fixture def init_config(reset_configuration_singleton, inject_core_sections): From a4d07cf203879558243c7375cdf15da889310e2c Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Tue, 12 Mar 2024 22:06:10 +0700 Subject: [PATCH 35/62] fix: linter error --- taipy/core/_repository/_sql_repository.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/taipy/core/_repository/_sql_repository.py b/taipy/core/_repository/_sql_repository.py index cf80311153..9a69c1d111 100644 --- a/taipy/core/_repository/_sql_repository.py +++ b/taipy/core/_repository/_sql_repository.py @@ -12,7 +12,6 @@ import json import pathlib from sqlite3 import DatabaseError -from time import sleep from typing import Any, Dict, Iterable, List, Optional, Type, Union from sqlalchemy.dialects import sqlite @@ -26,7 +25,6 @@ class _SQLRepository(_AbstractRepository[ModelType, Entity]): - _logger = _TaipyLogger._get_logger() def __init__(self, model_type: Type[ModelType], converter: Type[Converter]): From 67bac4e911617fffecbadd2c8ed76b8bdbd9d3df Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Wed, 13 Mar 2024 10:02:46 +0700 Subject: [PATCH 36/62] minor improvements --- taipy/core/_orchestrator/_orchestrator.py | 2 +- taipy/core/submission/submission.py | 13 ++++++++----- .../core/_orchestrator/test_orchestrator_factory.py | 3 +++ tests/core/config/test_override_config.py | 5 +++-- tests/core/conftest.py | 11 +++-------- tests/core/job/test_job_manager.py | 10 +--------- 6 files changed, 19 insertions(+), 25 deletions(-) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 5776643476..48aa27ca01 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -36,7 +36,7 @@ class _Orchestrator(_AbstractOrchestrator): """ jobs_to_run: Queue = Queue() - blocked_jobs: List = [] + blocked_jobs: List[Job] = [] lock = Lock() __logger = _TaipyLogger._get_logger() diff --git a/taipy/core/submission/submission.py b/taipy/core/submission/submission.py index 32fe2976a2..42f697e427 100644 --- a/taipy/core/submission/submission.py +++ b/taipy/core/submission/submission.py @@ -199,6 +199,7 @@ def __ge__(self, other): def _update_submission_status(self, job: Job): from ._submission_manager_factory import _SubmissionManagerFactory + with self.lock: submission_manager = _SubmissionManagerFactory._build_manager() submission = submission_manager._get(self) @@ -208,9 +209,10 @@ def _update_submission_status(self, job: Job): job_status = job.status if job_status == Status.FAILED: submission._submission_status = SubmissionStatus.FAILED - _SubmissionManagerFactory._build_manager()._set(submission) - self.__logger.debug(f"{job.id} status is {job_status}. Submission status set to " - f"{submission._submission_status}") + submission_manager._set(submission) + self.__logger.debug( + f"{job.id} status is {job_status}. Submission status set to " f"{submission._submission_status}" + ) return if job_status == Status.CANCELED: submission._is_canceled = True @@ -251,8 +253,9 @@ def _update_submission_status(self, job: Job): submission.submission_status = SubmissionStatus.COMPLETED # type: ignore else: submission.submission_status = SubmissionStatus.UNDEFINED # type: ignore - self.__logger.debug(f"{job.id} status is {job_status}. Submission status set to " - f"{submission._submission_status}") + self.__logger.debug( + f"{job.id} status is {job_status}. Submission status set to " f"{submission._submission_status}" + ) def is_finished(self) -> bool: """Indicate if the submission is finished. diff --git a/tests/core/_orchestrator/test_orchestrator_factory.py b/tests/core/_orchestrator/test_orchestrator_factory.py index d4e387d42f..f7536cac31 100644 --- a/tests/core/_orchestrator/test_orchestrator_factory.py +++ b/tests/core/_orchestrator/test_orchestrator_factory.py @@ -111,6 +111,9 @@ def test_build_unknown_dispatcher(): _OrchestratorFactory._build_dispatcher() assert _OrchestratorFactory._dispatcher is None + Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) + _OrchestratorFactory._build_dispatcher() + def test_remove_dispatcher_not_built(): _OrchestratorFactory._dispatcher = None diff --git a/tests/core/config/test_override_config.py b/tests/core/config/test_override_config.py index 192809535c..e7406422ce 100644 --- a/tests/core/config/test_override_config.py +++ b/tests/core/config/test_override_config.py @@ -60,13 +60,14 @@ def test_override_default_configuration_with_code_configuration(): def test_override_default_config_with_code_config_including_env_variable_values(): Config.configure_core() assert Config.core.repository_type == "filesystem" - Config.configure_core(repository_type="othertype") - assert Config.core.repository_type == "othertype" with mock.patch.dict(os.environ, {"REPOSITORY_TYPE": "foo"}): Config.configure_core(repository_type="ENV[REPOSITORY_TYPE]") assert Config.core.repository_type == "foo" + Config.configure_core(repository_type="othertype") + assert Config.core.repository_type == "othertype" + def test_override_default_configuration_with_file_configuration(): tf = NamedTemporaryFile( diff --git a/tests/core/conftest.py b/tests/core/conftest.py index 6dfbbdde0a..5c45eec4c0 100644 --- a/tests/core/conftest.py +++ b/tests/core/conftest.py @@ -180,14 +180,9 @@ def default_multi_sheet_data_frame(): def cleanup_files(): yield - if os.path.exists(".data"): - shutil.rmtree(".data", ignore_errors=True) - if os.path.exists("user_data"): - shutil.rmtree("user_data", ignore_errors=True) - if os.path.exists(".taipy"): - shutil.rmtree(".taipy", ignore_errors=True) - if os.path.exists(".my_data"): - shutil.rmtree(".my_data", ignore_errors=True) + for path in [".data", ".my_data", "user_data", ".taipy"]: + if os.path.exists(path): + shutil.rmtree(path, ignore_errors=True) @pytest.fixture(scope="function") diff --git a/tests/core/job/test_job_manager.py b/tests/core/job/test_job_manager.py index c479bcb9e6..abf65e2360 100644 --- a/tests/core/job/test_job_manager.py +++ b/tests/core/job/test_job_manager.py @@ -138,15 +138,6 @@ def test_delete_job(): assert _JobManager._get(job_1.id) is None -m = multiprocessing.Manager() -lock = m.Lock() - - -def inner_lock_multiply(nb1: float, nb2: float): - with lock: - return multiply(1 or nb1, 2 or nb2) - - def test_raise_when_trying_to_delete_unfinished_job(): Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2) m = multiprocessing.Manager() @@ -326,6 +317,7 @@ def test_cancel_subsequent_jobs(): orchestrator = _OrchestratorFactory._orchestrator submission_manager = _SubmissionManagerFactory._build_manager() + m = multiprocessing.Manager() lock_0 = m.Lock() dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1}) From f1aa3708d1b470e6539c8c594819f17f88cfbaf0 Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Wed, 13 Mar 2024 14:01:14 +0700 Subject: [PATCH 37/62] fix: move dispatcher.join() back to the stop method --- .../_orchestrator/_dispatcher/_job_dispatcher.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index cab2439750..ac0fb52654 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -55,9 +55,10 @@ def stop(self, wait: bool = True, timeout: Optional[float] = None): wait (bool): If True, the method will wait for the dispatcher to stop. timeout (Optional[float]): The maximum time to wait. If None, the method will wait indefinitely. """ - self.stop_wait = wait - self.stop_timeout = timeout self._STOP_FLAG = True + if wait and self.is_alive(): + self._logger.debug("Waiting for the dispatcher thread to stop...") + self.join(timeout=timeout) def run(self): self._logger.debug("Job dispatcher started.") @@ -65,13 +66,15 @@ def run(self): if not self._can_execute(): time.sleep(0.1) # We need to sleep to avoid busy waiting. continue + with self.lock: job = None try: if not self._STOP_FLAG: job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) except Empty: # In case the last job of the queue has been removed. - pass + continue + if job: try: if not self._STOP_FLAG: @@ -81,9 +84,6 @@ def run(self): except Exception as e: self._logger.exception(e) - if self.stop_wait: - self._logger.debug("Waiting for the dispatcher thread to stop...") - self.join(timeout=self.stop_timeout) self._logger.debug("Job dispatcher stopped.") @abstractmethod From 9ee4c2b0c367265ecdec7f94f7e345f362d167e7 Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Wed, 13 Mar 2024 14:01:44 +0700 Subject: [PATCH 38/62] fix: move _nb_available_workers_lock lock to class level --- .../_orchestrator/_dispatcher/_standalone_job_dispatcher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index cb592eb1a9..f57711ab78 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -26,6 +26,8 @@ class _StandaloneJobDispatcher(_JobDispatcher): """Manages job dispatching (instances of `Job^` class) in an asynchronous way using a ProcessPoolExecutor.""" + _nb_available_workers_lock = Lock() + def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Optional[Callable] = None): super().__init__(orchestrator) max_workers = Config.job_config.max_nb_of_workers or 1 @@ -33,7 +35,6 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt max_workers=max_workers, initializer=subproc_initializer, ) # type: ignore - self._nb_available_workers_lock = Lock() self._nb_available_workers = self._executor._max_workers # type: ignore def _can_execute(self) -> bool: From 4af9e0fe512102a5d8d79c3b1ca5a23550dcfa30 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 18 Mar 2024 11:29:05 +0100 Subject: [PATCH 39/62] Revert "Remove logs" This reverts commit 74378956 --- .../_dispatcher/_job_dispatcher.py | 4 ++- .../_dispatcher/_standalone_job_dispatcher.py | 3 +++ taipy/core/_orchestrator/_orchestrator.py | 26 +++++++++++++++---- 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index ac0fb52654..ff3b4f2dd7 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -68,14 +68,16 @@ def run(self): continue with self.lock: + self._logger.error("-------------------------> Acquired lock to execute job.") job = None try: if not self._STOP_FLAG: job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) except Empty: # In case the last job of the queue has been removed. continue - + self._logger.error("-------------------------> Released lock to execute job.") if job: + self._logger.error(f"-------------------------> Got job to execute {job.id}.") try: if not self._STOP_FLAG: self._execute_job(job) diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index f57711ab78..2b47a0e128 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -39,6 +39,7 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt def _can_execute(self) -> bool: """Returns True if the dispatcher have resources to dispatch a job.""" + self._logger.error(f"can execute a job ? {self._nb_available_workers}") return self._nb_available_workers > 0 def run(self): @@ -54,6 +55,7 @@ def _dispatch(self, job: Job): """ with self._nb_available_workers_lock: self._nb_available_workers -= 1 + self._logger.error(f"Changing nb_available_workers to {self._nb_available_workers} from dispatch") config_as_string = _TomlSerializer()._serialize(Config._applied_config) # type: ignore[attr-defined] future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string) future.add_done_callback(partial(self._update_job_status_from_future, job)) @@ -61,4 +63,5 @@ def _dispatch(self, job: Job): def _update_job_status_from_future(self, job: Job, ft): with self._nb_available_workers_lock: self._nb_available_workers += 1 + self._logger.error(f"Changing nb_available_workers to {self._nb_available_workers} from callback") self._update_job_status(job, ft.result()) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 0c1674e697..af0d2e2e14 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -36,7 +36,7 @@ class _Orchestrator(_AbstractOrchestrator): """ jobs_to_run: Queue = Queue() - blocked_jobs: List[Job] = [] + blocked_jobs: List = [] lock = Lock() __logger = _TaipyLogger._get_logger() @@ -77,9 +77,10 @@ def submit( getattr(submittable, "config_id", None), **properties, ) - jobs: List[Job] = [] + jobs = [] tasks = submittable._get_sorted_tasks() with cls.lock: + cls.__logger.error(f"-------------------------> Acquired lock to submit {submittable.id}.") # type: ignore for ts in tasks: jobs.extend( cls._lock_dn_output_and_create_job( @@ -93,6 +94,7 @@ def submit( ) submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) + cls.__logger.error(f"-------------------------> Released lock after submitting {submission.id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() elif wait: @@ -129,6 +131,7 @@ def submit_task( ) submit_id = submission.id with cls.lock: + cls.__logger.error("-------------------------> Acquired lock to submit task.") job = cls._lock_dn_output_and_create_job( task, submit_id, @@ -139,6 +142,7 @@ def submit_task( jobs = [job] submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) + cls.__logger.error(f"-------------------------> Released lock after submitting task {task.id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() else: @@ -223,18 +227,27 @@ def _unlock_edit_on_jobs_outputs(jobs: Union[Job, List[Job], Set[Job]]): @classmethod def _on_status_change(cls, job: Job): if job.is_completed() or job.is_skipped(): + cls.__logger.error(f"{job.id} has been completed or skipped. Unblocking jobs.") cls.__unblock_jobs() elif job.is_failed(): cls._fail_subsequent_jobs(job) @classmethod def __unblock_jobs(cls): - for job in cls.blocked_jobs: - if not cls._is_blocked(job): - with cls.lock: + cls.__logger.error(" Entering __unblock_jobs.") + with cls.lock: + cls.__logger.error("-------------------------> Acquired lock to unblock jobs.") + for job in cls.blocked_jobs: + cls.__logger.error(f" Unblocking {job.id} ?") + if not cls._is_blocked(job): + cls.__logger.error(f" Unblocking {job.id} !") job.pending() + cls.__logger.error(f" Removing {job.id} from the blocked list.") cls.__remove_blocked_job(job) + cls.__logger.error(f" Adding {job.id} to the list of jobs to run.") cls.jobs_to_run.put(job) + cls.__logger.error("-------------------------> Released lock after unblocking jobs.") + cls.__logger.error(" Exiting __unblock_jobs.") @classmethod def __remove_blocked_job(cls, job): @@ -253,12 +266,14 @@ def cancel_job(cls, job: Job): cls.__logger.info(f"{job.id} has already failed and cannot be canceled.") else: with cls.lock: + cls.__logger.error(f"-------------------------> Acquired lock to cancel job {job.id}.") to_cancel_or_abandon_jobs = {job} to_cancel_or_abandon_jobs.update(cls.__find_subsequent_jobs(job.submit_id, set(job.task.output.keys()))) cls.__remove_blocked_jobs(to_cancel_or_abandon_jobs) cls.__remove_jobs_to_run(to_cancel_or_abandon_jobs) cls._cancel_jobs(job.id, to_cancel_or_abandon_jobs) cls._unlock_edit_on_jobs_outputs(to_cancel_or_abandon_jobs) + cls.__logger.error(f"-------------------------> Released lock after canceling {job.id}.") @classmethod def __find_subsequent_jobs(cls, submit_id, output_dn_config_ids: Set) -> Set[Job]: @@ -292,6 +307,7 @@ def __remove_jobs_to_run(cls, jobs): @classmethod def _fail_subsequent_jobs(cls, failed_job: Job): with cls.lock: + cls.__logger.error("-------------------------> Acquired lock to fail subsequent jobs.") to_fail_or_abandon_jobs = set() to_fail_or_abandon_jobs.update( cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys())) From fb9d621103a709d359d433fe24daff1a9b0174be Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 18 Mar 2024 12:01:07 +0100 Subject: [PATCH 40/62] fix linter --- taipy/core/_orchestrator/_orchestrator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index af0d2e2e14..00135a09f9 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -36,7 +36,7 @@ class _Orchestrator(_AbstractOrchestrator): """ jobs_to_run: Queue = Queue() - blocked_jobs: List = [] + blocked_jobs: List[Job] = [] lock = Lock() __logger = _TaipyLogger._get_logger() @@ -77,7 +77,7 @@ def submit( getattr(submittable, "config_id", None), **properties, ) - jobs = [] + jobs: List[Job] = [] tasks = submittable._get_sorted_tasks() with cls.lock: cls.__logger.error(f"-------------------------> Acquired lock to submit {submittable.id}.") # type: ignore From fab34c6df1fc4ac83929faf68e754e612a7bc810 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Mon, 18 Mar 2024 13:50:36 +0100 Subject: [PATCH 41/62] minor change. --- taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py | 2 +- .../_orchestrator/_dispatcher/_standalone_job_dispatcher.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index ff3b4f2dd7..1f4a3aae1b 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -74,7 +74,7 @@ def run(self): if not self._STOP_FLAG: job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) except Empty: # In case the last job of the queue has been removed. - continue + pass self._logger.error("-------------------------> Released lock to execute job.") if job: self._logger.error(f"-------------------------> Got job to execute {job.id}.") diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index 2b47a0e128..d8380815c0 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -39,8 +39,9 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt def _can_execute(self) -> bool: """Returns True if the dispatcher have resources to dispatch a job.""" - self._logger.error(f"can execute a job ? {self._nb_available_workers}") - return self._nb_available_workers > 0 + with self._nb_available_workers_lock: + self._logger.error(f"can execute a job ? {self._nb_available_workers}") + return self._nb_available_workers > 0 def run(self): with self._executor: From 8d9dd339a6d95ff42230fc0f3a05dfd2c92fc750 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 19 Mar 2024 09:04:26 +0100 Subject: [PATCH 42/62] Turn logs to debug --- .../_dispatcher/_job_dispatcher.py | 6 ++-- .../_dispatcher/_standalone_job_dispatcher.py | 6 ++-- taipy/core/_orchestrator/_orchestrator.py | 32 +++++++++---------- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index 1f4a3aae1b..c736d2d4c6 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -68,16 +68,16 @@ def run(self): continue with self.lock: - self._logger.error("-------------------------> Acquired lock to execute job.") + self._logger.debug("-------------------------> Acquired lock to execute job.") job = None try: if not self._STOP_FLAG: job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) except Empty: # In case the last job of the queue has been removed. pass - self._logger.error("-------------------------> Released lock to execute job.") + self._logger.debug("-------------------------> Released lock to execute job.") if job: - self._logger.error(f"-------------------------> Got job to execute {job.id}.") + self._logger.debug(f"-------------------------> Got job to execute {job.id}.") try: if not self._STOP_FLAG: self._execute_job(job) diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index d8380815c0..8e88deea72 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -40,7 +40,7 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt def _can_execute(self) -> bool: """Returns True if the dispatcher have resources to dispatch a job.""" with self._nb_available_workers_lock: - self._logger.error(f"can execute a job ? {self._nb_available_workers}") + self._logger.debug(f"can execute a job ? {self._nb_available_workers}") return self._nb_available_workers > 0 def run(self): @@ -56,7 +56,7 @@ def _dispatch(self, job: Job): """ with self._nb_available_workers_lock: self._nb_available_workers -= 1 - self._logger.error(f"Changing nb_available_workers to {self._nb_available_workers} from dispatch") + self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from dispatch") config_as_string = _TomlSerializer()._serialize(Config._applied_config) # type: ignore[attr-defined] future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string) future.add_done_callback(partial(self._update_job_status_from_future, job)) @@ -64,5 +64,5 @@ def _dispatch(self, job: Job): def _update_job_status_from_future(self, job: Job, ft): with self._nb_available_workers_lock: self._nb_available_workers += 1 - self._logger.error(f"Changing nb_available_workers to {self._nb_available_workers} from callback") + self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from callback") self._update_job_status(job, ft.result()) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 00135a09f9..5307208825 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -80,7 +80,7 @@ def submit( jobs: List[Job] = [] tasks = submittable._get_sorted_tasks() with cls.lock: - cls.__logger.error(f"-------------------------> Acquired lock to submit {submittable.id}.") # type: ignore + cls.__logger.debug(f"-------------------------> Acquired lock to submit {submittable.id}.") # type: ignore for ts in tasks: jobs.extend( cls._lock_dn_output_and_create_job( @@ -94,7 +94,7 @@ def submit( ) submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) - cls.__logger.error(f"-------------------------> Released lock after submitting {submission.id}.") + cls.__logger.debug(f"-------------------------> Released lock after submitting {submission.id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() elif wait: @@ -131,7 +131,7 @@ def submit_task( ) submit_id = submission.id with cls.lock: - cls.__logger.error("-------------------------> Acquired lock to submit task.") + cls.__logger.debug("-------------------------> Acquired lock to submit task.") job = cls._lock_dn_output_and_create_job( task, submit_id, @@ -142,7 +142,7 @@ def submit_task( jobs = [job] submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) - cls.__logger.error(f"-------------------------> Released lock after submitting task {task.id}.") + cls.__logger.debug(f"-------------------------> Released lock after submitting task {task.id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() else: @@ -227,27 +227,27 @@ def _unlock_edit_on_jobs_outputs(jobs: Union[Job, List[Job], Set[Job]]): @classmethod def _on_status_change(cls, job: Job): if job.is_completed() or job.is_skipped(): - cls.__logger.error(f"{job.id} has been completed or skipped. Unblocking jobs.") + cls.__logger.debug(f"{job.id} has been completed or skipped. Unblocking jobs.") cls.__unblock_jobs() elif job.is_failed(): cls._fail_subsequent_jobs(job) @classmethod def __unblock_jobs(cls): - cls.__logger.error(" Entering __unblock_jobs.") + cls.__logger.debug(" Entering __unblock_jobs.") with cls.lock: - cls.__logger.error("-------------------------> Acquired lock to unblock jobs.") + cls.__logger.debug("-------------------------> Acquired lock to unblock jobs.") for job in cls.blocked_jobs: - cls.__logger.error(f" Unblocking {job.id} ?") + cls.__logger.debug(f" Unblocking {job.id} ?") if not cls._is_blocked(job): - cls.__logger.error(f" Unblocking {job.id} !") + cls.__logger.debug(f" Unblocking {job.id} !") job.pending() - cls.__logger.error(f" Removing {job.id} from the blocked list.") + cls.__logger.debug(f" Removing {job.id} from the blocked list.") cls.__remove_blocked_job(job) - cls.__logger.error(f" Adding {job.id} to the list of jobs to run.") + cls.__logger.debug(f" Adding {job.id} to the list of jobs to run.") cls.jobs_to_run.put(job) - cls.__logger.error("-------------------------> Released lock after unblocking jobs.") - cls.__logger.error(" Exiting __unblock_jobs.") + cls.__logger.debug("-------------------------> Released lock after unblocking jobs.") + cls.__logger.debug(" Exiting __unblock_jobs.") @classmethod def __remove_blocked_job(cls, job): @@ -266,14 +266,14 @@ def cancel_job(cls, job: Job): cls.__logger.info(f"{job.id} has already failed and cannot be canceled.") else: with cls.lock: - cls.__logger.error(f"-------------------------> Acquired lock to cancel job {job.id}.") + cls.__logger.debug(f"-------------------------> Acquired lock to cancel job {job.id}.") to_cancel_or_abandon_jobs = {job} to_cancel_or_abandon_jobs.update(cls.__find_subsequent_jobs(job.submit_id, set(job.task.output.keys()))) cls.__remove_blocked_jobs(to_cancel_or_abandon_jobs) cls.__remove_jobs_to_run(to_cancel_or_abandon_jobs) cls._cancel_jobs(job.id, to_cancel_or_abandon_jobs) cls._unlock_edit_on_jobs_outputs(to_cancel_or_abandon_jobs) - cls.__logger.error(f"-------------------------> Released lock after canceling {job.id}.") + cls.__logger.debug(f"-------------------------> Released lock after canceling {job.id}.") @classmethod def __find_subsequent_jobs(cls, submit_id, output_dn_config_ids: Set) -> Set[Job]: @@ -307,7 +307,7 @@ def __remove_jobs_to_run(cls, jobs): @classmethod def _fail_subsequent_jobs(cls, failed_job: Job): with cls.lock: - cls.__logger.error("-------------------------> Acquired lock to fail subsequent jobs.") + cls.__logger.debug("-------------------------> Acquired lock to fail subsequent jobs.") to_fail_or_abandon_jobs = set() to_fail_or_abandon_jobs.update( cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys())) From 9720572cb5a0d5c307cc3d52122b8d9d91275dbc Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 19 Mar 2024 10:43:20 +0100 Subject: [PATCH 43/62] add msecs to default log format --- taipy/logger/_taipy_logger.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/taipy/logger/_taipy_logger.py b/taipy/logger/_taipy_logger.py index 8cde4274d0..43deaac345 100644 --- a/taipy/logger/_taipy_logger.py +++ b/taipy/logger/_taipy_logger.py @@ -34,7 +34,7 @@ def _get_logger(cls): cls.__logger.setLevel(logging.INFO) ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.INFO) - formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S") - ch.setFormatter(formatter) + frmter = logging.Formatter("[%(asctime)s.%03d][%(name)s][%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S") + ch.setFormatter(frmter) cls.__logger.addHandler(ch) return cls.__logger From e35ded629edd551c1a1e86fa43193f8680dacad8 Mon Sep 17 00:00:00 2001 From: Toan Quach Date: Tue, 19 Mar 2024 16:51:13 +0700 Subject: [PATCH 44/62] replace threading lock with rlock --- .../_orchestrator/_dispatcher/_standalone_job_dispatcher.py | 4 ++-- taipy/core/_orchestrator/_orchestrator.py | 4 ++-- taipy/core/submission/submission.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index 8e88deea72..7539c6f1b7 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -11,7 +11,7 @@ from concurrent.futures import Executor, ProcessPoolExecutor from functools import partial -from threading import Lock +from threading import RLock from typing import Callable, Optional from taipy.config._serializer._toml_serializer import _TomlSerializer @@ -26,7 +26,7 @@ class _StandaloneJobDispatcher(_JobDispatcher): """Manages job dispatching (instances of `Job^` class) in an asynchronous way using a ProcessPoolExecutor.""" - _nb_available_workers_lock = Lock() + _nb_available_workers_lock = RLock() def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Optional[Callable] = None): super().__init__(orchestrator) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 5307208825..028f5ad4e4 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -12,7 +12,7 @@ import itertools from datetime import datetime from queue import Queue -from threading import Lock +from threading import RLock from time import sleep from typing import Callable, Iterable, List, Optional, Set, Union @@ -38,7 +38,7 @@ class _Orchestrator(_AbstractOrchestrator): jobs_to_run: Queue = Queue() blocked_jobs: List[Job] = [] - lock = Lock() + lock = RLock() __logger = _TaipyLogger._get_logger() @classmethod diff --git a/taipy/core/submission/submission.py b/taipy/core/submission/submission.py index 42f697e427..016b2d9c3a 100644 --- a/taipy/core/submission/submission.py +++ b/taipy/core/submission/submission.py @@ -44,7 +44,7 @@ class Submission(_Entity, _Labeled): _ID_PREFIX = "SUBMISSION" _MANAGER_NAME = "submission" __SEPARATOR = "_" - lock = threading.Lock() + lock = threading.RLock() __logger = _TaipyLogger._get_logger() def __init__( From e07c719a40ac41360549f31d7a2a109117ca00be Mon Sep 17 00:00:00 2001 From: Toan Quach Date: Tue, 19 Mar 2024 16:54:28 +0700 Subject: [PATCH 45/62] replace threading lock with rlock --- .../_orchestrator/_dispatcher/mock_standalone_dispatcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py b/tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py index acad99bdca..54f2864e5a 100644 --- a/tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py +++ b/tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py @@ -10,7 +10,7 @@ # specific language governing permissions and limitations under the License. from concurrent.futures import Executor, Future -from threading import Lock +from threading import RLock from typing import List from taipy.core import Job @@ -40,7 +40,7 @@ def __init__(self, orchestrator: _AbstractOrchestrator): super(_StandaloneJobDispatcher, self).__init__(orchestrator) self._executor: Executor = MockProcessPoolExecutor() self._nb_available_workers = 1 - self._nb_available_workers_lock = Lock() + self._nb_available_workers_lock = RLock() self.dispatch_calls: List = [] self.update_job_status_from_future_calls: List = [] From 9993c25c522c27a2fec04762884ebd40a2502131 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 19 Mar 2024 13:14:04 +0100 Subject: [PATCH 46/62] remove msecs to default log format --- taipy/logger/_taipy_logger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taipy/logger/_taipy_logger.py b/taipy/logger/_taipy_logger.py index 43deaac345..08fe18f58b 100644 --- a/taipy/logger/_taipy_logger.py +++ b/taipy/logger/_taipy_logger.py @@ -34,7 +34,7 @@ def _get_logger(cls): cls.__logger.setLevel(logging.INFO) ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.INFO) - frmter = logging.Formatter("[%(asctime)s.%03d][%(name)s][%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S") + frmter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S") ch.setFormatter(frmter) cls.__logger.addHandler(ch) return cls.__logger From b684dd4c7ff6d0aba519e612084091ac26f8c761 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 19 Mar 2024 13:41:18 +0100 Subject: [PATCH 47/62] add msecs again to default log format --- taipy/logger/_taipy_logger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taipy/logger/_taipy_logger.py b/taipy/logger/_taipy_logger.py index 08fe18f58b..43deaac345 100644 --- a/taipy/logger/_taipy_logger.py +++ b/taipy/logger/_taipy_logger.py @@ -34,7 +34,7 @@ def _get_logger(cls): cls.__logger.setLevel(logging.INFO) ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.INFO) - frmter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S") + frmter = logging.Formatter("[%(asctime)s.%03d][%(name)s][%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S") ch.setFormatter(frmter) cls.__logger.addHandler(ch) return cls.__logger From 474d439b950e70d83b75aae1f481adb54e515c30 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 19 Mar 2024 13:45:41 +0100 Subject: [PATCH 48/62] add msecs properly to default log format --- taipy/logger/_taipy_logger.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/taipy/logger/_taipy_logger.py b/taipy/logger/_taipy_logger.py index 43deaac345..2f1f26f823 100644 --- a/taipy/logger/_taipy_logger.py +++ b/taipy/logger/_taipy_logger.py @@ -34,7 +34,7 @@ def _get_logger(cls): cls.__logger.setLevel(logging.INFO) ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.INFO) - frmter = logging.Formatter("[%(asctime)s.%03d][%(name)s][%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S") - ch.setFormatter(frmter) + f = logging.Formatter("[%(asctime)s.%(msecs)03d][%(name)s][%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S") + ch.setFormatter(f) cls.__logger.addHandler(ch) return cls.__logger From 598d6a0be107f4cc60e8eb4b7f81e978eb0c477d Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 19 Mar 2024 13:47:35 +0100 Subject: [PATCH 49/62] improve debug log --- .../_orchestrator/_dispatcher/_standalone_job_dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index 7539c6f1b7..c720604f78 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -40,7 +40,7 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt def _can_execute(self) -> bool: """Returns True if the dispatcher have resources to dispatch a job.""" with self._nb_available_workers_lock: - self._logger.debug(f"can execute a job ? {self._nb_available_workers}") + self._logger.debug(f"can execute a job ? {self._nb_available_workers} available workers.") return self._nb_available_workers > 0 def run(self): From 4afc4c3feeb84c48955c1e0cef3a24b70aa3a212 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 19 Mar 2024 15:09:06 +0100 Subject: [PATCH 50/62] remove logs --- .../_dispatcher/_job_dispatcher.py | 6 ++-- .../_dispatcher/_standalone_job_dispatcher.py | 6 ++-- taipy/core/_orchestrator/_orchestrator.py | 30 +++++++++---------- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index c736d2d4c6..0f090c83bf 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -68,16 +68,16 @@ def run(self): continue with self.lock: - self._logger.debug("-------------------------> Acquired lock to execute job.") + # self._logger.debug("-------------------------> Acquired lock to execute job.") job = None try: if not self._STOP_FLAG: job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) except Empty: # In case the last job of the queue has been removed. pass - self._logger.debug("-------------------------> Released lock to execute job.") + # self._logger.debug("-------------------------> Released lock to execute job.") if job: - self._logger.debug(f"-------------------------> Got job to execute {job.id}.") + # self._logger.debug(f"-------------------------> Got job to execute {job.id}.") try: if not self._STOP_FLAG: self._execute_job(job) diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index c720604f78..aab0a1210c 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -40,7 +40,7 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt def _can_execute(self) -> bool: """Returns True if the dispatcher have resources to dispatch a job.""" with self._nb_available_workers_lock: - self._logger.debug(f"can execute a job ? {self._nb_available_workers} available workers.") + # self._logger.debug(f"can execute a job ? {self._nb_available_workers} available workers.") return self._nb_available_workers > 0 def run(self): @@ -56,7 +56,7 @@ def _dispatch(self, job: Job): """ with self._nb_available_workers_lock: self._nb_available_workers -= 1 - self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from dispatch") + # self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from dispatch") config_as_string = _TomlSerializer()._serialize(Config._applied_config) # type: ignore[attr-defined] future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string) future.add_done_callback(partial(self._update_job_status_from_future, job)) @@ -64,5 +64,5 @@ def _dispatch(self, job: Job): def _update_job_status_from_future(self, job: Job, ft): with self._nb_available_workers_lock: self._nb_available_workers += 1 - self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from callback") + # self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from callback") self._update_job_status(job, ft.result()) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 028f5ad4e4..f4fc42cf7d 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -80,7 +80,7 @@ def submit( jobs: List[Job] = [] tasks = submittable._get_sorted_tasks() with cls.lock: - cls.__logger.debug(f"-------------------------> Acquired lock to submit {submittable.id}.") # type: ignore + # cls.__logger.debug(f"-------------------------> Acquired lock to submit {submittable.id}.") # type: ignore for ts in tasks: jobs.extend( cls._lock_dn_output_and_create_job( @@ -94,7 +94,7 @@ def submit( ) submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) - cls.__logger.debug(f"-------------------------> Released lock after submitting {submission.id}.") + # cls.__logger.debug(f"-------------------------> Released lock after submitting {submission.id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() elif wait: @@ -131,7 +131,7 @@ def submit_task( ) submit_id = submission.id with cls.lock: - cls.__logger.debug("-------------------------> Acquired lock to submit task.") + # cls.__logger.debug("-------------------------> Acquired lock to submit task.") job = cls._lock_dn_output_and_create_job( task, submit_id, @@ -142,7 +142,7 @@ def submit_task( jobs = [job] submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) - cls.__logger.debug(f"-------------------------> Released lock after submitting task {task.id}.") + # cls.__logger.debug(f"-------------------------> Released lock after submitting task {task.id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() else: @@ -234,20 +234,20 @@ def _on_status_change(cls, job: Job): @classmethod def __unblock_jobs(cls): - cls.__logger.debug(" Entering __unblock_jobs.") + # cls.__logger.debug(" Entering __unblock_jobs.") with cls.lock: - cls.__logger.debug("-------------------------> Acquired lock to unblock jobs.") + # cls.__logger.debug("-------------------------> Acquired lock to unblock jobs.") for job in cls.blocked_jobs: - cls.__logger.debug(f" Unblocking {job.id} ?") + # cls.__logger.debug(f" Unblocking {job.id} ?") if not cls._is_blocked(job): - cls.__logger.debug(f" Unblocking {job.id} !") + # cls.__logger.debug(f" Unblocking {job.id} !") job.pending() - cls.__logger.debug(f" Removing {job.id} from the blocked list.") + # cls.__logger.debug(f" Removing {job.id} from the blocked list.") cls.__remove_blocked_job(job) - cls.__logger.debug(f" Adding {job.id} to the list of jobs to run.") + # cls.__logger.debug(f" Adding {job.id} to the list of jobs to run.") cls.jobs_to_run.put(job) - cls.__logger.debug("-------------------------> Released lock after unblocking jobs.") - cls.__logger.debug(" Exiting __unblock_jobs.") + # cls.__logger.debug("-------------------------> Released lock after unblocking jobs.") + # cls.__logger.debug(" Exiting __unblock_jobs.") @classmethod def __remove_blocked_job(cls, job): @@ -266,14 +266,14 @@ def cancel_job(cls, job: Job): cls.__logger.info(f"{job.id} has already failed and cannot be canceled.") else: with cls.lock: - cls.__logger.debug(f"-------------------------> Acquired lock to cancel job {job.id}.") + # cls.__logger.debug(f"-------------------------> Acquired lock to cancel job {job.id}.") to_cancel_or_abandon_jobs = {job} to_cancel_or_abandon_jobs.update(cls.__find_subsequent_jobs(job.submit_id, set(job.task.output.keys()))) cls.__remove_blocked_jobs(to_cancel_or_abandon_jobs) cls.__remove_jobs_to_run(to_cancel_or_abandon_jobs) cls._cancel_jobs(job.id, to_cancel_or_abandon_jobs) cls._unlock_edit_on_jobs_outputs(to_cancel_or_abandon_jobs) - cls.__logger.debug(f"-------------------------> Released lock after canceling {job.id}.") + # cls.__logger.debug(f"-------------------------> Released lock after canceling {job.id}.") @classmethod def __find_subsequent_jobs(cls, submit_id, output_dn_config_ids: Set) -> Set[Job]: @@ -307,7 +307,7 @@ def __remove_jobs_to_run(cls, jobs): @classmethod def _fail_subsequent_jobs(cls, failed_job: Job): with cls.lock: - cls.__logger.debug("-------------------------> Acquired lock to fail subsequent jobs.") + # cls.__logger.debug("-------------------------> Acquired lock to fail subsequent jobs.") to_fail_or_abandon_jobs = set() to_fail_or_abandon_jobs.update( cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys())) From 612b274981b0df4ecd7b82a4862ddf45f15f20a0 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 19 Mar 2024 15:12:31 +0100 Subject: [PATCH 51/62] remove logs --- taipy/core/_orchestrator/_orchestrator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index f4fc42cf7d..02ed8bae3d 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -80,7 +80,7 @@ def submit( jobs: List[Job] = [] tasks = submittable._get_sorted_tasks() with cls.lock: - # cls.__logger.debug(f"-------------------------> Acquired lock to submit {submittable.id}.") # type: ignore + # cls.__logger.debug(f"------------------------->Acquired lock to submit {submittable.id}.") # type: ignore for ts in tasks: jobs.extend( cls._lock_dn_output_and_create_job( From 3961ce306aacba5e45d8a2dd76eec65e0c9ce7ba Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 19 Mar 2024 15:50:38 +0100 Subject: [PATCH 52/62] Revert "remove logs" This reverts commit 612b274981b0df4ecd7b82a4862ddf45f15f20a0. --- taipy/core/_orchestrator/_orchestrator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 02ed8bae3d..f4fc42cf7d 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -80,7 +80,7 @@ def submit( jobs: List[Job] = [] tasks = submittable._get_sorted_tasks() with cls.lock: - # cls.__logger.debug(f"------------------------->Acquired lock to submit {submittable.id}.") # type: ignore + # cls.__logger.debug(f"-------------------------> Acquired lock to submit {submittable.id}.") # type: ignore for ts in tasks: jobs.extend( cls._lock_dn_output_and_create_job( From 4a4f2f3cb0dc1b191e0639b97505652bace0009c Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Tue, 19 Mar 2024 15:50:44 +0100 Subject: [PATCH 53/62] Revert "remove logs" This reverts commit 4afc4c3feeb84c48955c1e0cef3a24b70aa3a212. --- .../_dispatcher/_job_dispatcher.py | 6 ++-- .../_dispatcher/_standalone_job_dispatcher.py | 6 ++-- taipy/core/_orchestrator/_orchestrator.py | 30 +++++++++---------- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index 0f090c83bf..c736d2d4c6 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -68,16 +68,16 @@ def run(self): continue with self.lock: - # self._logger.debug("-------------------------> Acquired lock to execute job.") + self._logger.debug("-------------------------> Acquired lock to execute job.") job = None try: if not self._STOP_FLAG: job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) except Empty: # In case the last job of the queue has been removed. pass - # self._logger.debug("-------------------------> Released lock to execute job.") + self._logger.debug("-------------------------> Released lock to execute job.") if job: - # self._logger.debug(f"-------------------------> Got job to execute {job.id}.") + self._logger.debug(f"-------------------------> Got job to execute {job.id}.") try: if not self._STOP_FLAG: self._execute_job(job) diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index aab0a1210c..c720604f78 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -40,7 +40,7 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt def _can_execute(self) -> bool: """Returns True if the dispatcher have resources to dispatch a job.""" with self._nb_available_workers_lock: - # self._logger.debug(f"can execute a job ? {self._nb_available_workers} available workers.") + self._logger.debug(f"can execute a job ? {self._nb_available_workers} available workers.") return self._nb_available_workers > 0 def run(self): @@ -56,7 +56,7 @@ def _dispatch(self, job: Job): """ with self._nb_available_workers_lock: self._nb_available_workers -= 1 - # self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from dispatch") + self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from dispatch") config_as_string = _TomlSerializer()._serialize(Config._applied_config) # type: ignore[attr-defined] future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string) future.add_done_callback(partial(self._update_job_status_from_future, job)) @@ -64,5 +64,5 @@ def _dispatch(self, job: Job): def _update_job_status_from_future(self, job: Job, ft): with self._nb_available_workers_lock: self._nb_available_workers += 1 - # self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from callback") + self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from callback") self._update_job_status(job, ft.result()) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index f4fc42cf7d..028f5ad4e4 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -80,7 +80,7 @@ def submit( jobs: List[Job] = [] tasks = submittable._get_sorted_tasks() with cls.lock: - # cls.__logger.debug(f"-------------------------> Acquired lock to submit {submittable.id}.") # type: ignore + cls.__logger.debug(f"-------------------------> Acquired lock to submit {submittable.id}.") # type: ignore for ts in tasks: jobs.extend( cls._lock_dn_output_and_create_job( @@ -94,7 +94,7 @@ def submit( ) submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) - # cls.__logger.debug(f"-------------------------> Released lock after submitting {submission.id}.") + cls.__logger.debug(f"-------------------------> Released lock after submitting {submission.id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() elif wait: @@ -131,7 +131,7 @@ def submit_task( ) submit_id = submission.id with cls.lock: - # cls.__logger.debug("-------------------------> Acquired lock to submit task.") + cls.__logger.debug("-------------------------> Acquired lock to submit task.") job = cls._lock_dn_output_and_create_job( task, submit_id, @@ -142,7 +142,7 @@ def submit_task( jobs = [job] submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) - # cls.__logger.debug(f"-------------------------> Released lock after submitting task {task.id}.") + cls.__logger.debug(f"-------------------------> Released lock after submitting task {task.id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() else: @@ -234,20 +234,20 @@ def _on_status_change(cls, job: Job): @classmethod def __unblock_jobs(cls): - # cls.__logger.debug(" Entering __unblock_jobs.") + cls.__logger.debug(" Entering __unblock_jobs.") with cls.lock: - # cls.__logger.debug("-------------------------> Acquired lock to unblock jobs.") + cls.__logger.debug("-------------------------> Acquired lock to unblock jobs.") for job in cls.blocked_jobs: - # cls.__logger.debug(f" Unblocking {job.id} ?") + cls.__logger.debug(f" Unblocking {job.id} ?") if not cls._is_blocked(job): - # cls.__logger.debug(f" Unblocking {job.id} !") + cls.__logger.debug(f" Unblocking {job.id} !") job.pending() - # cls.__logger.debug(f" Removing {job.id} from the blocked list.") + cls.__logger.debug(f" Removing {job.id} from the blocked list.") cls.__remove_blocked_job(job) - # cls.__logger.debug(f" Adding {job.id} to the list of jobs to run.") + cls.__logger.debug(f" Adding {job.id} to the list of jobs to run.") cls.jobs_to_run.put(job) - # cls.__logger.debug("-------------------------> Released lock after unblocking jobs.") - # cls.__logger.debug(" Exiting __unblock_jobs.") + cls.__logger.debug("-------------------------> Released lock after unblocking jobs.") + cls.__logger.debug(" Exiting __unblock_jobs.") @classmethod def __remove_blocked_job(cls, job): @@ -266,14 +266,14 @@ def cancel_job(cls, job: Job): cls.__logger.info(f"{job.id} has already failed and cannot be canceled.") else: with cls.lock: - # cls.__logger.debug(f"-------------------------> Acquired lock to cancel job {job.id}.") + cls.__logger.debug(f"-------------------------> Acquired lock to cancel job {job.id}.") to_cancel_or_abandon_jobs = {job} to_cancel_or_abandon_jobs.update(cls.__find_subsequent_jobs(job.submit_id, set(job.task.output.keys()))) cls.__remove_blocked_jobs(to_cancel_or_abandon_jobs) cls.__remove_jobs_to_run(to_cancel_or_abandon_jobs) cls._cancel_jobs(job.id, to_cancel_or_abandon_jobs) cls._unlock_edit_on_jobs_outputs(to_cancel_or_abandon_jobs) - # cls.__logger.debug(f"-------------------------> Released lock after canceling {job.id}.") + cls.__logger.debug(f"-------------------------> Released lock after canceling {job.id}.") @classmethod def __find_subsequent_jobs(cls, submit_id, output_dn_config_ids: Set) -> Set[Job]: @@ -307,7 +307,7 @@ def __remove_jobs_to_run(cls, jobs): @classmethod def _fail_subsequent_jobs(cls, failed_job: Job): with cls.lock: - # cls.__logger.debug("-------------------------> Acquired lock to fail subsequent jobs.") + cls.__logger.debug("-------------------------> Acquired lock to fail subsequent jobs.") to_fail_or_abandon_jobs = set() to_fail_or_abandon_jobs.update( cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys())) From c8811a1cddc21de7ddb8a118ca46b1922d1a0662 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Wed, 20 Mar 2024 10:20:19 +0100 Subject: [PATCH 54/62] attempt to wait 1 second before stopping dispatcher --- .../_dispatcher/_job_dispatcher.py | 8 +++---- .../_dispatcher/_standalone_job_dispatcher.py | 8 +++---- taipy/core/_orchestrator/_orchestrator.py | 21 +++++++++---------- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index c736d2d4c6..423cb4ca9b 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -68,16 +68,16 @@ def run(self): continue with self.lock: - self._logger.debug("-------------------------> Acquired lock to execute job.") + self._logger.debug("Acquiring lock to check jobs to run.") job = None try: if not self._STOP_FLAG: job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) except Empty: # In case the last job of the queue has been removed. pass - self._logger.debug("-------------------------> Released lock to execute job.") + self._logger.debug(f"Releasing lock after checking jobs to run.") if job: - self._logger.debug(f"-------------------------> Got job to execute {job.id}.") + self._logger.debug(f"Got a job to execute {job.id}.") try: if not self._STOP_FLAG: self._execute_job(job) @@ -85,7 +85,7 @@ def run(self): self.orchestrator.jobs_to_run.put(job) except Exception as e: self._logger.exception(e) - + time.sleep(1) self._logger.debug("Job dispatcher stopped.") @abstractmethod diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index c720604f78..f8df25a738 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -40,13 +40,13 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt def _can_execute(self) -> bool: """Returns True if the dispatcher have resources to dispatch a job.""" with self._nb_available_workers_lock: - self._logger.debug(f"can execute a job ? {self._nb_available_workers} available workers.") + self._logger.debug(f"Can execute a job ? {self._nb_available_workers} available workers.") return self._nb_available_workers > 0 def run(self): with self._executor: super().run() - self._logger.debug("Standalone job dispatcher: Pool executor shut down") + self._logger.debug("Standalone job dispatcher: Pool executor shut down.") def _dispatch(self, job: Job): """Dispatches the given `Job^` on an available worker for execution. @@ -56,7 +56,7 @@ def _dispatch(self, job: Job): """ with self._nb_available_workers_lock: self._nb_available_workers -= 1 - self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from dispatch") + self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from dispatch method.") config_as_string = _TomlSerializer()._serialize(Config._applied_config) # type: ignore[attr-defined] future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string) future.add_done_callback(partial(self._update_job_status_from_future, job)) @@ -64,5 +64,5 @@ def _dispatch(self, job: Job): def _update_job_status_from_future(self, job: Job, ft): with self._nb_available_workers_lock: self._nb_available_workers += 1 - self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from callback") + self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from callback method.") self._update_job_status(job, ft.result()) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 028f5ad4e4..82cc3b855a 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -80,7 +80,7 @@ def submit( jobs: List[Job] = [] tasks = submittable._get_sorted_tasks() with cls.lock: - cls.__logger.debug(f"-------------------------> Acquired lock to submit {submittable.id}.") # type: ignore + cls.__logger.debug(f"Acquiring lock to submit {submission.entity_id}.") for ts in tasks: jobs.extend( cls._lock_dn_output_and_create_job( @@ -94,7 +94,7 @@ def submit( ) submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) - cls.__logger.debug(f"-------------------------> Released lock after submitting {submission.id}.") + cls.__logger.debug(f"Releasing lock after submitting {submission.entity_id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() elif wait: @@ -131,7 +131,7 @@ def submit_task( ) submit_id = submission.id with cls.lock: - cls.__logger.debug("-------------------------> Acquired lock to submit task.") + cls.__logger.debug(f"Acquiring lock to submit task {task.id}.") job = cls._lock_dn_output_and_create_job( task, submit_id, @@ -142,7 +142,7 @@ def submit_task( jobs = [job] submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) - cls.__logger.debug(f"-------------------------> Released lock after submitting task {task.id}.") + cls.__logger.debug(f"Releasing lock after submitting task {task.id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() else: @@ -234,9 +234,8 @@ def _on_status_change(cls, job: Job): @classmethod def __unblock_jobs(cls): - cls.__logger.debug(" Entering __unblock_jobs.") with cls.lock: - cls.__logger.debug("-------------------------> Acquired lock to unblock jobs.") + cls.__logger.debug("Acquiring lock to unblock jobs.") for job in cls.blocked_jobs: cls.__logger.debug(f" Unblocking {job.id} ?") if not cls._is_blocked(job): @@ -246,8 +245,7 @@ def __unblock_jobs(cls): cls.__remove_blocked_job(job) cls.__logger.debug(f" Adding {job.id} to the list of jobs to run.") cls.jobs_to_run.put(job) - cls.__logger.debug("-------------------------> Released lock after unblocking jobs.") - cls.__logger.debug(" Exiting __unblock_jobs.") + cls.__logger.debug("Releasing lock after unblocking jobs.") @classmethod def __remove_blocked_job(cls, job): @@ -266,14 +264,14 @@ def cancel_job(cls, job: Job): cls.__logger.info(f"{job.id} has already failed and cannot be canceled.") else: with cls.lock: - cls.__logger.debug(f"-------------------------> Acquired lock to cancel job {job.id}.") + cls.__logger.debug(f"Acquiring lock to cancel job {job.id}.") to_cancel_or_abandon_jobs = {job} to_cancel_or_abandon_jobs.update(cls.__find_subsequent_jobs(job.submit_id, set(job.task.output.keys()))) cls.__remove_blocked_jobs(to_cancel_or_abandon_jobs) cls.__remove_jobs_to_run(to_cancel_or_abandon_jobs) cls._cancel_jobs(job.id, to_cancel_or_abandon_jobs) cls._unlock_edit_on_jobs_outputs(to_cancel_or_abandon_jobs) - cls.__logger.debug(f"-------------------------> Released lock after canceling {job.id}.") + cls.__logger.debug(f"Releasing lock after canceling {job.id}.") @classmethod def __find_subsequent_jobs(cls, submit_id, output_dn_config_ids: Set) -> Set[Job]: @@ -307,7 +305,7 @@ def __remove_jobs_to_run(cls, jobs): @classmethod def _fail_subsequent_jobs(cls, failed_job: Job): with cls.lock: - cls.__logger.debug("-------------------------> Acquired lock to fail subsequent jobs.") + cls.__logger.debug("Acquiring lock to fail subsequent jobs.") to_fail_or_abandon_jobs = set() to_fail_or_abandon_jobs.update( cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys())) @@ -318,6 +316,7 @@ def _fail_subsequent_jobs(cls, failed_job: Job): cls.__remove_blocked_jobs(to_fail_or_abandon_jobs) cls.__remove_jobs_to_run(to_fail_or_abandon_jobs) cls._unlock_edit_on_jobs_outputs(to_fail_or_abandon_jobs) + cls.__logger.debug("Releasing lock after fail subsequent jobs.") @classmethod def _cancel_jobs(cls, job_id_to_cancel: JobId, jobs: Set[Job]): From a5a9946b748c8791717097b62398f5508035ce59 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Wed, 20 Mar 2024 11:01:37 +0100 Subject: [PATCH 55/62] linter --- taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index 423cb4ca9b..7c74009018 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -75,7 +75,7 @@ def run(self): job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) except Empty: # In case the last job of the queue has been removed. pass - self._logger.debug(f"Releasing lock after checking jobs to run.") + self._logger.debug("Releasing lock after checking jobs to run.") if job: self._logger.debug(f"Got a job to execute {job.id}.") try: From 5d94b451999ea977d4c6e179bfd98b8d73fd9d70 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Thu, 21 Mar 2024 09:48:21 +0100 Subject: [PATCH 56/62] Remove Rlock and replace by Lock --- .../_orchestrator/_dispatcher/_standalone_job_dispatcher.py | 4 ++-- taipy/core/_orchestrator/_orchestrator.py | 4 ++-- taipy/core/submission/submission.py | 2 +- .../_orchestrator/_dispatcher/mock_standalone_dispatcher.py | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index f8df25a738..17630ca672 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -11,7 +11,7 @@ from concurrent.futures import Executor, ProcessPoolExecutor from functools import partial -from threading import RLock +from threading import Lock from typing import Callable, Optional from taipy.config._serializer._toml_serializer import _TomlSerializer @@ -26,7 +26,7 @@ class _StandaloneJobDispatcher(_JobDispatcher): """Manages job dispatching (instances of `Job^` class) in an asynchronous way using a ProcessPoolExecutor.""" - _nb_available_workers_lock = RLock() + _nb_available_workers_lock = Lock() def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Optional[Callable] = None): super().__init__(orchestrator) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index abfa13b2e9..ada30a3a59 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -12,7 +12,7 @@ import itertools from datetime import datetime from queue import Queue -from threading import RLock +from threading import Lock from time import sleep from typing import Callable, Iterable, List, Optional, Set, Union @@ -38,7 +38,7 @@ class _Orchestrator(_AbstractOrchestrator): jobs_to_run: Queue = Queue() blocked_jobs: List[Job] = [] - lock = RLock() + lock = Lock() __logger = _TaipyLogger._get_logger() @classmethod diff --git a/taipy/core/submission/submission.py b/taipy/core/submission/submission.py index 1061f0cbf3..b7606d22c3 100644 --- a/taipy/core/submission/submission.py +++ b/taipy/core/submission/submission.py @@ -44,7 +44,7 @@ class Submission(_Entity, _Labeled): _ID_PREFIX = "SUBMISSION" _MANAGER_NAME = "submission" __SEPARATOR = "_" - lock = threading.RLock() + lock = threading.Lock() __logger = _TaipyLogger._get_logger() def __init__( diff --git a/tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py b/tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py index 54f2864e5a..acad99bdca 100644 --- a/tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py +++ b/tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py @@ -10,7 +10,7 @@ # specific language governing permissions and limitations under the License. from concurrent.futures import Executor, Future -from threading import RLock +from threading import Lock from typing import List from taipy.core import Job @@ -40,7 +40,7 @@ def __init__(self, orchestrator: _AbstractOrchestrator): super(_StandaloneJobDispatcher, self).__init__(orchestrator) self._executor: Executor = MockProcessPoolExecutor() self._nb_available_workers = 1 - self._nb_available_workers_lock = RLock() + self._nb_available_workers_lock = Lock() self.dispatch_calls: List = [] self.update_job_status_from_future_calls: List = [] From 95679d29061e08c78d2cd2ce1db0ae3e7db265ea Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Thu, 21 Mar 2024 09:50:20 +0100 Subject: [PATCH 57/62] use is_running instead of is_alive --- taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index 7c74009018..d80f22d8dc 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -56,7 +56,7 @@ def stop(self, wait: bool = True, timeout: Optional[float] = None): timeout (Optional[float]): The maximum time to wait. If None, the method will wait indefinitely. """ self._STOP_FLAG = True - if wait and self.is_alive(): + if wait and self.is_running(): self._logger.debug("Waiting for the dispatcher thread to stop...") self.join(timeout=timeout) From 074bc947a51efe2b4370f2da151917de5c6fca0e Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Thu, 21 Mar 2024 10:05:11 +0100 Subject: [PATCH 58/62] Linter --- .../_orchestrator/_dispatcher/test_standalone_job_dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py b/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py index 4b5b13bf0e..784ed587fd 100644 --- a/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py +++ b/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py @@ -88,7 +88,7 @@ def test_can_execute(): dispatcher._nb_available_workers = 1 assert dispatcher._can_execute() - + def test_update_job_status_from_future(): task = create_task() job = Job(JobId("job"), task, "s_id", task.id) From a88bd9a75cc046b1c03bf6820ae71f0ba0448ae5 Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Thu, 21 Mar 2024 10:28:28 +0100 Subject: [PATCH 59/62] Fix wrong merge conflict resolution --- .../_dispatcher/test_standalone_job_dispatcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py b/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py index 784ed587fd..40591e0534 100644 --- a/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py +++ b/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py @@ -96,9 +96,9 @@ def test_update_job_status_from_future(): dispatcher = _StandaloneJobDispatcher(orchestrator) ft = Future() ft.set_result(None) - assert dispatcher._nb_available_workers == 1 - dispatcher._update_job_status_from_future(job, ft) assert dispatcher._nb_available_workers == 2 + dispatcher._update_job_status_from_future(job, ft) + assert dispatcher._nb_available_workers == 3 assert job.is_completed() From 1aa87c703fbb0c790143fcb89c322fc67bcc0e0d Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Thu, 21 Mar 2024 14:22:10 +0100 Subject: [PATCH 60/62] reduce and clean logs --- .../_dispatcher/_standalone_job_dispatcher.py | 6 +++--- taipy/core/_orchestrator/_orchestrator.py | 7 +++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index 0a0d865fb6..849f9b5901 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -41,7 +41,7 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt def _can_execute(self) -> bool: """Returns True if the dispatcher have resources to dispatch a job.""" with self._nb_available_workers_lock: - self._logger.debug(f"Can execute a job ? {self._nb_available_workers} available workers.") + self._logger.debug(f"{self._nb_available_workers=}") return self._nb_available_workers > 0 def run(self): @@ -57,7 +57,7 @@ def _dispatch(self, job: Job): """ with self._nb_available_workers_lock: self._nb_available_workers -= 1 - self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from dispatch method.") + self._logger.debug(f"Setting nb_available_workers to {self._nb_available_workers} in the dispatch method.") config_as_string = _TomlSerializer()._serialize(Config._applied_config) # type: ignore[attr-defined] future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string) future.add_done_callback(partial(self._update_job_status_from_future, job)) @@ -65,5 +65,5 @@ def _dispatch(self, job: Job): def _update_job_status_from_future(self, job: Job, ft): with self._nb_available_workers_lock: self._nb_available_workers += 1 - self._logger.debug(f"Changing nb_available_workers to {self._nb_available_workers} from callback method.") + self._logger.debug(f"Setting nb_available_workers to {self._nb_available_workers} in the callback method.") self._update_job_status(job, ft.result()) diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index ada30a3a59..8bd149e475 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -237,13 +237,12 @@ def __unblock_jobs(cls): with cls.lock: cls.__logger.debug("Acquiring lock to unblock jobs.") for job in cls.blocked_jobs: - cls.__logger.debug(f" Unblocking {job.id} ?") if not cls._is_blocked(job): - cls.__logger.debug(f" Unblocking {job.id} !") + cls.__logger.debug(f"Unblocking job: {job.id}.") job.pending() - cls.__logger.debug(f" Removing {job.id} from the blocked list.") + cls.__logger.debug(f"Removing job {job.id} from the blocked_job list.") cls.__remove_blocked_job(job) - cls.__logger.debug(f" Adding {job.id} to the list of jobs to run.") + cls.__logger.debug(f"Adding job {job.id} to the list of jobs to run.") cls.jobs_to_run.put(job) cls.__logger.debug("Releasing lock after unblocking jobs.") From 4a329567659e22e45f827bcc42b496a265651a3a Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Thu, 21 Mar 2024 14:46:45 +0100 Subject: [PATCH 61/62] Removing logs about releasing lock --- taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py | 3 +-- taipy/core/_orchestrator/_orchestrator.py | 5 ----- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index d80f22d8dc..a4fe173f32 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -64,7 +64,7 @@ def run(self): self._logger.debug("Job dispatcher started.") while not self._STOP_FLAG: if not self._can_execute(): - time.sleep(0.1) # We need to sleep to avoid busy waiting. + time.sleep(0.2) # We need to sleep to avoid busy waiting. continue with self.lock: @@ -75,7 +75,6 @@ def run(self): job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) except Empty: # In case the last job of the queue has been removed. pass - self._logger.debug("Releasing lock after checking jobs to run.") if job: self._logger.debug(f"Got a job to execute {job.id}.") try: diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 8bd149e475..5f1df0e286 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -94,7 +94,6 @@ def submit( ) submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) - cls.__logger.debug(f"Releasing lock after submitting {submission.entity_id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() elif wait: @@ -142,7 +141,6 @@ def submit_task( jobs = [job] submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) - cls.__logger.debug(f"Releasing lock after submitting task {task.id}.") if Config.job_config.is_development: cls._check_and_execute_jobs_if_development_mode() else: @@ -244,7 +242,6 @@ def __unblock_jobs(cls): cls.__remove_blocked_job(job) cls.__logger.debug(f"Adding job {job.id} to the list of jobs to run.") cls.jobs_to_run.put(job) - cls.__logger.debug("Releasing lock after unblocking jobs.") @classmethod def __remove_blocked_job(cls, job): @@ -270,7 +267,6 @@ def cancel_job(cls, job: Job): cls.__remove_jobs_to_run(to_cancel_or_abandon_jobs) cls._cancel_jobs(job.id, to_cancel_or_abandon_jobs) cls._unlock_edit_on_jobs_outputs(to_cancel_or_abandon_jobs) - cls.__logger.debug(f"Releasing lock after canceling {job.id}.") @classmethod def __find_subsequent_jobs(cls, submit_id, output_dn_config_ids: Set) -> Set[Job]: @@ -315,7 +311,6 @@ def _fail_subsequent_jobs(cls, failed_job: Job): cls.__remove_blocked_jobs(to_fail_or_abandon_jobs) cls.__remove_jobs_to_run(to_fail_or_abandon_jobs) cls._unlock_edit_on_jobs_outputs(to_fail_or_abandon_jobs) - cls.__logger.debug("Releasing lock after fail subsequent jobs.") @classmethod def _cancel_jobs(cls, job_id_to_cancel: JobId, jobs: Set[Job]): From 935adaebab9fdfddaecddf12e47c56280c26dfdd Mon Sep 17 00:00:00 2001 From: jrobinAV Date: Fri, 22 Mar 2024 10:35:32 +0100 Subject: [PATCH 62/62] Removing useless sleep --- taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index a4fe173f32..c59ea6fd12 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -84,7 +84,6 @@ def run(self): self.orchestrator.jobs_to_run.put(job) except Exception as e: self._logger.exception(e) - time.sleep(1) self._logger.debug("Job dispatcher stopped.") @abstractmethod