diff --git a/taipy/core/_core.py b/taipy/core/_core.py
index 2ca5aaa6f3..69628b8408 100644
--- a/taipy/core/_core.py
+++ b/taipy/core/_core.py
@@ -66,16 +66,19 @@ def run(self, force_restart=False):
         self.__start_dispatcher(force_restart)
 
-    def stop(self):
+    def stop(self, wait: bool = True, timeout: Optional[float] = None):
         """
         Stop the Core service.
-        This function stops the dispatcher and unblock the Config for update.
+
+        Parameters:
+            wait (bool): If True, the method will wait for the dispatcher to stop.
+            timeout (Optional[float]): The maximum time to wait. If None, the method will wait indefinitely.
         """
         Config.unblock_update()
 
         if self._dispatcher:
-            self._dispatcher = _OrchestratorFactory._remove_dispatcher()
+            self._dispatcher = _OrchestratorFactory._remove_dispatcher(wait, timeout)
             self.__logger.info("Core service has been stopped.")
 
         with self.__class__.__lock_is_running:
diff --git a/taipy/core/_orchestrator/_dispatcher/_development_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_development_job_dispatcher.py
index a9cb05693e..638c80002e 100644
--- a/taipy/core/_orchestrator/_dispatcher/_development_job_dispatcher.py
+++ b/taipy/core/_orchestrator/_dispatcher/_development_job_dispatcher.py
@@ -8,6 +8,9 @@
 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
+
+from typing import Optional
+
 from ...job.job import Job
 from .._abstract_orchestrator import _AbstractOrchestrator
 from ._job_dispatcher import _JobDispatcher
@@ -26,7 +29,7 @@ def start(self):
     def is_running(self) -> bool:
         return True
 
-    def stop(self):
+    def stop(self, wait: bool = True, timeout: Optional[float] = None):
         raise NotImplementedError
 
     def run(self):
diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py
index 4c3669a3e4..95ff3f9562 100644
--- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py
+++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py
@@ -12,7 +12,7 @@
 import threading
 from abc import abstractmethod
 from queue import Empty
-from typing import Dict
+from typing import Dict, Optional
 
 from taipy.config.config import Config
 from taipy.logger._taipy_logger import _TaipyLogger
@@ -47,12 +47,20 @@ def is_running(self) -> bool:
         """Return True if the dispatcher is running"""
         return self.is_alive()
 
-    def stop(self):
-        """Stop the dispatcher"""
+    def stop(self, wait: bool = True, timeout: Optional[float] = None):
+        """Stop the dispatcher.
+
+        Parameters:
+            wait (bool): If True, the method will wait for the dispatcher to stop.
+            timeout (Optional[float]): The maximum time to wait. If None, the method will wait indefinitely.
+        """
         self._STOP_FLAG = True
+        if wait and self.is_alive():
+            self._logger.debug("Waiting for the dispatcher thread to stop...")
+            self.join(timeout=timeout)
 
     def run(self):
-        _TaipyLogger._get_logger().info("Start job dispatcher...")
+        self._logger.info("Start job dispatcher...")
         while not self._STOP_FLAG:
             try:
                 if self._can_execute():
@@ -64,7 +72,7 @@ def run(self):
             except Empty:  # In case the last job of the queue has been removed.
                 pass
             except Exception as e:
-                _TaipyLogger._get_logger().exception(e)
+                self._logger.exception(e)
                 pass
         self._logger.info("Job dispatcher stopped.")
diff --git a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py
index afb5e2b872..ff57fec48d 100644
--- a/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py
+++ b/taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py
@@ -8,6 +8,7 @@
 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
+
 from concurrent.futures import Executor, ProcessPoolExecutor
 from functools import partial
 from typing import Callable, Optional
@@ -29,7 +30,7 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt
         max_workers = Config.job_config.max_nb_of_workers or 1
         self._executor: Executor = ProcessPoolExecutor(
             max_workers=max_workers,
-            initializer=subproc_initializer
+            initializer=subproc_initializer,
         )  # type: ignore
         self._nb_available_workers = self._executor._max_workers  # type: ignore
diff --git a/taipy/core/_orchestrator/_orchestrator_factory.py b/taipy/core/_orchestrator/_orchestrator_factory.py
index 24021cf61c..0976f70bc6 100644
--- a/taipy/core/_orchestrator/_orchestrator_factory.py
+++ b/taipy/core/_orchestrator/_orchestrator_factory.py
@@ -61,9 +61,9 @@ def _build_dispatcher(cls, force_restart=False) -> Optional[_JobDispatcher]:
         return cls._dispatcher
 
     @classmethod
-    def _remove_dispatcher(cls) -> Optional[_JobDispatcher]:
+    def _remove_dispatcher(cls, wait: bool = True, timeout: Optional[float] = None) -> None:
         if cls._dispatcher is not None and not isinstance(cls._dispatcher, _DevelopmentJobDispatcher):
-            cls._dispatcher.stop()
+            cls._dispatcher.stop(wait, timeout)
         cls._dispatcher = None
         return cls._dispatcher
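A minimal usage sketch of the new shutdown semantics (illustrative only; the job-execution configuration and the submit step are assumptions, only `run()` and `stop(wait, timeout)` come from this diff). With `wait=True`, the default, `stop()` now joins the dispatcher thread, bounded by `timeout`:

    from taipy.config import Config
    from taipy.core import Core

    # Standalone mode runs jobs through a dispatcher thread and worker processes.
    Config.configure_job_executions(mode="standalone", max_nb_of_workers=2)

    core = Core()
    core.run()
    # ... submit scenarios or tasks here ...

    # Set the stop flag and block until the dispatcher thread exits,
    # giving up after 10 seconds.
    core.stop(wait=True, timeout=10)

    # Fire-and-forget variant: set the stop flag without joining the thread.
    # core.stop(wait=False)

If the timeout expires before the thread exits, `join()` simply returns and the dispatcher finishes its current loop iteration in the background.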
diff --git a/tests/core/_orchestrator/test_orchestrator.py b/tests/core/_orchestrator/test_orchestrator.py
index 42c84d442c..b80bbcf170 100644
--- a/tests/core/_orchestrator/test_orchestrator.py
+++ b/tests/core/_orchestrator/test_orchestrator.py
@@ -57,7 +57,7 @@ def test_submit_task_multithreading_multiple_task():
     task_1 = _create_task(partial(lock_multiply, lock_1))
     task_2 = _create_task(partial(lock_multiply, lock_2))
 
-    _OrchestratorFactory._build_dispatcher()
+    _OrchestratorFactory._build_dispatcher(force_restart=True)
 
     with lock_1:
         with lock_2:
@@ -104,7 +104,7 @@ def test_submit_submittable_multithreading_multiple_task():
 
     scenario = Scenario("scenario_config", [task_1, task_2], {})
 
-    _OrchestratorFactory._build_dispatcher()
+    _OrchestratorFactory._build_dispatcher(force_restart=True)
 
     with lock_1:
         with lock_2:
@@ -146,7 +146,7 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
     task_1 = _create_task(partial(lock_multiply, lock_1))
     task_2 = _create_task(partial(lock_multiply, lock_2))
 
-    _OrchestratorFactory._build_dispatcher()
+    _OrchestratorFactory._build_dispatcher(force_restart=True)
 
     with lock_0:
         submission_0 = _Orchestrator.submit_task(task_0)
@@ -215,7 +215,7 @@ def test_blocked_task():
     bar_cfg = Config.configure_data_node("bar")
     baz_cfg = Config.configure_data_node("baz")
 
-    _OrchestratorFactory._build_dispatcher()
+    _OrchestratorFactory._build_dispatcher(force_restart=True)
 
     dns = _DataManager._bulk_get_or_create([foo_cfg, bar_cfg, baz_cfg])
     foo = dns[foo_cfg]
@@ -273,7 +273,7 @@ def test_blocked_submittable():
     bar_cfg = Config.configure_data_node("bar")
     baz_cfg = Config.configure_data_node("baz")
 
-    _OrchestratorFactory._build_dispatcher()
+    _OrchestratorFactory._build_dispatcher(force_restart=True)
 
     dns = _DataManager._bulk_get_or_create([foo_cfg, bar_cfg, baz_cfg])
     foo = dns[foo_cfg]
diff --git a/tests/core/conftest.py b/tests/core/conftest.py
index c76bb36045..1fc4da790a 100644
--- a/tests/core/conftest.py
+++ b/tests/core/conftest.py
@@ -61,7 +61,6 @@ from taipy.core.task.task import Task
 
 current_time = datetime.now()
-_OrchestratorFactory._build_orchestrator()
 
 
 @pytest.fixture(scope="function")
@@ -369,9 +368,11 @@ def _init_managers():
 @pytest.fixture
 def init_orchestrator():
     def _init_orchestrator():
+        _OrchestratorFactory._remove_dispatcher()
+
         if _OrchestratorFactory._orchestrator is None:
             _OrchestratorFactory._build_orchestrator()
-        _OrchestratorFactory._build_dispatcher()
+        _OrchestratorFactory._build_dispatcher(force_restart=True)
         _OrchestratorFactory._orchestrator.jobs_to_run = Queue()
         _OrchestratorFactory._orchestrator.blocked_jobs = []
@@ -392,7 +393,7 @@ def sql_engine():
 
 
 @pytest.fixture
-def init_sql_repo(tmp_sqlite):
+def init_sql_repo(tmp_sqlite, init_managers):
     Config.configure_core(repository_type="sql", repository_properties={"db_location": tmp_sqlite})
 
     # Clean SQLite database
@@ -401,4 +402,6 @@ def init_sql_repo(tmp_sqlite):
     _SQLConnection._connection = None
     _SQLConnection.init_db()
 
+    init_managers()
+
     return tmp_sqlite
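With the reworked fixtures above, a repository test no longer builds the dispatcher or cleans the managers itself; requesting `init_sql_repo` (which now pulls in `init_managers`) is enough. A hypothetical test sketch, not part of this change, using only calls that appear elsewhere in this diff:

    from taipy.config.config import Config
    from taipy.core.data._data_manager import _DataManager


    def test_create_and_get_data_node(init_sql_repo):
        # No init_managers() call and no _OrchestratorFactory._build_dispatcher():
        # the fixture has already wiped the managers and initialized the SQL repository.
        dn_config = Config.configure_data_node(id="foo", storage_type="in_memory")
        dn = _DataManager._create_and_set(dn_config, None, None)
        assert _DataManager._get(dn.id).id == dn.id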
diff --git a/tests/core/cycle/test_cycle_manager.py b/tests/core/cycle/test_cycle_manager.py
index 1f639962f4..ee2be580ee 100644
--- a/tests/core/cycle/test_cycle_manager.py
+++ b/tests/core/cycle/test_cycle_manager.py
@@ -14,8 +14,6 @@
 from taipy.config.common.frequency import Frequency
 from taipy.config.common.scope import Scope
 from taipy.config.config import Config
-from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
-from taipy.core.config.job_config import JobConfig
 from taipy.core.cycle._cycle_manager import _CycleManager
 from taipy.core.cycle.cycle import Cycle
 from taipy.core.cycle.cycle_id import CycleId
@@ -190,8 +188,6 @@ def test_get_cycle_start_date_and_end_date():
 
 
 def test_hard_delete_shared_entities():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     dn_config_1 = Config.configure_data_node("my_input_1", "pickle", scope=Scope.SCENARIO, default_data="testing")
     dn_config_2 = Config.configure_data_node("my_input_2", "pickle", scope=Scope.SCENARIO, default_data="testing")
     dn_config_3 = Config.configure_data_node("my_input_3", "pickle", scope=Scope.CYCLE, default_data="testing")
@@ -219,8 +215,6 @@ def test_hard_delete_shared_entities():
     scenario_config_2 = Config.configure_scenario("scenario_config_2", [task_config_2, task_config_3])
     scenario_config_2.add_sequences({"sequence_3": [task_config_3]})
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario_1 = _ScenarioManager._create(scenario_config_1)
     scenario_2 = _ScenarioManager._create(scenario_config_1)
     scenario_3 = _ScenarioManager._create(scenario_config_2)
diff --git a/tests/core/cycle/test_cycle_manager_with_sql_repo.py b/tests/core/cycle/test_cycle_manager_with_sql_repo.py
index 08fcb1139c..d537a57cc3 100644
--- a/tests/core/cycle/test_cycle_manager_with_sql_repo.py
+++ b/tests/core/cycle/test_cycle_manager_with_sql_repo.py
@@ -14,8 +14,6 @@
 from taipy.config.common.frequency import Frequency
 from taipy.config.common.scope import Scope
 from taipy.config.config import Config
-from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
-from taipy.core.config.job_config import JobConfig
 from taipy.core.cycle._cycle_manager import _CycleManager
 from taipy.core.cycle._cycle_manager_factory import _CycleManagerFactory
 from taipy.core.cycle.cycle import Cycle
@@ -199,7 +197,6 @@ def test_get_cycle_start_date_and_end_date(init_sql_repo):
 
 
 def test_hard_delete_shared_entities(init_sql_repo):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
     _ScenarioManager._repository = _ScenarioManagerFactory._build_repository()
 
     dn_config_1 = Config.configure_data_node("my_input_1", "in_memory", scope=Scope.SCENARIO, default_data="testing")
@@ -228,8 +225,6 @@ def test_hard_delete_shared_entities(init_sql_repo):
     )  # No Frequency so cycle attached to scenarios
     scenario_config_2.add_sequences({"sequence_3": [task_config_3]})
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario_1 = _ScenarioManager._create(scenario_config_1)
     scenario_2 = _ScenarioManager._create(scenario_config_1)
     scenario_3 = _ScenarioManager._create(scenario_config_2)
diff --git a/tests/core/data/test_data_manager_with_sql_repo.py b/tests/core/data/test_data_manager_with_sql_repo.py
index b3aedfd0f1..16b52be94c 100644
--- a/tests/core/data/test_data_manager_with_sql_repo.py
+++ b/tests/core/data/test_data_manager_with_sql_repo.py
@@ -19,7 +19,6 @@
 from taipy.core._version._version_manager import _VersionManager
 from taipy.core.config.data_node_config import DataNodeConfig
 from taipy.core.data._data_manager import _DataManager
-from taipy.core.data._data_manager_factory import _DataManagerFactory
 from taipy.core.data.csv import CSVDataNode
 from taipy.core.data.data_node_id import DataNodeId
 from taipy.core.data.in_memory import InMemoryDataNode
@@ -30,14 +29,8 @@ def file_exists(file_path: str) -> bool:
     return os.path.exists(file_path)
 
 
-def init_managers():
-    _DataManagerFactory._build_manager()._delete_all()
-
-
 class TestDataManager:
     def test_create_data_node_and_modify_properties_does_not_modify_config(self, init_sql_repo):
-        init_managers()
-
         dn_config = Config.configure_data_node(id="name", foo="bar")
         dn = _DataManager._create_and_set(dn_config, None, None)
         assert dn_config.properties.get("foo") == "bar"
@@ -51,23 +44,17 @@ def test_create_data_node_and_modify_properties_does_not_modify_config(self, ini
         assert dn.properties.get("baz") == "qux"
 
     def test_create_raises_exception_with_wrong_type(self, init_sql_repo):
-        init_managers()
-
         wrong_type_dn_config = DataNodeConfig(id="foo", storage_type="bar", scope=DataNodeConfig._DEFAULT_SCOPE)
         with pytest.raises(InvalidDataNodeType):
             _DataManager._create_and_set(wrong_type_dn_config, None, None)
 
     def test_create_from_same_config_generates_new_data_node_and_new_id(self, init_sql_repo):
-        init_managers()
-
         dn_config = Config.configure_data_node(id="foo", storage_type="in_memory")
         dn = _DataManager._create_and_set(dn_config, None, None)
         dn_2 = _DataManager._create_and_set(dn_config, None, None)
         assert dn_2.id != dn.id
 
     def test_create_uses_overridden_attributes_in_config_file(self, init_sql_repo):
-        init_managers()
-
         Config.override(os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/config.toml"))
 
         csv_dn_cfg = Config.configure_data_node(id="foo", storage_type="csv", path="bar", has_header=True)
@@ -85,14 +72,10 @@ def test_create_uses_overridden_attributes_in_config_file(self, init_sql_repo):
         assert csv_dn.has_header
 
     def
test_get_if_not_exists(self, init_sql_repo): - init_managers() - with pytest.raises(ModelNotFound): _DataManager._repository._load("test_data_node_2") def test_get_all(self, init_sql_repo): - init_managers() - _DataManager._delete_all() assert len(_DataManager._get_all()) == 0 dn_config_1 = Config.configure_data_node(id="foo", storage_type="in_memory") @@ -106,8 +89,6 @@ def test_get_all(self, init_sql_repo): assert len([dn for dn in _DataManager._get_all() if dn.config_id == "baz"]) == 2 def test_get_all_on_multiple_versions_environment(self, init_sql_repo): - init_managers() - # Create 5 data nodes with 2 versions each # Only version 1.0 has the data node with config_id = "config_id_1" # Only version 2.0 has the data node with config_id = "config_id_6" @@ -143,8 +124,6 @@ def test_get_all_on_multiple_versions_environment(self, init_sql_repo): assert len(_DataManager._get_all_by(filters=[{"version": "2.0", "config_id": "config_id_6"}])) == 1 def test_set(self, init_sql_repo): - init_managers() - dn = InMemoryDataNode( "config_id", Scope.SCENARIO, @@ -171,7 +150,6 @@ def test_set(self, init_sql_repo): assert _DataManager._get(dn.id).config_id == "foo" def test_delete(self, init_sql_repo): - init_managers() _DataManager._delete_all() dn_1 = InMemoryDataNode("config_id", Scope.SCENARIO, id="id_1") @@ -198,8 +176,6 @@ def test_get_or_create(self, init_sql_repo): def _get_or_create_dn(config, *args): return _DataManager._bulk_get_or_create([config], *args)[config] - init_managers() - global_dn_config = Config.configure_data_node( id="test_data_node", storage_type="in_memory", scope=Scope.GLOBAL, data="In memory Data Node" ) @@ -259,8 +235,6 @@ def _get_or_create_dn(config, *args): assert cycle_dn_4.id == cycle_dn_5.id def test_get_data_nodes_by_config_id(self, init_sql_repo): - init_managers() - dn_config_1 = Config.configure_data_node("dn_1", scope=Scope.SCENARIO) dn_config_2 = Config.configure_data_node("dn_2", scope=Scope.SCENARIO) dn_config_3 = Config.configure_data_node("dn_3", scope=Scope.SCENARIO) @@ -290,8 +264,6 @@ def test_get_data_nodes_by_config_id(self, init_sql_repo): assert sorted([dn_3_1.id]) == sorted([sequence.id for sequence in dn_3_datanodes]) def test_get_data_nodes_by_config_id_in_multiple_versions_environment(self, init_sql_repo): - init_managers() - dn_config_1 = Config.configure_data_node("dn_1", scope=Scope.SCENARIO) dn_config_2 = Config.configure_data_node("dn_2", scope=Scope.SCENARIO) diff --git a/tests/core/data/test_data_node.py b/tests/core/data/test_data_node.py index 5902d8865a..ffea70e5a4 100644 --- a/tests/core/data/test_data_node.py +++ b/tests/core/data/test_data_node.py @@ -20,8 +20,6 @@ from taipy.config import Config from taipy.config.common.scope import Scope from taipy.config.exceptions.exceptions import InvalidConfigurationId -from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory -from taipy.core.config.job_config import JobConfig from taipy.core.data._data_manager import _DataManager from taipy.core.data.data_node import DataNode from taipy.core.data.data_node_id import DataNodeId @@ -355,8 +353,6 @@ def test_is_up_to_date_across_scenarios(self, current_datetime): assert not dn_3.is_up_to_date def test_do_not_recompute_data_node_valid_but_continue_sequence_execution(self): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - a = Config.configure_data_node("A", "pickle", default_data="A") b = Config.configure_data_node("B", "pickle") c = Config.configure_data_node("C", "pickle") @@ -367,8 +363,6 @@ def 
test_do_not_recompute_data_node_valid_but_continue_sequence_execution(self): task_b_d = Config.configure_task("task_b_d", funct_b_d, input=b, output=d) scenario_cfg = Config.configure_scenario("scenario", [task_a_b, task_b_c, task_b_d]) - _OrchestratorFactory._build_dispatcher() - scenario = tp.create_scenario(scenario_cfg) scenario.submit() assert scenario.A.read() == "A" diff --git a/tests/core/job/test_job.py b/tests/core/job/test_job.py index 11c419b28a..d40f8245d8 100644 --- a/tests/core/job/test_job.py +++ b/tests/core/job/test_job.py @@ -305,9 +305,16 @@ def test_auto_set_and_reload(current_datetime, job_id): assert not job_1._is_in_context +def test_is_deletable(): + with mock.patch("taipy.core.job._job_manager._JobManager._is_deletable") as mock_submit: + task = Task(config_id="name_1", properties={}, function=_foo, id=TaskId("task_1")) + job = Job(job_id, task, "submit_id_1", "scenario_entity_id") + job.is_deletable() + mock_submit.assert_called_once_with(job) + + def _dispatch(task: Task, job: Job, mode=JobConfig._DEVELOPMENT_MODE): Config.configure_job_executions(mode=mode) - _OrchestratorFactory._build_dispatcher() _TaskManager._set(task) _JobManager._set(job) dispatcher: Union[_StandaloneJobDispatcher, _DevelopmentJobDispatcher] = _StandaloneJobDispatcher( @@ -316,11 +323,3 @@ def _dispatch(task: Task, job: Job, mode=JobConfig._DEVELOPMENT_MODE): if mode == JobConfig._DEVELOPMENT_MODE: dispatcher = _DevelopmentJobDispatcher(cast(_AbstractOrchestrator, _OrchestratorFactory._orchestrator)) dispatcher._dispatch(job) - - -def test_is_deletable(): - with mock.patch("taipy.core.job._job_manager._JobManager._is_deletable") as mock_submit: - task = Task(config_id="name_1", properties={}, function=_foo, id=TaskId("task_1")) - job = Job(job_id, task, "submit_id_1", "scenario_entity_id") - job.is_deletable() - mock_submit.assert_called_once_with(job) diff --git a/tests/core/job/test_job_manager.py b/tests/core/job/test_job_manager.py index 305568cb41..09924b1ea9 100644 --- a/tests/core/job/test_job_manager.py +++ b/tests/core/job/test_job_manager.py @@ -50,8 +50,6 @@ def test_create_jobs(): Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) task = _create_task(multiply, name="get_job") - _OrchestratorFactory._build_dispatcher() - job_1 = _JobManager._create(task, [print], "submit_id", "secnario_id", True) assert _JobManager._get(job_1.id) == job_1 assert job_1.is_submitted() @@ -78,8 +76,6 @@ def test_get_job(): task = _create_task(multiply, name="get_job") - _OrchestratorFactory._build_dispatcher() - job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0] assert _JobManager._get(job_1.id) == job_1 assert _JobManager._get(job_1.id).submit_entity_id == task.id @@ -97,8 +93,6 @@ def test_get_latest_job(): task = _create_task(multiply, name="get_latest_job") task_2 = _create_task(multiply, name="get_latest_job_2") - _OrchestratorFactory._build_dispatcher() - job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0] assert _JobManager._get_latest(task) == job_1 assert _JobManager._get_latest(task_2) is None @@ -123,8 +117,6 @@ def test_get_jobs(): task = _create_task(multiply, name="get_all_jobs") - _OrchestratorFactory._build_dispatcher() - job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0] job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0] @@ -136,8 +128,6 @@ def test_delete_job(): task = _create_task(multiply, name="delete_job") - _OrchestratorFactory._build_dispatcher() - job_1 = 
_OrchestratorFactory._orchestrator.submit_task(task).jobs[0] job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0] @@ -479,6 +469,7 @@ def test_is_deletable(): assert not _JobManager._is_deletable(job) assert not _JobManager._is_deletable(job.id) + def _create_task(function, nb_outputs=1, name=None): input1_dn_config = Config.configure_data_node("input1", "pickle", Scope.SCENARIO, default_data=21) input2_dn_config = Config.configure_data_node("input2", "pickle", Scope.SCENARIO, default_data=2) diff --git a/tests/core/job/test_job_manager_with_sql_repo.py b/tests/core/job/test_job_manager_with_sql_repo.py index bb8d0776fd..b35ff001b5 100644 --- a/tests/core/job/test_job_manager_with_sql_repo.py +++ b/tests/core/job/test_job_manager_with_sql_repo.py @@ -28,11 +28,9 @@ from taipy.core.data._data_manager_factory import _DataManagerFactory from taipy.core.exceptions.exceptions import JobNotDeletedException from taipy.core.job._job_manager import _JobManager -from taipy.core.job._job_manager_factory import _JobManagerFactory from taipy.core.job.job_id import JobId from taipy.core.job.status import Status from taipy.core.task._task_manager import _TaskManager -from taipy.core.task._task_manager_factory import _TaskManagerFactory from tests.core.utils import assert_true_after_time @@ -45,20 +43,11 @@ def lock_multiply(lock, nb1: float, nb2: float): return multiply(nb1 or 1, nb2 or 2) -def init_managers(): - _TaskManagerFactory._build_manager()._delete_all() - _DataManagerFactory._build_manager()._delete_all() - _JobManagerFactory._build_manager()._delete_all() - - def test_create_jobs(init_sql_repo): Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - init_managers() task = _create_task(multiply, name="get_job") - _OrchestratorFactory._build_dispatcher() - job_1 = _JobManager._create(task, [print], "submit_id", "secnario_id", True) assert _JobManager._get(job_1.id) == job_1 assert job_1.is_submitted() @@ -80,12 +69,9 @@ def test_create_jobs(init_sql_repo): def test_get_job(init_sql_repo): Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - init_managers() task = _create_task(multiply, name="get_job") - _OrchestratorFactory._build_dispatcher() - job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0] assert _JobManager._get(job_1.id) == job_1 assert _JobManager._get(job_1.id).submit_entity_id == task.id @@ -99,13 +85,10 @@ def test_get_job(init_sql_repo): def test_get_latest_job(init_sql_repo): Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - init_managers() task = _create_task(multiply, name="get_latest_job") task_2 = _create_task(multiply, name="get_latest_job_2") - _OrchestratorFactory._build_dispatcher() - job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0] assert _JobManager._get_latest(task) == job_1 assert _JobManager._get_latest(task_2) is None @@ -122,18 +105,14 @@ def test_get_latest_job(init_sql_repo): def test_get_job_unknown(init_sql_repo): - init_managers() assert _JobManager._get(JobId("Unknown")) is None def test_get_jobs(init_sql_repo): Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - init_managers() task = _create_task(multiply, name="get_all_jobs") - _OrchestratorFactory._build_dispatcher() - job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0] job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0] @@ -143,12 +122,8 @@ def test_get_jobs(init_sql_repo): def test_delete_job(init_sql_repo): 
Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - init_managers() - task = _create_task(multiply, name="delete_job") - _OrchestratorFactory._build_dispatcher() - job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0] job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0] @@ -160,7 +135,6 @@ def test_delete_job(init_sql_repo): def test_raise_when_trying_to_delete_unfinished_job(init_sql_repo): Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2) - init_managers() m = multiprocessing.Manager() lock = m.Lock() @@ -189,7 +163,6 @@ def test_raise_when_trying_to_delete_unfinished_job(init_sql_repo): def test_force_deleting_unfinished_job(init_sql_repo): Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2) - init_managers() m = multiprocessing.Manager() lock = m.Lock() @@ -217,8 +190,6 @@ def test_force_deleting_unfinished_job(init_sql_repo): def test_is_deletable(init_sql_repo): - init_managers() - assert len(_JobManager._get_all()) == 0 task = _create_task(print, 0, "task") job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0] diff --git a/tests/core/scenario/test_scenario_manager.py b/tests/core/scenario/test_scenario_manager.py index 8080b7b763..d0ba49671e 100644 --- a/tests/core/scenario/test_scenario_manager.py +++ b/tests/core/scenario/test_scenario_manager.py @@ -20,11 +20,9 @@ from taipy.config.config import Config from taipy.core import Job from taipy.core._orchestrator._orchestrator import _Orchestrator -from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory from taipy.core._version._version_manager import _VersionManager from taipy.core.common import _utils from taipy.core.common._utils import _Subscriber -from taipy.core.config.job_config import JobConfig from taipy.core.cycle._cycle_manager import _CycleManager from taipy.core.data._data_manager import _DataManager from taipy.core.data.in_memory import InMemoryDataNode @@ -52,9 +50,6 @@ def test_set_and_get_scenario(cycle): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - _OrchestratorFactory._build_dispatcher() - scenario_id_1 = ScenarioId("scenario_id_1") scenario_1 = Scenario("scenario_name_1", [], {}, [], scenario_id_1) @@ -278,16 +273,12 @@ def test_get_all_on_multiple_versions_environment(): def test_create_scenario_does_not_modify_config(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - creation_date_1 = datetime.now() name_1 = "name_1" scenario_config = Config.configure_scenario("sc", None, None, Frequency.DAILY) assert scenario_config.properties.get("name") is None assert len(scenario_config.properties) == 0 - _OrchestratorFactory._build_dispatcher() - scenario = _ScenarioManager._create(scenario_config, creation_date=creation_date_1, name=name_1) assert len(scenario_config.properties) == 0 assert len(scenario.properties) == 1 @@ -307,8 +298,6 @@ def test_create_scenario_does_not_modify_config(): def test_create_and_delete_scenario(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - creation_date_1 = datetime.now() creation_date_2 = creation_date_1 + timedelta(minutes=10) @@ -319,8 +308,6 @@ def test_create_and_delete_scenario(): scenario_config = Config.configure_scenario("sc", None, None, Frequency.DAILY) - _OrchestratorFactory._build_dispatcher() - scenario_1 = _ScenarioManager._create(scenario_config, creation_date=creation_date_1, name=name_1) assert scenario_1.config_id == "sc" assert scenario_1.sequences 
== {} @@ -531,8 +518,6 @@ def mult_by_4(nb: int): def test_scenario_manager_only_creates_data_node_once(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - # dn_1 ---> mult_by_2 ---> dn_2 ---> mult_by_3 ---> dn_6 # dn_1 ---> mult_by_4 ---> dn_4 @@ -550,8 +535,6 @@ def test_scenario_manager_only_creates_data_node_once(): {"by_6": [task_mult_by_2_config, task_mult_by_3_config], "by_4": [task_mult_by_4_config]} ) - _OrchestratorFactory._build_dispatcher() - assert len(_DataManager._get_all()) == 0 assert len(_TaskManager._get_all()) == 0 assert len(_SequenceManager._get_all()) == 0 @@ -588,8 +571,6 @@ def test_scenario_manager_only_creates_data_node_once(): def test_notification_subscribe(mocker): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o) scenario_config = Config.configure_scenario( @@ -604,8 +585,6 @@ def test_notification_subscribe(mocker): ], ) - _OrchestratorFactory._build_dispatcher() - scenario = _ScenarioManager._create(scenario_config) notify_1 = NotifyMock(scenario) @@ -639,8 +618,6 @@ def assert_called_with(self, args): def test_notification_subscribe_multiple_params(mocker): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o) scenario_config = Config.configure_scenario( @@ -656,8 +633,6 @@ def test_notification_subscribe_multiple_params(mocker): ) notify = mocker.Mock() - _OrchestratorFactory._build_dispatcher() - scenario = _ScenarioManager._create(scenario_config) _ScenarioManager._subscribe(callback=notify, params=["foobar", 123, 1.2], scenario=scenario) mocker.patch.object(_ScenarioManager, "_get", return_value=scenario) @@ -680,8 +655,6 @@ def notify2(*args, **kwargs): def test_notification_unsubscribe(mocker): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o) scenario_config = Config.configure_scenario( @@ -696,8 +669,6 @@ def test_notification_unsubscribe(mocker): ], ) - _OrchestratorFactory._build_dispatcher() - scenario = _ScenarioManager._create(scenario_config) notify_1 = notify1 @@ -715,8 +686,6 @@ def test_notification_unsubscribe(mocker): def test_notification_unsubscribe_multi_param(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - scenario_config = Config.configure_scenario( "awesome_scenario", [ @@ -729,8 +698,6 @@ def test_notification_unsubscribe_multi_param(): ], ) - _OrchestratorFactory._build_dispatcher() - scenario = _ScenarioManager._create(scenario_config) # test subscribing notification @@ -756,8 +723,6 @@ def test_notification_unsubscribe_multi_param(): def test_scenario_notification_subscribe_all(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - scenario_config = Config.configure_scenario( "awesome_scenario", [ @@ -780,7 +745,7 @@ def test_scenario_notification_subscribe_all(): ) ], ) - _OrchestratorFactory._build_dispatcher() + scenario = _ScenarioManager._create(scenario_config) other_scenario = _ScenarioManager._create(other_scenario_config) notify_1 = NotifyMock(scenario) @@ -816,9 +781,6 @@ def test_is_promotable_to_primary_scenario(): def test_get_set_primary_scenario(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - _OrchestratorFactory._build_dispatcher() - cycle_1 = _CycleManager._create(Frequency.DAILY, name="foo") scenario_1 = 
Scenario("sc_1", [], {}, ScenarioId("sc_1"), is_primary=False, cycle=cycle_1) @@ -852,16 +814,12 @@ def test_get_set_primary_scenario(): def test_hard_delete_one_single_scenario_with_scenario_data_nodes(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing") dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.SCENARIO) task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config) scenario_config = Config.configure_scenario("scenario_config", [task_config]) scenario_config.add_sequences({"sequence_config": [task_config]}) - _OrchestratorFactory._build_dispatcher() - scenario = _ScenarioManager._create(scenario_config) _ScenarioManager._submit(scenario.id) @@ -879,16 +837,12 @@ def test_hard_delete_one_single_scenario_with_scenario_data_nodes(): def test_hard_delete_one_scenario_among_two_with_scenario_data_nodes(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing") dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.SCENARIO) task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config) scenario_config = Config.configure_scenario("scenario_config", [task_config]) scenario_config.add_sequences({"sequence_config": [task_config]}) - _OrchestratorFactory._build_dispatcher() - scenario_1 = _ScenarioManager._create(scenario_config) scenario_2 = _ScenarioManager._create(scenario_config) _ScenarioManager._submit(scenario_1.id) @@ -909,16 +863,12 @@ def test_hard_delete_one_scenario_among_two_with_scenario_data_nodes(): def test_hard_delete_one_scenario_among_two_with_cycle_data_nodes(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.CYCLE, default_data="testing") dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.CYCLE) task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config) scenario_config = Config.configure_scenario("scenario_config", [task_config]) scenario_config.add_sequences({"sequence_config": [task_config]}) - _OrchestratorFactory._build_dispatcher() - scenario_1 = _ScenarioManager._create(scenario_config) scenario_2 = _ScenarioManager._create(scenario_config) _ScenarioManager._submit(scenario_1.id) @@ -939,8 +889,6 @@ def test_hard_delete_one_scenario_among_two_with_cycle_data_nodes(): def test_hard_delete_shared_entities(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - dn_config_1 = Config.configure_data_node("my_input_1", "in_memory", scope=Scope.CYCLE, default_data="testing") dn_config_2 = Config.configure_data_node("my_input_2", "in_memory", scope=Scope.SCENARIO, default_data="testing") dn_config_3 = Config.configure_data_node("my_input_3", "in_memory", scope=Scope.GLOBAL, default_data="testing") @@ -963,8 +911,6 @@ def test_hard_delete_shared_entities(): } ) - _OrchestratorFactory._build_dispatcher() - scenario_1 = _ScenarioManager._create(scenario_config_1) scenario_2 = _ScenarioManager._create(scenario_config_1) scenario_1.submit() @@ -1008,9 +954,6 @@ def test_is_submittable(): def test_submit(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - 
_OrchestratorFactory._build_dispatcher() - data_node_1 = InMemoryDataNode("foo", Scope.SCENARIO, "s1") data_node_2 = InMemoryDataNode("bar", Scope.SCENARIO, "s2") data_node_3 = InMemoryDataNode("baz", Scope.SCENARIO, "s3") @@ -1179,8 +1122,6 @@ def test_scenarios_comparison(): comparators={"bar": [subtraction], "foo": [subtraction, addition]}, ) - _OrchestratorFactory._build_dispatcher() - assert scenario_config.comparators is not None scenario_1 = _ScenarioManager._create(scenario_config) scenario_2 = _ScenarioManager._create(scenario_config) @@ -1213,9 +1154,6 @@ def test_scenarios_comparison(): def test_tags(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - _OrchestratorFactory._build_dispatcher() - cycle_1 = _CycleManager._create(Frequency.DAILY, name="today", creation_date=datetime.now()) cycle_2 = _CycleManager._create( Frequency.DAILY, @@ -1353,13 +1291,9 @@ def test_tags(): def test_authorized_tags(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - scenario = Scenario("scenario_1", [], {"authorized_tags": ["foo", "bar"]}, [], ScenarioId("scenario_1")) scenario_2_cfg = Config.configure_scenario("scenario_2", [], [], Frequency.DAILY, authorized_tags=["foo", "bar"]) - _OrchestratorFactory._build_dispatcher() - scenario_2 = _ScenarioManager._create(scenario_2_cfg) _ScenarioManager._set(scenario) diff --git a/tests/core/scenario/test_scenario_manager_with_sql_repo.py b/tests/core/scenario/test_scenario_manager_with_sql_repo.py index d96be2ac0e..3129241a9b 100644 --- a/tests/core/scenario/test_scenario_manager_with_sql_repo.py +++ b/tests/core/scenario/test_scenario_manager_with_sql_repo.py @@ -16,9 +16,7 @@ from taipy.config.common.frequency import Frequency from taipy.config.common.scope import Scope from taipy.config.config import Config -from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory from taipy.core._version._version_manager import _VersionManager -from taipy.core.config.job_config import JobConfig from taipy.core.cycle._cycle_manager import _CycleManager from taipy.core.data._data_manager import _DataManager from taipy.core.data.in_memory import InMemoryDataNode @@ -32,12 +30,7 @@ from taipy.core.task.task_id import TaskId -def test_set_and_get_scenario(cycle, init_sql_repo, init_managers): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - - init_managers() - _OrchestratorFactory._build_dispatcher() - +def test_set_and_get_scenario(cycle, init_sql_repo): scenario_id_1 = ScenarioId("scenario_id_1") scenario_1 = Scenario("scenario_name_1", [], {}, [], scenario_id_1) @@ -200,9 +193,7 @@ def test_set_and_get_scenario(cycle, init_sql_repo, init_managers): assert _TaskManager._get(task_2.id).id == task_2.id -def test_get_all_on_multiple_versions_environment(init_sql_repo, init_managers): - init_managers() - +def test_get_all_on_multiple_versions_environment(init_sql_repo): # Create 5 scenarios with 2 versions each # Only version 1.0 has the scenario with config_id = "config_id_1" # Only version 2.0 has the scenario with config_id = "config_id_6" @@ -233,17 +224,11 @@ def test_get_all_on_multiple_versions_environment(init_sql_repo, init_managers): assert len(_ScenarioManager._get_all_by(filters=[{"version": "2.0", "config_id": "config_id_6"}])) == 1 -def test_create_scenario_does_not_modify_config(init_sql_repo, init_managers): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - - init_managers() - +def test_create_scenario_does_not_modify_config(init_sql_repo): 
creation_date_1 = datetime.now() name_1 = "name_1" scenario_config = Config.configure_scenario("sc", None, None, Frequency.DAILY) - _OrchestratorFactory._build_dispatcher() - assert scenario_config.properties.get("name") is None assert len(scenario_config.properties) == 0 @@ -265,11 +250,7 @@ def test_create_scenario_does_not_modify_config(init_sql_repo, init_managers): assert scenario_2.name is None -def test_create_and_delete_scenario(init_sql_repo, init_managers): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - - init_managers() - +def test_create_and_delete_scenario(init_sql_repo): creation_date_1 = datetime.now() creation_date_2 = creation_date_1 + timedelta(minutes=10) @@ -280,8 +261,6 @@ def test_create_and_delete_scenario(init_sql_repo, init_managers): scenario_config = Config.configure_scenario("sc", None, None, Frequency.DAILY) - _OrchestratorFactory._build_dispatcher() - scenario_1 = _ScenarioManager._create(scenario_config, creation_date=creation_date_1, name=name_1) assert scenario_1.config_id == "sc" assert scenario_1.sequences == {} @@ -350,11 +329,7 @@ def mult_by_4(nb: int): return nb * 4 -def test_scenario_manager_only_creates_data_node_once(init_sql_repo, init_managers): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - - init_managers() - +def test_scenario_manager_only_creates_data_node_once(init_sql_repo): # dn_1 ---> mult_by_2 ---> dn_2 ---> mult_by_3 ---> dn_6 # dn_1 ---> mult_by_4 ---> dn_4 @@ -372,8 +347,6 @@ def test_scenario_manager_only_creates_data_node_once(init_sql_repo, init_manage {"by_6": [task_mult_by_2_config, task_mult_by_3_config], "by_4": [task_mult_by_4_config]} ) - _OrchestratorFactory._build_dispatcher() - assert len(_DataManager._get_all()) == 0 assert len(_TaskManager._get_all()) == 0 assert len(_SequenceManager._get_all()) == 0 @@ -409,9 +382,7 @@ def test_scenario_manager_only_creates_data_node_once(init_sql_repo, init_manage assert len(_ScenarioManager._get_all()) == 2 -def test_get_scenarios_by_config_id(init_sql_repo, init_managers): - init_managers() - +def test_get_scenarios_by_config_id(init_sql_repo): scenario_config_1 = Config.configure_scenario("s1", sequence_configs=[]) scenario_config_2 = Config.configure_scenario("s2", sequence_configs=[]) scenario_config_3 = Config.configure_scenario("s3", sequence_configs=[]) @@ -441,9 +412,7 @@ def test_get_scenarios_by_config_id(init_sql_repo, init_managers): assert sorted([s_3_1.id]) == sorted([scenario.id for scenario in s3_scenarios]) -def test_get_scenarios_by_config_id_in_multiple_versions_environment(init_sql_repo, init_managers): - init_managers() - +def test_get_scenarios_by_config_id_in_multiple_versions_environment(init_sql_repo): scenario_config_1 = Config.configure_scenario("s1", sequence_configs=[]) scenario_config_2 = Config.configure_scenario("s2", sequence_configs=[]) diff --git a/tests/core/sequence/test_sequence_manager.py b/tests/core/sequence/test_sequence_manager.py index c2e1bddcf2..807628f97f 100644 --- a/tests/core/sequence/test_sequence_manager.py +++ b/tests/core/sequence/test_sequence_manager.py @@ -20,11 +20,9 @@ from taipy.config.common.scope import Scope from taipy.config.config import Config from taipy.core._orchestrator._orchestrator import _Orchestrator -from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory from taipy.core._version._version_manager import _VersionManager from taipy.core.common import _utils from taipy.core.common._utils import _Subscriber -from taipy.core.config.job_config 
import JobConfig from taipy.core.data._data_manager import _DataManager from taipy.core.data.in_memory import InMemoryDataNode from taipy.core.exceptions.exceptions import ( @@ -73,8 +71,6 @@ def test_raise_sequence_does_not_belong_to_scenario(): def __init(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - _OrchestratorFactory._build_dispatcher() input_dn = InMemoryDataNode("foo", Scope.SCENARIO) output_dn = InMemoryDataNode("foo", Scope.SCENARIO) task = Task("task", {}, print, [input_dn], [output_dn], TaskId("task_id")) @@ -217,9 +213,6 @@ def test_is_submittable(): def test_submit(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - _OrchestratorFactory._build_dispatcher() - data_node_1 = InMemoryDataNode("foo", Scope.SCENARIO, "s1") data_node_2 = InMemoryDataNode("bar", Scope.SCENARIO, "s2") data_node_3 = InMemoryDataNode("baz", Scope.SCENARIO, "s3") @@ -328,9 +321,6 @@ def mock_function_no_input_one_output(): def test_submit_sequence_from_tasks_with_one_or_no_input_output(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - _OrchestratorFactory._build_dispatcher() - # test no input and no output Task task_no_input_no_output = Task("task_no_input_no_output", {}, mock_function_no_input_no_output) scenario_1 = Scenario("scenario_1", {task_no_input_no_output}, {}) @@ -397,8 +387,6 @@ def mult_by_3(nb: int): def test_get_or_create_data(): # only create intermediate data node once - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - dn_config_1 = Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1) dn_config_2 = Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0) dn_config_6 = Config.configure_data_node("baz", "in_memory", Scope.SCENARIO, default_data=0) @@ -408,8 +396,6 @@ def test_get_or_create_data(): # dn_1 ---> mult_by_two ---> dn_2 ---> mult_by_3 ---> dn_6 scenario_config = Config.configure_scenario("scenario", [task_config_mult_by_two, task_config_mult_by_3]) - _OrchestratorFactory._build_dispatcher() - assert len(_DataManager._get_all()) == 0 assert len(_TaskManager._get_all()) == 0 @@ -460,8 +446,6 @@ def notify_multi_param(*args, **kwargs): def test_sequence_notification_subscribe(mocker): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o) task_configs = [ @@ -473,8 +457,6 @@ def test_sequence_notification_subscribe(mocker): ) ] - _OrchestratorFactory._build_dispatcher() - tasks = _TaskManager._bulk_get_or_create(task_configs=task_configs) scenario = Scenario("scenario", set(tasks), {}, sequences={"by_1": {"tasks": tasks}}) _ScenarioManager._set(scenario) @@ -514,8 +496,6 @@ def test_sequence_notification_subscribe(mocker): def test_sequence_notification_subscribe_multi_param(mocker): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o) task_configs = [ @@ -527,8 +507,6 @@ def test_sequence_notification_subscribe_multi_param(mocker): ) ] - _OrchestratorFactory._build_dispatcher() - tasks = _TaskManager._bulk_get_or_create(task_configs) scenario = Scenario("scenario", set(tasks), {}, sequences={"by_6": {"tasks": tasks}}) _ScenarioManager._set(scenario) @@ -549,8 +527,6 @@ def test_sequence_notification_subscribe_multi_param(mocker): def test_sequence_notification_unsubscribe(mocker): - 
Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o) task_configs = [ @@ -562,8 +538,6 @@ def test_sequence_notification_unsubscribe(mocker): ) ] - _OrchestratorFactory._build_dispatcher() - tasks = _TaskManager._bulk_get_or_create(task_configs) scenario = Scenario("scenario", set(tasks), {}, sequences={"by_6": {"tasks": tasks}}) _ScenarioManager._set(scenario) @@ -584,8 +558,6 @@ def test_sequence_notification_unsubscribe(mocker): def test_sequence_notification_unsubscribe_multi_param(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - task_configs = [ Config.configure_task( "mult_by_two", @@ -595,8 +567,6 @@ def test_sequence_notification_unsubscribe_multi_param(): ) ] - _OrchestratorFactory._build_dispatcher() - tasks = _TaskManager._bulk_get_or_create(task_configs) scenario = Scenario("scenario", tasks, {}, sequences={"by_6": {"tasks": tasks}}) _ScenarioManager._set(scenario) @@ -622,8 +592,6 @@ def test_sequence_notification_unsubscribe_multi_param(): def test_sequence_notification_subscribe_all(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - task_configs = [ Config.configure_task( "mult_by_two", @@ -633,8 +601,6 @@ def test_sequence_notification_subscribe_all(): ) ] - _OrchestratorFactory._build_dispatcher() - tasks = _TaskManager._bulk_get_or_create(task_configs) scenario = Scenario("scenario", tasks, {}, sequences={"by_6": {"tasks": tasks}, "other_sequence": {"tasks": tasks}}) _ScenarioManager._set(scenario) @@ -802,14 +768,10 @@ def test_export(tmpdir_factory): def test_hard_delete_one_single_sequence_with_scenario_data_nodes(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing") dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.SCENARIO) task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config) - _OrchestratorFactory._build_dispatcher() - tasks = _TaskManager._bulk_get_or_create([task_config]) scenario = Scenario("scenario", tasks, {}, sequences={"sequence": {"tasks": tasks}}) _ScenarioManager._set(scenario) @@ -831,14 +793,10 @@ def test_hard_delete_one_single_sequence_with_scenario_data_nodes(): def test_hard_delete_one_single_sequence_with_cycle_data_nodes(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.CYCLE, default_data="testing") dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.CYCLE) task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config) - _OrchestratorFactory._build_dispatcher() - tasks = _TaskManager._bulk_get_or_create([task_config]) scenario = Scenario("scenario", tasks, {}, sequences={"sequence": {"tasks": tasks}}) _ScenarioManager._set(scenario) @@ -860,16 +818,12 @@ def test_hard_delete_one_single_sequence_with_cycle_data_nodes(): def test_hard_delete_shared_entities(): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - input_dn = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing") intermediate_dn = Config.configure_data_node("my_inter", "in_memory", scope=Scope.GLOBAL, default_data="testing") output_dn = Config.configure_data_node("my_output", "in_memory", scope=Scope.GLOBAL, 
default_data="testing") task_1 = Config.configure_task("task_1", print, input_dn, intermediate_dn) task_2 = Config.configure_task("task_2", print, intermediate_dn, output_dn) - _OrchestratorFactory._build_dispatcher() - tasks_scenario_1 = _TaskManager._bulk_get_or_create([task_1, task_2], scenario_id="scenario_id_1") tasks_scenario_2 = _TaskManager._bulk_get_or_create([task_1, task_2], scenario_id="scenario_id_2") diff --git a/tests/core/sequence/test_sequence_manager_with_sql_repo.py b/tests/core/sequence/test_sequence_manager_with_sql_repo.py index 138d25552e..c9e7aab24e 100644 --- a/tests/core/sequence/test_sequence_manager_with_sql_repo.py +++ b/tests/core/sequence/test_sequence_manager_with_sql_repo.py @@ -13,9 +13,7 @@ from taipy.config.common.scope import Scope from taipy.config.config import Config -from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory from taipy.core._version._version_manager import _VersionManager -from taipy.core.config.job_config import JobConfig from taipy.core.data._data_manager import _DataManager from taipy.core.data.in_memory import InMemoryDataNode from taipy.core.exceptions import SequenceAlreadyExists @@ -29,12 +27,7 @@ from taipy.core.task.task_id import TaskId -def test_set_and_get_sequence(init_sql_repo, init_managers): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - - init_managers() - _OrchestratorFactory._build_dispatcher() - +def test_set_and_get_sequence(init_sql_repo): input_dn = InMemoryDataNode("foo", Scope.SCENARIO) output_dn = InMemoryDataNode("foo", Scope.SCENARIO) task = Task("task", {}, print, [input_dn], [output_dn], TaskId("task_id")) @@ -89,9 +82,7 @@ def test_set_and_get_sequence(init_sql_repo, init_managers): assert len(_SequenceManager._get(sequence_2).tasks) == 1 -def test_get_all_on_multiple_versions_environment(init_sql_repo, init_managers): - init_managers() - +def test_get_all_on_multiple_versions_environment(init_sql_repo): # Create 5 sequences from Scenario with 2 versions each for version in range(1, 3): for i in range(5): @@ -152,12 +143,8 @@ def mult_by_3(nb: int): return nb * 3 -def test_get_or_create_data(init_sql_repo, init_managers): +def test_get_or_create_data(init_sql_repo): # only create intermediate data node once - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - - init_managers() - dn_config_1 = Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1) dn_config_2 = Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0) dn_config_6 = Config.configure_data_node("baz", "in_memory", Scope.SCENARIO, default_data=0) @@ -167,8 +154,6 @@ def test_get_or_create_data(init_sql_repo, init_managers): # dn_1 ---> mult_by_two ---> dn_2 ---> mult_by_3 ---> dn_6 scenario_config = Config.configure_scenario("scenario", [task_config_mult_by_two, task_config_mult_by_3]) - _OrchestratorFactory._build_dispatcher() - assert len(_DataManager._get_all()) == 0 assert len(_TaskManager._get_all()) == 0 @@ -206,17 +191,11 @@ def test_get_or_create_data(init_sql_repo, init_managers): sequence.WRONG.write(7) -def test_hard_delete_one_single_sequence_with_scenario_data_nodes(init_sql_repo, init_managers): - Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE) - - init_managers() - +def test_hard_delete_one_single_sequence_with_scenario_data_nodes(init_sql_repo): dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing") dn_output_config = 
Config.configure_data_node("my_output", "in_memory", scope=Scope.SCENARIO)
     task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config)
 
-    _OrchestratorFactory._build_dispatcher()
-
     tasks = _TaskManager._bulk_get_or_create([task_config])
     scenario = Scenario("scenario", set(tasks), {}, sequences={"sequence": {"tasks": tasks}})
     _ScenarioManager._set(scenario)
@@ -237,17 +216,11 @@ def test_hard_delete_one_single_sequence_with_scenario_data_nodes(init_sql_repo,
     assert len(_JobManager._get_all()) == 1
 
 
-def test_hard_delete_one_single_sequence_with_cycle_data_nodes(init_sql_repo, init_managers):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
-    init_managers()
-
+def test_hard_delete_one_single_sequence_with_cycle_data_nodes(init_sql_repo):
     dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.CYCLE, default_data="testing")
     dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.CYCLE)
     task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config)
 
-    _OrchestratorFactory._build_dispatcher()
-
     tasks = _TaskManager._bulk_get_or_create([task_config])
     scenario = Scenario("scenario", tasks, {}, sequences={"sequence": {"tasks": tasks}})
     _ScenarioManager._set(scenario)
@@ -268,19 +241,13 @@ def test_hard_delete_one_single_sequence_with_cycle_data_nodes(init_sql_repo, in
     assert len(_JobManager._get_all()) == 1
 
 
-def test_hard_delete_shared_entities(init_sql_repo, init_managers):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
-    init_managers()
-
+def test_hard_delete_shared_entities(init_sql_repo):
     input_dn = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing")
     intermediate_dn = Config.configure_data_node("my_inter", "in_memory", scope=Scope.GLOBAL, default_data="testing")
     output_dn = Config.configure_data_node("my_output", "in_memory", scope=Scope.GLOBAL, default_data="testing")
     task_1 = Config.configure_task("task_1", print, input_dn, intermediate_dn)
     task_2 = Config.configure_task("task_2", print, intermediate_dn, output_dn)
 
-    _OrchestratorFactory._build_dispatcher()
-
     tasks_scenario_1 = _TaskManager._bulk_get_or_create([task_1, task_2], scenario_id="scenario_id_1")
     tasks_scenario_2 = _TaskManager._bulk_get_or_create([task_1, task_2], scenario_id="scenario_id_2")
 
diff --git a/tests/core/submission/test_submission_manager_with_sql_repo.py b/tests/core/submission/test_submission_manager_with_sql_repo.py
index 1fd27501b0..17a52a8acc 100644
--- a/tests/core/submission/test_submission_manager_with_sql_repo.py
+++ b/tests/core/submission/test_submission_manager_with_sql_repo.py
@@ -22,14 +22,7 @@
 from taipy.core.submission.submission_status import SubmissionStatus
 
 
-def init_managers():
-    _VersionManagerFactory._build_manager()._delete_all()
-    _SubmissionManagerFactory._build_manager()._delete_all()
-
-
 def test_create_submission(scenario, init_sql_repo):
-    init_managers()
-
     submission_1 = _SubmissionManagerFactory._build_manager()._create(
         scenario.id, scenario._ID_PREFIX, scenario.config_id, debug=True, log="log_file", retry_note=5
     )
@@ -44,8 +37,6 @@ def test_create_submission(scenario, init_sql_repo):
 
 
 def test_get_submission(init_sql_repo):
-    init_managers()
-
     submission_manager = _SubmissionManagerFactory._build_manager()
 
     submission_1 = submission_manager._create(
@@ -63,8 +54,6 @@ def test_get_submission(init_sql_repo):
 
 
 def test_get_all_submission(init_sql_repo):
-    init_managers()
-
     submission_manager = _SubmissionManagerFactory._build_manager()
     version_manager = _VersionManagerFactory._build_manager()
 
@@ -86,8 +75,6 @@ def test_get_all_submission(init_sql_repo):
 
 
 def test_get_latest_submission(init_sql_repo):
-    init_managers()
-
     task_1 = Task("task_config_1", {}, print, id="task_id_1")
     task_2 = Task("task_config_2", {}, print, id="task_id_2")
 
@@ -113,8 +100,6 @@ def test_get_latest_submission(init_sql_repo):
 
 
 def test_delete_submission(init_sql_repo):
-    init_managers()
-
     submission_manager = _SubmissionManagerFactory._build_manager()
 
     submission = Submission("entity_id", "submission_id", "entity_config_id")
@@ -140,8 +125,6 @@ def test_delete_submission(init_sql_repo):
 
 
 def test_is_deletable(init_sql_repo):
-    init_managers()
-
     submission_manager = _SubmissionManagerFactory._build_manager()
 
     submission = Submission("entity_id", "submission_id", "entity_config_id")
diff --git a/tests/core/task/test_task_manager_with_sql_repo.py b/tests/core/task/test_task_manager_with_sql_repo.py
index 969b674ca2..1d295eb2f7 100644
--- a/tests/core/task/test_task_manager_with_sql_repo.py
+++ b/tests/core/task/test_task_manager_with_sql_repo.py
@@ -19,25 +19,14 @@
 from taipy.core._orchestrator._orchestrator import _Orchestrator
 from taipy.core._version._version_manager import _VersionManager
 from taipy.core.data._data_manager import _DataManager
-from taipy.core.data._data_manager_factory import _DataManagerFactory
 from taipy.core.data.in_memory import InMemoryDataNode
 from taipy.core.exceptions.exceptions import ModelNotFound, NonExistingTask
-from taipy.core.job._job_manager_factory import _JobManagerFactory
 from taipy.core.task._task_manager import _TaskManager
-from taipy.core.task._task_manager_factory import _TaskManagerFactory
 from taipy.core.task.task import Task
 from taipy.core.task.task_id import TaskId
 
 
-def init_managers():
-    _JobManagerFactory._build_manager()._delete_all()
-    _TaskManagerFactory._build_manager()._delete_all()
-    _DataManagerFactory._build_manager()._delete_all()
-
-
 def test_create_and_save(init_sql_repo):
-    init_managers()
-
     input_configs = [Config.configure_data_node("my_input", "in_memory")]
     output_configs = Config.configure_data_node("my_output", "in_memory")
     task_config = Config.configure_task("foo", print, input_configs, output_configs)
@@ -66,8 +55,6 @@ def test_create_and_save(init_sql_repo):
 
 
 def test_do_not_recreate_existing_data_node(init_sql_repo):
-    init_managers()
-
     input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO)
     output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.SCENARIO)
 
@@ -80,7 +67,6 @@ def test_do_not_recreate_existing_data_node(init_sql_repo):
 
 
 def test_do_not_recreate_existing_task(init_sql_repo):
-    init_managers()
     assert len(_TaskManager._get_all()) == 0
 
     input_config_scope_scenario = Config.configure_data_node("my_input_1", "in_memory", Scope.SCENARIO)
@@ -166,8 +152,6 @@ def test_do_not_recreate_existing_task(init_sql_repo):
 
 
 def test_set_and_get_task(init_sql_repo):
-    init_managers()
-
     task_id_1 = TaskId("id1")
     first_task = Task("name_1", {}, print, [], [], task_id_1)
     task_id_2 = TaskId("id2")
@@ -218,9 +202,6 @@ def test_set_and_get_task(init_sql_repo):
 
 
 def test_get_all_on_multiple_versions_environment(init_sql_repo):
-    Config.configure_global_app(repository_type="sql")
-    init_managers()
-
     # Create 5 tasks with 2 versions each
     # Only version 1.0 has the task with config_id = "config_id_1"
     # Only version 2.0 has the task with config_id = "config_id_6"
@@ -254,8 +235,6 @@ def test_get_all_on_multiple_versions_environment(init_sql_repo):
 
 
 def test_ensure_conservation_of_order_of_data_nodes_on_task_creation(init_sql_repo):
-    init_managers()
-
     embedded_1 = Config.configure_data_node("dn_1", "in_memory", scope=Scope.SCENARIO)
     embedded_2 = Config.configure_data_node("dn_2", "in_memory", scope=Scope.SCENARIO)
     embedded_3 = Config.configure_data_node("a_dn_3", "in_memory", scope=Scope.SCENARIO)
@@ -278,8 +257,6 @@ def test_ensure_conservation_of_order_of_data_nodes_on_task_creation(init_sql_re
 
 
 def test_delete_raise_exception(init_sql_repo):
-    init_managers()
-
     dn_input_config_1 = Config.configure_data_node(
         "my_input_1", "in_memory", scope=Scope.SCENARIO, default_data="testing"
     )
@@ -293,8 +270,6 @@ def test_delete_raise_exception(init_sql_repo):
 
 
 def test_hard_delete(init_sql_repo):
-    init_managers()
-
     dn_input_config_1 = Config.configure_data_node(
         "my_input_1", "in_memory", scope=Scope.SCENARIO, default_data="testing"
     )
@@ -354,8 +329,6 @@ def submit_task(self, task, callbacks=None, force=False, wait=False, timeout=Non
 
 
 def test_get_tasks_by_config_id(init_sql_repo):
-    init_managers()
-
     dn_config = Config.configure_data_node("dn", scope=Scope.SCENARIO)
     task_config_1 = Config.configure_task("t1", print, dn_config)
     task_config_2 = Config.configure_task("t2", print, dn_config)
@@ -387,8 +360,6 @@ def test_get_tasks_by_config_id(init_sql_repo):
 
 
 def test_get_scenarios_by_config_id_in_multiple_versions_environment(init_sql_repo):
-    init_managers()
-
     dn_config = Config.configure_data_node("dn", scope=Scope.SCENARIO)
     task_config_1 = Config.configure_task("t1", print, dn_config)
     task_config_2 = Config.configure_task("t2", print, dn_config)
diff --git a/tests/core/test_complex_application.py b/tests/core/test_complex_application.py
index 749e38aa08..12b5b70081 100644
--- a/tests/core/test_complex_application.py
+++ b/tests/core/test_complex_application.py
@@ -19,8 +19,6 @@
 import taipy.core.taipy as tp
 from taipy.config import Config
 from taipy.core import Core, Status
-from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
-from taipy.core.config.job_config import JobConfig
 
 
 # ################################ USER FUNCTIONS ##################################
@@ -71,8 +69,6 @@ def return_a_number_with_sleep():
 
 
 def test_skipped_jobs():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    _OrchestratorFactory._build_orchestrator()
     input_config = Config.configure_data_node("input_dn")
     intermediate_config = Config.configure_data_node("intermediate")
     output_config = Config.configure_data_node("output_dn")
@@ -116,9 +112,6 @@ def test_complex():
     #  |   |
     # t4   d4
 
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    _OrchestratorFactory._build_orchestrator()
-
     csv_path_inp = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv")
     excel_path_inp = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")
 
diff --git a/tests/core/test_core_cli_with_sql_repo.py b/tests/core/test_core_cli_with_sql_repo.py
index b5906dc5d3..485601b92f 100644
--- a/tests/core/test_core_cli_with_sql_repo.py
+++ b/tests/core/test_core_cli_with_sql_repo.py
@@ -98,9 +98,8 @@ def test_core_cli_production_mode(init_sql_repo):
     core.stop()
 
 
-def test_dev_mode_clean_all_entities_of_the_latest_version(init_sql_repo, init_managers):
+def test_dev_mode_clean_all_entities_of_the_latest_version(init_sql_repo):
     scenario_config = config_scenario()
-    init_managers()
 
     # Create a scenario in development mode
     with patch("sys.argv", ["prog"]):
@@ -302,9 +301,8 @@ def test_version_number_when_switching_mode(init_sql_repo):
     core.stop()
 
 
-def test_production_mode_load_all_entities_from_previous_production_version(init_sql_repo, init_managers):
+def test_production_mode_load_all_entities_from_previous_production_version(init_sql_repo):
     scenario_config = config_scenario()
-    init_managers()
 
     with patch("sys.argv", ["prog", "--development"]):
         core = Core()
@@ -353,9 +351,8 @@ def test_production_mode_load_all_entities_from_previous_production_version(init
     core.stop()
 
 
-def test_force_override_experiment_version(init_sql_repo, init_managers):
+def test_force_override_experiment_version(init_sql_repo):
     scenario_config = config_scenario()
-    init_managers()
 
     with patch("sys.argv", ["prog", "--experiment", "1.0"]):
         core = Core()
@@ -406,9 +403,8 @@ def test_force_override_experiment_version(init_sql_repo, init_managers):
     assert len(_JobManager._get_all()) == 2
 
 
-def test_force_override_production_version(init_sql_repo, init_managers):
+def test_force_override_production_version(init_sql_repo):
     scenario_config = config_scenario()
-    init_managers()
 
     with patch("sys.argv", ["prog", "--production", "1.0"]):
         core = Core()
@@ -461,9 +457,8 @@ def test_force_override_production_version(init_sql_repo, init_managers):
     core.stop()
 
 
-def test_modify_config_properties_without_force(caplog, init_sql_repo, init_config, init_managers):
+def test_modify_config_properties_without_force(caplog, init_sql_repo, init_config):
     scenario_config = config_scenario()
-    init_managers()
 
     with patch("sys.argv", ["prog", "--experiment", "1.0"]):
         core = Core()
@@ -504,9 +499,8 @@ def test_modify_config_properties_without_force(caplog, init_sql_repo, init_conf
     assert 'DATA_NODE "d2" has attribute "exposed_type" modified' in error_message
 
 
-def test_modify_job_configuration_dont_stop_application(caplog, init_sql_repo, init_config, init_managers):
+def test_modify_job_configuration_dont_stop_application(caplog, init_sql_repo, init_config):
     scenario_config = config_scenario()
-    init_managers()
 
     with patch("sys.argv", ["prog", "--experiment", "1.0"]):
         Config.configure_job_executions(mode="development")
diff --git a/tests/core/utils/__init__.py b/tests/core/utils/__init__.py
index 89e4b49fb4..44b5095646 100644
--- a/tests/core/utils/__init__.py
+++ b/tests/core/utils/__init__.py
@@ -9,11 +9,11 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
+from datetime import datetime
+from time import sleep
 
-def assert_true_after_time(assertion, msg=None, time=120):
-    from datetime import datetime
-    from time import sleep
 
+def assert_true_after_time(assertion, msg=None, time=120):
     loops = 0
     start = datetime.now()
     while (datetime.now() - start).seconds < time: