From 63179fcea7e5cfdf19e2534cac22895b406d4931 Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Wed, 28 Feb 2024 23:35:48 +0700 Subject: [PATCH 01/10] fix: reset threading lock at conftest --- tests/core/conftest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/core/conftest.py b/tests/core/conftest.py index 0583c0639a..915b9560ab 100644 --- a/tests/core/conftest.py +++ b/tests/core/conftest.py @@ -14,6 +14,7 @@ import shutil from datetime import datetime from queue import Queue +from threading import Lock from unittest.mock import patch import pandas as pd @@ -378,6 +379,7 @@ def _init_orchestrator(): _OrchestratorFactory._build_dispatcher(force_restart=True) _OrchestratorFactory._orchestrator.jobs_to_run = Queue() _OrchestratorFactory._orchestrator.blocked_jobs = [] + _OrchestratorFactory._orchestrator.lock = Lock() return _init_orchestrator From 8aa5e7f868de47699f3d6066a790eaec9bbf3d00 Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Thu, 29 Feb 2024 16:47:49 +0700 Subject: [PATCH 02/10] fix: remove redundant function and lock in test_job_manager.py --- tests/core/job/test_job_manager.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/tests/core/job/test_job_manager.py b/tests/core/job/test_job_manager.py index c479bcb9e6..abf65e2360 100644 --- a/tests/core/job/test_job_manager.py +++ b/tests/core/job/test_job_manager.py @@ -138,15 +138,6 @@ def test_delete_job(): assert _JobManager._get(job_1.id) is None -m = multiprocessing.Manager() -lock = m.Lock() - - -def inner_lock_multiply(nb1: float, nb2: float): - with lock: - return multiply(1 or nb1, 2 or nb2) - - def test_raise_when_trying_to_delete_unfinished_job(): Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2) m = multiprocessing.Manager() @@ -326,6 +317,7 @@ def test_cancel_subsequent_jobs(): orchestrator = _OrchestratorFactory._orchestrator submission_manager = _SubmissionManagerFactory._build_manager() + m = multiprocessing.Manager() lock_0 = m.Lock() dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1}) From a345c6db9d2bcac81b462cb8cfeb4ac01ef1887a Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Tue, 5 Mar 2024 17:58:21 +0700 Subject: [PATCH 03/10] fix: minor code clean up --- taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py | 5 +---- taipy/core/_orchestrator/_orchestrator.py | 2 +- taipy/core/submission/submission.py | 3 ++- taipy/gui_core/_adapters.py | 2 ++ 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index 1d139710d5..6bdc935d98 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -75,10 +75,7 @@ def run(self): pass except Exception as e: self._logger.exception(e) - pass - if self.stop_wait: - self._logger.debug("Waiting for the dispatcher thread to stop...") - self.join(timeout=self.stop_timeout) + self._logger.debug("Job dispatcher stopped.") @abstractmethod diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 97e0bea369..56f860f720 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -36,7 +36,7 @@ class _Orchestrator(_AbstractOrchestrator): """ jobs_to_run: Queue = Queue() - blocked_jobs: List = [] + blocked_jobs: List[Job] = [] lock = Lock() __logger = _TaipyLogger._get_logger() diff --git a/taipy/core/submission/submission.py b/taipy/core/submission/submission.py index 5d69d8a86e..743621d52b 100644 --- a/taipy/core/submission/submission.py +++ b/taipy/core/submission/submission.py @@ -196,6 +196,7 @@ def __ge__(self, other): def _update_submission_status(self, job: Job): from ._submission_manager_factory import _SubmissionManagerFactory + with self.lock: submission_manager = _SubmissionManagerFactory._build_manager() submission = submission_manager._get(self) @@ -205,7 +206,7 @@ def _update_submission_status(self, job: Job): job_status = job.status if job_status == Status.FAILED: submission._submission_status = SubmissionStatus.FAILED - _SubmissionManagerFactory._build_manager()._set(submission) + submission_manager._set(submission) return if job_status == Status.CANCELED: submission._is_canceled = True diff --git a/taipy/gui_core/_adapters.py b/taipy/gui_core/_adapters.py index 7ace1a56d8..dd61a53c8e 100644 --- a/taipy/gui_core/_adapters.py +++ b/taipy/gui_core/_adapters.py @@ -18,6 +18,7 @@ Job, Scenario, Sequence, + Submission, Task, is_deletable, is_editable, @@ -43,6 +44,7 @@ def __repr__(self): Cycle.__bases__ += (_GCDoNotUpdate,) Job.__bases__ += (_GCDoNotUpdate,) Task.__bases__ += (_GCDoNotUpdate,) +Submission.__bases__ += (_GCDoNotUpdate,) class _EntityType(Enum): From ab5ec5a9f9c357cc716d0717774417c9f128c550 Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Tue, 5 Mar 2024 17:58:59 +0700 Subject: [PATCH 04/10] fix: reset everything after each test as well --- .../_orchestrator/test_orchestrator_factory.py | 3 +++ tests/core/config/test_override_config.py | 5 +++-- tests/core/conftest.py | 17 +++++++++-------- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/tests/core/_orchestrator/test_orchestrator_factory.py b/tests/core/_orchestrator/test_orchestrator_factory.py index d4e387d42f..f7536cac31 100644 --- a/tests/core/_orchestrator/test_orchestrator_factory.py +++ b/tests/core/_orchestrator/test_orchestrator_factory.py @@ -111,6 +111,9 @@ def test_build_unknown_dispatcher(): _OrchestratorFactory._build_dispatcher() assert _OrchestratorFactory._dispatcher is None + Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) + _OrchestratorFactory._build_dispatcher() + def test_remove_dispatcher_not_built(): _OrchestratorFactory._dispatcher = None diff --git a/tests/core/config/test_override_config.py b/tests/core/config/test_override_config.py index 192809535c..e7406422ce 100644 --- a/tests/core/config/test_override_config.py +++ b/tests/core/config/test_override_config.py @@ -60,13 +60,14 @@ def test_override_default_configuration_with_code_configuration(): def test_override_default_config_with_code_config_including_env_variable_values(): Config.configure_core() assert Config.core.repository_type == "filesystem" - Config.configure_core(repository_type="othertype") - assert Config.core.repository_type == "othertype" with mock.patch.dict(os.environ, {"REPOSITORY_TYPE": "foo"}): Config.configure_core(repository_type="ENV[REPOSITORY_TYPE]") assert Config.core.repository_type == "foo" + Config.configure_core(repository_type="othertype") + assert Config.core.repository_type == "othertype" + def test_override_default_configuration_with_file_configuration(): tf = NamedTemporaryFile( diff --git a/tests/core/conftest.py b/tests/core/conftest.py index 915b9560ab..cfc8bb0841 100644 --- a/tests/core/conftest.py +++ b/tests/core/conftest.py @@ -181,14 +181,9 @@ def default_multi_sheet_data_frame(): def cleanup_files(): yield - if os.path.exists(".data"): - shutil.rmtree(".data", ignore_errors=True) - if os.path.exists("user_data"): - shutil.rmtree("user_data", ignore_errors=True) - if os.path.exists(".taipy"): - shutil.rmtree(".taipy", ignore_errors=True) - if os.path.exists(".my_data"): - shutil.rmtree(".my_data", ignore_errors=True) + for path in [".data", ".my_data", "user_data", ".taipy"]: + if os.path.exists(path): + shutil.rmtree(path, ignore_errors=True) @pytest.fixture(scope="function") @@ -332,6 +327,12 @@ def clean_repository(init_config, init_managers, init_orchestrator, init_notifie with patch("sys.argv", ["prog"]): yield + close_all_sessions() + init_orchestrator() + init_managers() + init_config() + init_notifier() + @pytest.fixture def init_config(reset_configuration_singleton, inject_core_sections): From c48b1f0e0206b6f714929e3c7282b6dffe763ee8 Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Wed, 6 Mar 2024 15:14:13 +0700 Subject: [PATCH 05/10] fix: temporarily deactiavte _GCDoNotUpdate base class --- taipy/gui_core/_adapters.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/taipy/gui_core/_adapters.py b/taipy/gui_core/_adapters.py index dd61a53c8e..f1968dd054 100644 --- a/taipy/gui_core/_adapters.py +++ b/taipy/gui_core/_adapters.py @@ -15,11 +15,7 @@ from taipy.core import ( Cycle, DataNode, - Job, Scenario, - Sequence, - Submission, - Task, is_deletable, is_editable, is_promotable, @@ -38,13 +34,13 @@ def __repr__(self): return self.get_label() if hasattr(self, "get_label") else super().__repr__() -Scenario.__bases__ += (_GCDoNotUpdate,) -Sequence.__bases__ += (_GCDoNotUpdate,) -DataNode.__bases__ += (_GCDoNotUpdate,) -Cycle.__bases__ += (_GCDoNotUpdate,) -Job.__bases__ += (_GCDoNotUpdate,) -Task.__bases__ += (_GCDoNotUpdate,) -Submission.__bases__ += (_GCDoNotUpdate,) +# Scenario.__bases__ += (_GCDoNotUpdate,) +# Sequence.__bases__ += (_GCDoNotUpdate,) +# DataNode.__bases__ += (_GCDoNotUpdate,) +# Cycle.__bases__ += (_GCDoNotUpdate,) +# Job.__bases__ += (_GCDoNotUpdate,) +# Task.__bases__ += (_GCDoNotUpdate,) +# Submission.__bases__ += (_GCDoNotUpdate,) class _EntityType(Enum): From fcbb36b27e7338e589a77cfe007cb10061b8e150 Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Sun, 10 Mar 2024 22:17:42 +0700 Subject: [PATCH 06/10] fix: revert the stop wait dispatcher that has been removed by accident --- taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index 6bdc935d98..efa11d5207 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -76,6 +76,10 @@ def run(self): except Exception as e: self._logger.exception(e) + if self.stop_wait: + self._logger.debug("Waiting for the dispatcher thread to stop...") + self.join(timeout=self.stop_timeout) + self._logger.debug("Job dispatcher stopped.") @abstractmethod From bb3f3839ed74d7f0f8793bb23bdfc4de7fc271b5 Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Sun, 10 Mar 2024 23:08:02 +0700 Subject: [PATCH 07/10] add logs on lock acquire and release --- .../core/_orchestrator/_dispatcher/_job_dispatcher.py | 7 +++++++ .../_dispatcher/_standalone_job_dispatcher.py | 1 + taipy/core/_orchestrator/_orchestrator.py | 11 ++++++++++- 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index efa11d5207..c6f1db6943 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -63,17 +63,24 @@ def run(self): self._logger.debug("Job dispatcher started.") while not self._STOP_FLAG: try: + self._logger.info("Check if can execute before getting from queue") if self._can_execute(): + self._logger.info(f"run() TRY TO acquired the {self.lock}") with self.lock: + self._logger.info(f"run() acquired the {self.lock}") if self._STOP_FLAG: break + self._logger.info(f"Getting job from {self.orchestrator.jobs_to_run}") job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) self._execute_job(job) + self._logger.info(f"run() release the {self.lock}") else: time.sleep(0.1) # We need to sleep to avoid busy waiting. except Empty: # In case the last job of the queue has been removed. + self._logger.info(f"run() release the {self.lock} because of Empty") pass except Exception as e: + self._logger.info(f"run() release the {self.lock} because of {e}") self._logger.exception(e) if self.stop_wait: diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py index 37235f30c0..389f58b372 100644 --- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py @@ -36,6 +36,7 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt def _can_execute(self) -> bool: """Returns True if the dispatcher have resources to dispatch a job.""" + self._logger.info(f"self._nb_available_workers: {self._nb_available_workers}") return self._nb_available_workers > 0 def run(self): diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index 56f860f720..e1a199f8be 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -80,6 +80,7 @@ def submit( jobs = [] tasks = submittable._get_sorted_tasks() with cls.lock: + cls.__logger.info(f"submit() acquired the {cls.lock}") for ts in tasks: for task in ts: jobs.append( @@ -91,6 +92,7 @@ def submit( force=force, # type: ignore ) ) + cls.__logger.info(f"submit() released the {cls.lock}") submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) if Config.job_config.is_development: @@ -130,6 +132,7 @@ def submit_task( ) submit_id = submission.id with cls.lock: + cls.__logger.info(f"submit_task() acquired the {cls.lock}") job = cls._lock_dn_output_and_create_job( task, submit_id, @@ -137,6 +140,7 @@ def submit_task( itertools.chain([cls._update_submission_status], callbacks or []), force, ) + cls.__logger.info(f"submit_task() released the {cls.lock}") jobs = [job] submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) @@ -235,9 +239,11 @@ def __unblock_jobs(cls): for job in cls.blocked_jobs: if not cls._is_blocked(job): with cls.lock: + cls.__logger.info(f"unblock_jobs() acquired the {cls.lock}") job.pending() cls.__remove_blocked_job(job) cls.jobs_to_run.put(job) + cls.__logger.info(f"unblock_jobs() released the {cls.lock}") @classmethod def __remove_blocked_job(cls, job): @@ -256,12 +262,14 @@ def cancel_job(cls, job: Job): cls.__logger.info(f"{job.id} has already failed and cannot be canceled.") else: with cls.lock: + cls.__logger.info(f"cancel_job() acquired the {cls.lock}") to_cancel_or_abandon_jobs = {job} to_cancel_or_abandon_jobs.update(cls.__find_subsequent_jobs(job.submit_id, set(job.task.output.keys()))) cls.__remove_blocked_jobs(to_cancel_or_abandon_jobs) cls.__remove_jobs_to_run(to_cancel_or_abandon_jobs) cls._cancel_jobs(job.id, to_cancel_or_abandon_jobs) cls._unlock_edit_on_jobs_outputs(to_cancel_or_abandon_jobs) + cls.__logger.info(f"cancel_job() released the {cls.lock}") @classmethod def __find_subsequent_jobs(cls, submit_id, output_dn_config_ids: Set) -> Set[Job]: @@ -295,6 +303,7 @@ def __remove_jobs_to_run(cls, jobs): @classmethod def _fail_subsequent_jobs(cls, failed_job: Job): with cls.lock: + cls.__logger.info(f"_fail_subsequent_jobs() acquired the {cls.lock}") to_fail_or_abandon_jobs = set() to_fail_or_abandon_jobs.update( cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys())) @@ -305,10 +314,10 @@ def _fail_subsequent_jobs(cls, failed_job: Job): cls.__remove_blocked_jobs(to_fail_or_abandon_jobs) cls.__remove_jobs_to_run(to_fail_or_abandon_jobs) cls._unlock_edit_on_jobs_outputs(to_fail_or_abandon_jobs) + cls.__logger.info(f"_fail_subsequent_jobs() released the {cls.lock}") @classmethod def _cancel_jobs(cls, job_id_to_cancel: JobId, jobs: Set[Job]): - for job in jobs: if job.is_running(): cls.__logger.info(f"{job.id} is running and cannot be canceled.") From 9a20741ae56f6c797493447ca78d41af64c87ed3 Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Mon, 11 Mar 2024 15:53:19 +0700 Subject: [PATCH 08/10] temporarily remove join dispatcher thread --- taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index c6f1db6943..1b30ebb389 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -83,9 +83,9 @@ def run(self): self._logger.info(f"run() release the {self.lock} because of {e}") self._logger.exception(e) - if self.stop_wait: - self._logger.debug("Waiting for the dispatcher thread to stop...") - self.join(timeout=self.stop_timeout) + # if self.stop_wait: + # self._logger.debug("Waiting for the dispatcher thread to stop...") + # self.join(timeout=self.stop_timeout) self._logger.debug("Job dispatcher stopped.") From 11500a37099f2b1d801495023c30e2cd2f177c6c Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Wed, 13 Mar 2024 16:38:45 +0700 Subject: [PATCH 09/10] fix: remove lock log --- .../_dispatcher/_job_dispatcher.py | 18 +++++------------- taipy/core/_orchestrator/_orchestrator.py | 10 ---------- 2 files changed, 5 insertions(+), 23 deletions(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index 1b30ebb389..d3fa819bfd 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -55,38 +55,30 @@ def stop(self, wait: bool = True, timeout: Optional[float] = None): wait (bool): If True, the method will wait for the dispatcher to stop. timeout (Optional[float]): The maximum time to wait. If None, the method will wait indefinitely. """ - self.stop_wait = wait - self.stop_timeout = timeout self._STOP_FLAG = True + if wait and self.is_alive(): + self._logger.debug("Waiting for the dispatcher thread to stop...") + self.join(timeout=timeout) + def run(self): self._logger.debug("Job dispatcher started.") while not self._STOP_FLAG: try: - self._logger.info("Check if can execute before getting from queue") if self._can_execute(): - self._logger.info(f"run() TRY TO acquired the {self.lock}") with self.lock: - self._logger.info(f"run() acquired the {self.lock}") if self._STOP_FLAG: break - self._logger.info(f"Getting job from {self.orchestrator.jobs_to_run}") + self._logger.info(f"{self.orchestrator.jobs_to_run.qsize()=}") job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) self._execute_job(job) - self._logger.info(f"run() release the {self.lock}") else: time.sleep(0.1) # We need to sleep to avoid busy waiting. except Empty: # In case the last job of the queue has been removed. - self._logger.info(f"run() release the {self.lock} because of Empty") pass except Exception as e: - self._logger.info(f"run() release the {self.lock} because of {e}") self._logger.exception(e) - # if self.stop_wait: - # self._logger.debug("Waiting for the dispatcher thread to stop...") - # self.join(timeout=self.stop_timeout) - self._logger.debug("Job dispatcher stopped.") @abstractmethod diff --git a/taipy/core/_orchestrator/_orchestrator.py b/taipy/core/_orchestrator/_orchestrator.py index e1a199f8be..d27b055c91 100644 --- a/taipy/core/_orchestrator/_orchestrator.py +++ b/taipy/core/_orchestrator/_orchestrator.py @@ -80,7 +80,6 @@ def submit( jobs = [] tasks = submittable._get_sorted_tasks() with cls.lock: - cls.__logger.info(f"submit() acquired the {cls.lock}") for ts in tasks: for task in ts: jobs.append( @@ -92,7 +91,6 @@ def submit( force=force, # type: ignore ) ) - cls.__logger.info(f"submit() released the {cls.lock}") submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) if Config.job_config.is_development: @@ -132,7 +130,6 @@ def submit_task( ) submit_id = submission.id with cls.lock: - cls.__logger.info(f"submit_task() acquired the {cls.lock}") job = cls._lock_dn_output_and_create_job( task, submit_id, @@ -140,7 +137,6 @@ def submit_task( itertools.chain([cls._update_submission_status], callbacks or []), force, ) - cls.__logger.info(f"submit_task() released the {cls.lock}") jobs = [job] submission.jobs = jobs # type: ignore cls._orchestrate_job_to_run_or_block(jobs) @@ -239,11 +235,9 @@ def __unblock_jobs(cls): for job in cls.blocked_jobs: if not cls._is_blocked(job): with cls.lock: - cls.__logger.info(f"unblock_jobs() acquired the {cls.lock}") job.pending() cls.__remove_blocked_job(job) cls.jobs_to_run.put(job) - cls.__logger.info(f"unblock_jobs() released the {cls.lock}") @classmethod def __remove_blocked_job(cls, job): @@ -262,14 +256,12 @@ def cancel_job(cls, job: Job): cls.__logger.info(f"{job.id} has already failed and cannot be canceled.") else: with cls.lock: - cls.__logger.info(f"cancel_job() acquired the {cls.lock}") to_cancel_or_abandon_jobs = {job} to_cancel_or_abandon_jobs.update(cls.__find_subsequent_jobs(job.submit_id, set(job.task.output.keys()))) cls.__remove_blocked_jobs(to_cancel_or_abandon_jobs) cls.__remove_jobs_to_run(to_cancel_or_abandon_jobs) cls._cancel_jobs(job.id, to_cancel_or_abandon_jobs) cls._unlock_edit_on_jobs_outputs(to_cancel_or_abandon_jobs) - cls.__logger.info(f"cancel_job() released the {cls.lock}") @classmethod def __find_subsequent_jobs(cls, submit_id, output_dn_config_ids: Set) -> Set[Job]: @@ -303,7 +295,6 @@ def __remove_jobs_to_run(cls, jobs): @classmethod def _fail_subsequent_jobs(cls, failed_job: Job): with cls.lock: - cls.__logger.info(f"_fail_subsequent_jobs() acquired the {cls.lock}") to_fail_or_abandon_jobs = set() to_fail_or_abandon_jobs.update( cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys())) @@ -314,7 +305,6 @@ def _fail_subsequent_jobs(cls, failed_job: Job): cls.__remove_blocked_jobs(to_fail_or_abandon_jobs) cls.__remove_jobs_to_run(to_fail_or_abandon_jobs) cls._unlock_edit_on_jobs_outputs(to_fail_or_abandon_jobs) - cls.__logger.info(f"_fail_subsequent_jobs() released the {cls.lock}") @classmethod def _cancel_jobs(cls, job_id_to_cancel: JobId, jobs: Set[Job]): From 29c42c7d22c2e80af7c9bf6869bf5582e57c1fca Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Thu, 14 Mar 2024 09:40:47 +0700 Subject: [PATCH 10/10] fix: remove jobs_to_run log --- taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index d3fa819bfd..5c675c55d3 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -69,7 +69,6 @@ def run(self): with self.lock: if self._STOP_FLAG: break - self._logger.info(f"{self.orchestrator.jobs_to_run.qsize()=}") job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) self._execute_job(job) else: