diff --git a/setup.py b/setup.py index 0ae0aff48..917100324 100644 --- a/setup.py +++ b/setup.py @@ -58,7 +58,7 @@ test_suite="tests", tests_require=test_requirements, url="https://github.com/avaiga/taipy-core", - version="2.0.1", + version="2.0.2", zip_safe=False, extras_require=extras_require, ) diff --git a/src/taipy/core/common/_utils.py b/src/taipy/core/common/_utils.py index 622577d04..d772ba43d 100644 --- a/src/taipy/core/common/_utils.py +++ b/src/taipy/core/common/_utils.py @@ -34,7 +34,7 @@ def _fct_to_dict(obj): params = [] callback = obj - if isinstance(obj, Subscriber): + if isinstance(obj, _Subscriber): callback = obj.callback params = obj.params @@ -52,4 +52,4 @@ def _fcts_to_dict(objs): return [d for obj in objs if (d := _fct_to_dict(obj)) is not None] -Subscriber = namedtuple("Subscriber", "callback params") +_Subscriber = namedtuple("_Subscriber", "callback params") diff --git a/src/taipy/core/config/__init__.py b/src/taipy/core/config/__init__.py index aa3f3e01e..5a06bec1c 100644 --- a/src/taipy/core/config/__init__.py +++ b/src/taipy/core/config/__init__.py @@ -11,6 +11,9 @@ from taipy.config.checker._checker import _Checker from taipy.config.config import Config +from taipy.config.common.frequency import Frequency +from taipy.config.common.scope import Scope +from taipy.config.global_app.global_app_config import GlobalAppConfig from .checkers._data_node_config_checker import _DataNodeConfigChecker from .checkers._job_config_checker import _JobConfigChecker diff --git a/src/taipy/core/config/pipeline_config.py b/src/taipy/core/config/pipeline_config.py index 3c8b7267a..8ef1e5ab5 100644 --- a/src/taipy/core/config/pipeline_config.py +++ b/src/taipy/core/config/pipeline_config.py @@ -65,7 +65,7 @@ def _to_dict(self): @classmethod def _from_dict(cls, as_dict: Dict[str, Any], id: str, config: Optional[_Config]): as_dict.pop(cls._ID_KEY, id) - t_configs = config._sections[TaskConfig.name] + t_configs = config._sections[TaskConfig.name] # type: ignore tasks = [] if tasks_ids := as_dict.pop(cls._TASK_KEY, None): tasks = [t_configs[task_id] for task_id in tasks_ids if task_id in t_configs] diff --git a/src/taipy/core/config/scenario_config.py b/src/taipy/core/config/scenario_config.py index af7eb8184..b3f87e5b4 100644 --- a/src/taipy/core/config/scenario_config.py +++ b/src/taipy/core/config/scenario_config.py @@ -91,9 +91,9 @@ def _to_dict(self): } @classmethod - def _from_dict(cls, as_dict: Dict[str, Any], id: str, config: Optional[_Config]): + def _from_dict(cls, as_dict: Dict[str, Any], id: str, config: Optional[_Config]): # type: ignore as_dict.pop(cls._ID_KEY, id) - p_configs = config._sections[PipelineConfig.name] + p_configs = config._sections[PipelineConfig.name] # type: ignore pipelines = [] if pipeline_ids := as_dict.pop(cls._PIPELINE_KEY, None): pipelines = [p_configs[p_id] for p_id in pipeline_ids if p_id in p_configs] diff --git a/src/taipy/core/config/task_config.py b/src/taipy/core/config/task_config.py index 06af57503..d22782539 100644 --- a/src/taipy/core/config/task_config.py +++ b/src/taipy/core/config/task_config.py @@ -27,11 +27,9 @@ class TaskConfig(Section): Attributes: id (str): Identifier of the task config. Must be a valid Python variable name. inputs (Union[DataNodeConfig, List[DataNodeConfig]]): The optional list of `DataNodeConfig^` inputs. The - default - value is []. + default value is []. outputs (Union[DataNodeConfig, List[DataNodeConfig]]): The optional list of `DataNodeConfig^` outputs. The - default - value is []. + default value is []. function (Callable): User function taking as inputs some parameters compatible with the exposed types (exposed_type field) of the input data nodes and returning results compatible with the exposed types (exposed_type field) of the outputs list. The default value is None. @@ -101,7 +99,7 @@ def _to_dict(self): def _from_dict(cls, as_dict: Dict[str, Any], id: str, config: Optional[_Config]): as_dict.pop(cls._ID_KEY, id) funct = as_dict.pop(cls._FUNCTION, None) - dn_configs = config._sections[DataNodeConfig.name] + dn_configs = config._sections[DataNodeConfig.name] # type: ignore inputs = [] if inputs_as_str := as_dict.pop(cls._INPUT_KEY, None): inputs = [dn_configs[dn_id] for dn_id in inputs_as_str if dn_id in dn_configs] diff --git a/src/taipy/core/data/__init__.py b/src/taipy/core/data/__init__.py index 16819c78b..00f4a4757 100644 --- a/src/taipy/core/data/__init__.py +++ b/src/taipy/core/data/__init__.py @@ -16,6 +16,7 @@ from .generic import GenericDataNode from .in_memory import InMemoryDataNode from .json import JSONDataNode +from .operator import JoinOperator, Operator from .pickle import PickleDataNode from .sql import SQLDataNode from .sql_table import SQLTableDataNode diff --git a/src/taipy/core/data/abstract_sql.py b/src/taipy/core/data/abstract_sql.py index e4c098d88..59000e486 100644 --- a/src/taipy/core/data/abstract_sql.py +++ b/src/taipy/core/data/abstract_sql.py @@ -18,18 +18,17 @@ import pandas as pd from sqlalchemy import create_engine, text - from taipy.config.common.scope import Scope +from .data_node import DataNode from ..common.alias import DataNodeId, JobId from ..exceptions.exceptions import InvalidExposedType, MissingRequiredProperty, UnknownDatabaseEngine -from .data_node import DataNode class AbstractSQLDataNode(DataNode): """Abstract base class for data node implementations (SQLDataNode and SQLTableDataNode) that use SQL.""" - __STORAGE_TYPE = None + __STORAGE_TYPE = "NOT_IMPLEMENTED" __EXPOSED_TYPE_NUMPY = "numpy" __EXPOSED_TYPE_PANDAS = "pandas" __VALID_STRING_EXPOSED_TYPES = [__EXPOSED_TYPE_PANDAS, __EXPOSED_TYPE_NUMPY] diff --git a/src/taipy/core/data/json.py b/src/taipy/core/data/json.py index 7932299e4..31635c432 100644 --- a/src/taipy/core/data/json.py +++ b/src/taipy/core/data/json.py @@ -96,8 +96,8 @@ def __init__( edit_in_progress, **properties, ) - self._decoder = self._properties.get(self._DECODER_KEY, DefaultJSONDecoder) - self._encoder = self._properties.get(self._ENCODER_KEY, DefaultJSONEncoder) + self._decoder = self._properties.get(self._DECODER_KEY, _DefaultJSONDecoder) + self._encoder = self._properties.get(self._ENCODER_KEY, _DefaultJSONEncoder) if not self._last_edit_date and isfile(self._path): # type: ignore self.unlock_edit() @@ -143,7 +143,7 @@ def _write(self, data: Any): json.dump(data, f, indent=4, cls=self._encoder) -class DefaultJSONEncoder(json.JSONEncoder): +class _DefaultJSONEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, Enum): return o.value @@ -154,5 +154,5 @@ def default(self, o): return super().default(o) -class DefaultJSONDecoder(json.JSONDecoder): +class _DefaultJSONDecoder(json.JSONDecoder): pass diff --git a/src/taipy/core/exceptions/__init__.py b/src/taipy/core/exceptions/__init__.py index e0c595bfb..ca1da7959 100644 --- a/src/taipy/core/exceptions/__init__.py +++ b/src/taipy/core/exceptions/__init__.py @@ -8,3 +8,5 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the # specific language governing permissions and limitations under the License. + +from .exceptions import * diff --git a/src/taipy/core/job/status.py b/src/taipy/core/job/status.py index 4148fb465..c10afbda6 100644 --- a/src/taipy/core/job/status.py +++ b/src/taipy/core/job/status.py @@ -37,6 +37,9 @@ class Status(_ReprEnum): - `COMPLETED`: A `COMPLETED` job has successfully been executed. - `SKIPPED`: A `SKIPPED` job has not been executed because its outputs were already computed. + + - `ABANDONED`: An `ABANDONED` job has not been executed because it depends on a job that could not complete ( + cancelled, failed, or abandoned). """ SUBMITTED = 1 diff --git a/src/taipy/core/pipeline/_pipeline_manager.py b/src/taipy/core/pipeline/_pipeline_manager.py index 4e066c2ae..eaac669cb 100644 --- a/src/taipy/core/pipeline/_pipeline_manager.py +++ b/src/taipy/core/pipeline/_pipeline_manager.py @@ -14,6 +14,8 @@ from taipy.config.common.scope import Scope +from ._pipeline_repository_factory import _PipelineRepositoryFactory +from .pipeline import Pipeline from .._manager._manager import _Manager from ..common._entity_ids import _EntityIds from ..common.alias import PipelineId, ScenarioId @@ -22,8 +24,6 @@ from ..job._job_manager_factory import _JobManagerFactory from ..job.job import Job from ..task._task_manager_factory import _TaskManagerFactory -from ._pipeline_repository_factory import _PipelineRepositoryFactory -from .pipeline import Pipeline class _PipelineManager(_Manager[Pipeline]): @@ -73,7 +73,7 @@ def __remove_subscriber(cls, callback, params, pipeline): @classmethod def _get_or_create(cls, pipeline_config: PipelineConfig, scenario_id: Optional[ScenarioId] = None) -> Pipeline: - pipeline_id = Pipeline._new_id(pipeline_config.id) + pipeline_id = Pipeline._new_id(str(pipeline_config.id)) task_manager = _TaskManagerFactory._build_manager() tasks = task_manager._bulk_get_or_create(pipeline_config.task_configs, scenario_id, pipeline_id) @@ -81,10 +81,10 @@ def _get_or_create(cls, pipeline_config: PipelineConfig, scenario_id: Optional[S scope = min(task.scope for task in tasks) if len(tasks) != 0 else Scope.GLOBAL parent_id = scenario_id if scope == Scope.SCENARIO else pipeline_id if scope == Scope.PIPELINE else None - if pipelines_from_parent := cls._repository._get_by_config_and_parent_id(pipeline_config.id, parent_id): # type: ignore + if pipelines_from_parent := cls._repository._get_by_config_and_parent_id(str(pipeline_config.id), parent_id): return pipelines_from_parent - pipeline = Pipeline(pipeline_config.id, dict(**pipeline_config._properties), tasks, pipeline_id, parent_id) + pipeline = Pipeline(str(pipeline_config.id), dict(**pipeline_config._properties), tasks, pipeline_id, parent_id) cls._set(pipeline) return pipeline diff --git a/src/taipy/core/pipeline/_pipeline_repository.py b/src/taipy/core/pipeline/_pipeline_repository.py index e7908385d..55b9654f7 100644 --- a/src/taipy/core/pipeline/_pipeline_repository.py +++ b/src/taipy/core/pipeline/_pipeline_repository.py @@ -15,7 +15,7 @@ from .._repository._repository import _AbstractRepository from .._repository._repository_adapter import _RepositoryAdapter from ..common import _utils -from ..common._utils import Subscriber +from ..common._utils import _Subscriber from ..exceptions.exceptions import NonExistingPipeline, NonExistingTask from ..task.task import Task from ._pipeline_model import _PipelineModel @@ -59,7 +59,7 @@ def _from_model(self, model: _PipelineModel) -> Pipeline: model.id, model.parent_id, [ - Subscriber(_utils._load_fct(it["fct_module"], it["fct_name"]), it["fct_params"]) + _Subscriber(_utils._load_fct(it["fct_module"], it["fct_name"]), it["fct_params"]) for it in model.subscribers ], # type: ignore ) diff --git a/src/taipy/core/pipeline/pipeline.py b/src/taipy/core/pipeline/pipeline.py index 6f1394de5..120c6f2a7 100644 --- a/src/taipy/core/pipeline/pipeline.py +++ b/src/taipy/core/pipeline/pipeline.py @@ -23,7 +23,7 @@ from ..common._listattributes import _ListAttributes from ..common._properties import _Properties from ..common._reload import _reload, _self_reload, _self_setter -from ..common._utils import Subscriber +from ..common._utils import _Subscriber from ..common.alias import PipelineId, TaskId from ..data.data_node import DataNode from ..exceptions.exceptions import NonExistingTask @@ -54,7 +54,7 @@ def __init__( tasks: Union[List[TaskId], List[Task], List[Union[TaskId, Task]]], pipeline_id: PipelineId = None, parent_id: Optional[str] = None, - subscribers: List[Subscriber] = None, + subscribers: List[_Subscriber] = None, ): self.config_id = _validate_id(config_id) self.id: PipelineId = pipeline_id or self._new_id(self.config_id) @@ -181,11 +181,11 @@ def subscribers(self, val): def _add_subscriber(self, callback: Callable, params: Optional[List[Any]] = None): params = [] if params is None else params - self._subscribers.append(Subscriber(callback=callback, params=params)) + self._subscribers.append(_Subscriber(callback=callback, params=params)) def _remove_subscriber(self, callback: Callable, params: Optional[List[Any]] = None): if params is not None: - self._subscribers.remove(Subscriber(callback, params)) + self._subscribers.remove(_Subscriber(callback, params)) else: elem = [x for x in self._subscribers if x.callback == callback] if not elem: diff --git a/src/taipy/core/scenario/_scenario_manager.py b/src/taipy/core/scenario/_scenario_manager.py index 00004d31b..9bb039ae7 100644 --- a/src/taipy/core/scenario/_scenario_manager.py +++ b/src/taipy/core/scenario/_scenario_manager.py @@ -15,6 +15,8 @@ from taipy.config.config import Config +from ._scenario_repository_factory import _ScenarioRepositoryFactory +from .scenario import Scenario from .._manager._manager import _Manager from ..common._entity_ids import _EntityIds from ..common.alias import ScenarioId @@ -34,8 +36,6 @@ from ..job._job_manager_factory import _JobManagerFactory from ..job.job import Job from ..pipeline._pipeline_manager_factory import _PipelineManagerFactory -from ._scenario_repository_factory import _ScenarioRepositoryFactory -from .scenario import Scenario class _ScenarioManager(_Manager[Scenario]): @@ -91,7 +91,7 @@ def _create( creation_date: datetime.datetime = None, name: str = None, ) -> Scenario: - scenario_id = Scenario._new_id(config.id) + scenario_id = Scenario._new_id(str(config.id)) pipelines = [ _PipelineManagerFactory._build_manager()._get_or_create(p_config, scenario_id) for p_config in config.pipeline_configs @@ -106,7 +106,7 @@ def _create( if name: props["name"] = name scenario = Scenario( - config.id, + str(config.id), pipelines, # type: ignore props, scenario_id, diff --git a/src/taipy/core/scenario/_scenario_repository.py b/src/taipy/core/scenario/_scenario_repository.py index 47d4c8031..0ab36e7e4 100644 --- a/src/taipy/core/scenario/_scenario_repository.py +++ b/src/taipy/core/scenario/_scenario_repository.py @@ -15,7 +15,7 @@ from .._repository._repository import _AbstractRepository from .._repository._repository_adapter import _RepositoryAdapter from ..common import _utils -from ..common._utils import Subscriber +from ..common._utils import _Subscriber from ..common.alias import CycleId, PipelineId from ..cycle._cycle_manager_factory import _CycleManagerFactory from ..cycle.cycle import Cycle @@ -57,7 +57,7 @@ def _from_model(self, model: _ScenarioModel) -> Scenario: tags=set(model.tags), cycle=self.__to_cycle(model.cycle), subscribers=[ - Subscriber(_utils._load_fct(it["fct_module"], it["fct_name"]), it["fct_params"]) + _Subscriber(_utils._load_fct(it["fct_module"], it["fct_name"]), it["fct_params"]) for it in model.subscribers ], ) diff --git a/src/taipy/core/scenario/scenario.py b/src/taipy/core/scenario/scenario.py index e6b17270e..dbe61da00 100644 --- a/src/taipy/core/scenario/scenario.py +++ b/src/taipy/core/scenario/scenario.py @@ -22,7 +22,7 @@ from ..common._listattributes import _ListAttributes from ..common._properties import _Properties from ..common._reload import _reload, _self_reload, _self_setter -from ..common._utils import Subscriber +from ..common._utils import _Subscriber from ..common.alias import PipelineId, ScenarioId from ..cycle.cycle import Cycle from ..data.data_node import DataNode @@ -63,7 +63,7 @@ def __init__( creation_date=None, is_primary: bool = False, cycle: Cycle = None, - subscribers: List[Subscriber] = None, + subscribers: List[_Subscriber] = None, tags: Set[str] = None, ): self.config_id = _validate_id(config_id) @@ -198,7 +198,7 @@ def __getattr__(self, attribute_name): def _add_subscriber(self, callback: Callable, params: Optional[List[Any]] = None): params = [] if params is None else params - self._subscribers.append(Subscriber(callback=callback, params=params)) + self._subscribers.append(_Subscriber(callback=callback, params=params)) def _add_tag(self, tag: str): self._tags = _reload("scenario", self)._tags @@ -216,7 +216,7 @@ def has_tag(self, tag: str) -> bool: def _remove_subscriber(self, callback: Callable, params: Optional[List[Any]] = None): if params is not None: - self._subscribers.remove(Subscriber(callback, params)) + self._subscribers.remove(_Subscriber(callback, params)) else: elem = [x for x in self._subscribers if x.callback == callback] if not elem: diff --git a/src/taipy/core/task/_task_manager.py b/src/taipy/core/task/_task_manager.py index 11579d120..e2c6e7e83 100644 --- a/src/taipy/core/task/_task_manager.py +++ b/src/taipy/core/task/_task_manager.py @@ -76,7 +76,7 @@ def _bulk_get_or_create( else: inputs = [data_nodes[input_config] for input_config in task_config.input_configs] outputs = [data_nodes[output_config] for output_config in task_config.output_configs] - task = Task(task_config.id, task_config.function, inputs, outputs, parent_id=parent_id) + task = Task(str(task_config.id), task_config.function, inputs, outputs, parent_id=parent_id) cls._set(task) tasks.append(task) diff --git a/tests/core/pipeline/test_pipeline.py b/tests/core/pipeline/test_pipeline.py index 8922047b8..59700e56f 100644 --- a/tests/core/pipeline/test_pipeline.py +++ b/tests/core/pipeline/test_pipeline.py @@ -13,7 +13,7 @@ import pytest -from src.taipy.core.common._utils import Subscriber +from src.taipy.core.common._utils import _Subscriber from src.taipy.core.common.alias import PipelineId, TaskId from src.taipy.core.data.data_node import DataNode from src.taipy.core.data.in_memory import InMemoryDataNode @@ -271,7 +271,7 @@ def test_auto_set_and_reload(task): assert len(pipeline_1.subscribers) == 2 assert len(pipeline_2.subscribers) == 2 - pipeline_1.subscribers.remove(Subscriber(print, [])) + pipeline_1.subscribers.remove(_Subscriber(print, [])) assert len(pipeline_1.subscribers) == 1 assert len(pipeline_2.subscribers) == 1 diff --git a/tests/core/pipeline/test_pipeline_manager.py b/tests/core/pipeline/test_pipeline_manager.py index f0b018cfd..ec8300c43 100644 --- a/tests/core/pipeline/test_pipeline_manager.py +++ b/tests/core/pipeline/test_pipeline_manager.py @@ -17,7 +17,7 @@ from src.taipy.core._scheduler._scheduler import _Scheduler from src.taipy.core._scheduler._scheduler_factory import _SchedulerFactory from src.taipy.core.common import _utils -from src.taipy.core.common._utils import Subscriber +from src.taipy.core.common._utils import _Subscriber from src.taipy.core.common.alias import PipelineId, TaskId from src.taipy.core.config.job_config import JobConfig from src.taipy.core.data._data_manager import _DataManager @@ -492,11 +492,11 @@ def test_pipeline_notification_unsubscribe_multi_param(): pipeline.unsubscribe(notify_multi_param) assert len(pipeline.subscribers) == 2 - assert Subscriber(notify_multi_param, ["foobar", 123, 0]) not in pipeline.subscribers + assert _Subscriber(notify_multi_param, ["foobar", 123, 0]) not in pipeline.subscribers pipeline.unsubscribe(notify_multi_param, ["foobar", 123, 2]) assert len(pipeline.subscribers) == 1 - assert Subscriber(notify_multi_param, ["foobar", 123, 2]) not in pipeline.subscribers + assert _Subscriber(notify_multi_param, ["foobar", 123, 2]) not in pipeline.subscribers with pytest.raises(ValueError): pipeline.unsubscribe(notify_multi_param, ["foobar", 123, 10000]) diff --git a/tests/core/scenario/test_scenario.py b/tests/core/scenario/test_scenario.py index 46b1d3aae..f45cf2337 100644 --- a/tests/core/scenario/test_scenario.py +++ b/tests/core/scenario/test_scenario.py @@ -14,7 +14,7 @@ import pytest -from src.taipy.core.common._utils import Subscriber +from src.taipy.core.common._utils import _Subscriber from src.taipy.core.common.alias import ScenarioId, TaskId from src.taipy.core.cycle._cycle_manager import _CycleManager from src.taipy.core.data.in_memory import InMemoryDataNode @@ -202,7 +202,7 @@ def test_auto_set_and_reload(cycle, current_datetime, pipeline): assert scenario_2.is_primary assert len(scenario_1.subscribers) == 0 - scenario_1.subscribers.append(Subscriber(print, [])) + scenario_1.subscribers.append(_Subscriber(print, [])) assert len(scenario_1.subscribers) == 1 assert len(scenario_2.subscribers) == 1 @@ -210,11 +210,11 @@ def test_auto_set_and_reload(cycle, current_datetime, pipeline): assert len(scenario_1.subscribers) == 0 assert len(scenario_2.subscribers) == 0 - scenario_1.subscribers.extend([Subscriber(print, []), Subscriber(map, [])]) + scenario_1.subscribers.extend([_Subscriber(print, []), _Subscriber(map, [])]) assert len(scenario_1.subscribers) == 2 assert len(scenario_2.subscribers) == 2 - scenario_1.subscribers.remove(Subscriber(print, [])) + scenario_1.subscribers.remove(_Subscriber(print, [])) assert len(scenario_1.subscribers) == 1 assert len(scenario_2.subscribers) == 1 diff --git a/tests/core/scenario/test_scenario_manager.py b/tests/core/scenario/test_scenario_manager.py index abee6cb44..90339349f 100644 --- a/tests/core/scenario/test_scenario_manager.py +++ b/tests/core/scenario/test_scenario_manager.py @@ -17,7 +17,7 @@ from src.taipy.core._scheduler._scheduler import _Scheduler from src.taipy.core._scheduler._scheduler_factory import _SchedulerFactory from src.taipy.core.common import _utils -from src.taipy.core.common._utils import Subscriber +from src.taipy.core.common._utils import _Subscriber from src.taipy.core.common.alias import PipelineId, ScenarioId, TaskId from src.taipy.core.config.job_config import JobConfig from src.taipy.core.cycle._cycle_manager import _CycleManager @@ -468,12 +468,12 @@ def test_notification_unsubscribe_multi_param(): # if no params are passed, removes the first occurrence of the subscriber when theres more than one copy scenario.unsubscribe(notify_multi_param) assert len(scenario.subscribers) == 2 - assert Subscriber(notify_multi_param, ["foobar", 123, 0]) not in scenario.subscribers + assert _Subscriber(notify_multi_param, ["foobar", 123, 0]) not in scenario.subscribers # If params are passed, find the corresponding pair of callback and params to remove scenario.unsubscribe(notify_multi_param, ["foobar", 123, 2]) assert len(scenario.subscribers) == 1 - assert Subscriber(notify_multi_param, ["foobar", 123, 2]) not in scenario.subscribers + assert _Subscriber(notify_multi_param, ["foobar", 123, 2]) not in scenario.subscribers # If params are passed but is not on the list of subscribers, throws a ValueErrors with pytest.raises(ValueError):