diff --git a/taipy/core/_init.py b/taipy/core/_init.py
index 66a3b76fb9..8a25fbf317 100644
--- a/taipy/core/_init.py
+++ b/taipy/core/_init.py
@@ -36,7 +36,6 @@
     delete_job,
     delete_jobs,
     exists,
-    export_scenario,
     get,
     get_cycles,
     get_cycles_scenarios,
@@ -52,7 +51,6 @@
     get_sequences,
     get_submissions,
     get_tasks,
-    import_scenario,
     is_deletable,
     is_editable,
     is_promotable,
diff --git a/taipy/core/_manager/_manager.py b/taipy/core/_manager/_manager.py
index bf406b1bdb..dd4ee7c021 100644
--- a/taipy/core/_manager/_manager.py
+++ b/taipy/core/_manager/_manager.py
@@ -9,7 +9,6 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
-import pathlib
 from typing import Dict, Generic, Iterable, List, Optional, TypeVar, Union
 
 from taipy.logger._taipy_logger import _TaipyLogger
@@ -153,17 +152,6 @@ def _delete_entities_of_multiple_types(cls, _entity_ids: _EntityIds):
         _DataManagerFactory._build_manager()._delete_many(_entity_ids.data_node_ids)
         _SubmissionManagerFactory._build_manager()._delete_many(_entity_ids.submission_ids)
 
-    @classmethod
-    def _export(cls, id: str, folder_path: Union[str, pathlib.Path], **kwargs):
-        return cls._repository._export(id, folder_path)
-
-    @classmethod
-    def _import(cls, entity_file: pathlib.Path, version: str, **kwargs) -> EntityType:
-        imported_entity = cls._repository._import(entity_file)
-        imported_entity._version = version
-        cls._set(imported_entity)
-        return imported_entity
-
     @classmethod
     def _is_editable(cls, entity: Union[EntityType, str]) -> bool:
         return True
diff --git a/taipy/core/_version/_version_manager.py b/taipy/core/_version/_version_manager.py
index 6ff5d0fa80..24863888a7 100644
--- a/taipy/core/_version/_version_manager.py
+++ b/taipy/core/_version/_version_manager.py
@@ -9,7 +9,6 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
-import pathlib
 import uuid
 from typing import List, Optional, Union
 
@@ -231,17 +230,3 @@ def __check_production_migration_config(cls):
     @classmethod
     def _delete_entities_of_multiple_types(cls, _entity_ids):
         raise NotImplementedError
-
-    @classmethod
-    def _import(cls, entity_file: pathlib.Path, version: str, **kwargs) -> _Version:
-        imported_version = cls._repository._import(entity_file)
-
-        comparator_result = Config._comparator._find_conflict_config(  # type: ignore[attr-defined]
-            imported_version.config,
-            Config._applied_config,  # type: ignore[attr-defined]
-            imported_version.id,
-        )
-        if comparator_result.get(_ComparatorResult.CONFLICTED_SECTION_KEY):
-            raise ConflictedConfigurationError()
-
-        return imported_version
diff --git a/taipy/core/data/_data_manager.py b/taipy/core/data/_data_manager.py
index e74c4820a9..05890323fc 100644
--- a/taipy/core/data/_data_manager.py
+++ b/taipy/core/data/_data_manager.py
@@ -10,8 +10,6 @@
 # specific language governing permissions and limitations under the License.
 
 import os
-import pathlib
-import shutil
 from typing import Dict, Iterable, List, Optional, Set, Union
 
 from taipy.config._config import _Config
@@ -178,46 +176,3 @@ def _get_by_config_id(cls, config_id: str, version_number: Optional[str] = None)
         for fil in filters:
             fil.update({"config_id": config_id})
         return cls._repository._load_all(filters)
-
-    @classmethod
-    def _export(cls, id: str, folder_path: Union[str, pathlib.Path], **kwargs) -> None:
-        cls._repository._export(id, folder_path)
-
-        if not kwargs.get("include_data"):
-            return
-
-        data_node = cls._get(id)
-        if not isinstance(data_node, _FileDataNodeMixin):
-            cls._logger.warning(f"Data node {id} is not a file-based data node and the data will not be exported.")
-            return
-
-        if isinstance(folder_path, str):
-            folder: pathlib.Path = pathlib.Path(folder_path)
-        else:
-            folder = folder_path
-
-        data_export_dir = folder / Config.core.storage_folder / os.path.dirname(data_node.path)
-        if not data_export_dir.exists():
-            data_export_dir.mkdir(parents=True)
-
-        data_export_path = data_export_dir / os.path.basename(data_node.path)
-        if os.path.exists(data_node.path):
-            shutil.copy2(data_node.path, data_export_path)
-
-    @classmethod
-    def _import(cls, entity_file: pathlib.Path, version: str, **kwargs) -> DataNode:
-        imported_data_node = cls._repository._import(entity_file)
-        imported_data_node._version = version
-        cls._set(imported_data_node)
-
-        if not (isinstance(imported_data_node, _FileDataNodeMixin) and isinstance(imported_data_node, DataNode)):
-            return imported_data_node
-
-        data_folder: pathlib.Path = pathlib.Path(str(kwargs.get("data_folder")))
-        if not data_folder.exists():
-            return imported_data_node
-
-        if (data_folder / imported_data_node.path).exists():
-            shutil.copy2(data_folder / imported_data_node.path, imported_data_node.path)
-
-        return imported_data_node
diff --git a/taipy/core/exceptions/exceptions.py b/taipy/core/exceptions/exceptions.py
index c5700f5de4..946b8db9b3 100644
--- a/taipy/core/exceptions/exceptions.py
+++ b/taipy/core/exceptions/exceptions.py
@@ -373,47 +373,6 @@ class FileEmpty(Exception):
     """Raised when a file is empty."""
 
 
-class ExportPathAlreadyExists(Exception):
-    """Raised when the export folder already exists."""
-
-    def __init__(self, export_path: str, scenario_id: str):
-        self.message = (
-            f"The path '{export_path}' already exists and can not be used to export scenario '{scenario_id}'."
-            " Please use the 'override' parameter to override it."
-        )
-
-
-class EntitiesToBeImportAlredyExist(Exception):
-    """Raised when entities in the scenario to be imported have already exists"""
-
-    def __init__(self, import_path):
-        self.message = f"The import archive file {import_path} contains entities that have already existed."
-
-
-class DataToBeImportAlredyExist(Exception):
-    """Raised when data files in the scenario to be imported have already exists"""
-
-    def __init__(self, import_path):
-        self.message = (
-            f"The import archive file {import_path} contains data files that have already existed."
-            " Please use the 'override' parameter to override those."
-        )
-
-
-class ImportArchiveDoesntContainAnyScenario(Exception):
-    """Raised when the import archive file doesn't contain any scenario"""
-
-    def __init__(self, import_path):
-        self.message = f"The import archive file {import_path} doesn't contain any scenario."
-
-
-class ImportScenarioDoesntHaveAVersion(Exception):
-    """Raised when the import scenario doesn't have a version"""
-
-    def __init__(self, import_path):
-        self.message = f"The import scenario in the import archive file {import_path} doesn't have a version."
-
-
 class SQLQueryCannotBeExecuted(Exception):
     """Raised when an SQL Query cannot be executed."""
 
diff --git a/taipy/core/scenario/_scenario_manager.py b/taipy/core/scenario/_scenario_manager.py
index 463f130640..674ba521b8 100644
--- a/taipy/core/scenario/_scenario_manager.py
+++ b/taipy/core/scenario/_scenario_manager.py
@@ -10,18 +10,14 @@
 # specific language governing permissions and limitations under the License.
 
 import datetime
-import pathlib
-import tempfile
-import zipfile
 from functools import partial
-from typing import Any, Callable, Dict, List, Literal, Optional, Type, Union
+from typing import Any, Callable, Dict, List, Literal, Optional, Union
 
 from taipy.config import Config
 
 from .._entity._entity_ids import _EntityIds
 from .._manager._manager import _Manager
 from .._repository._abstract_repository import _AbstractRepository
-from .._version._version_manager_factory import _VersionManagerFactory
 from .._version._version_mixin import _VersionMixin
 from ..common.warn_if_inputs_not_ready import _warn_if_inputs_not_ready
 from ..config.scenario_config import ScenarioConfig
@@ -32,9 +28,6 @@
     DeletingPrimaryScenario,
     DifferentScenarioConfigs,
     DoesNotBelongToACycle,
-    EntitiesToBeImportAlredyExist,
-    ImportArchiveDoesntContainAnyScenario,
-    ImportScenarioDoesntHaveAVersion,
     InsufficientScenarioToCompare,
     InvalidScenario,
     NonExistingComparator,
@@ -475,85 +468,3 @@ def _get_by_config_id(cls, config_id: str, version_number: Optional[str] = None)
         for fil in filters:
             fil.update({"config_id": config_id})
         return cls._repository._load_all(filters)
-
-    @classmethod
-    def _import_scenario_and_children_entities(
-        cls,
-        zip_file_path: pathlib.Path,
-        override: bool,
-        entity_managers: Dict[str, Type[_Manager]],
-    ) -> Optional[Scenario]:
-        with tempfile.TemporaryDirectory() as tmp_dir:
-            with zipfile.ZipFile(zip_file_path) as zip_file:
-                zip_file.extractall(tmp_dir)
-
-            tmp_dir_path = pathlib.Path(tmp_dir)
-
-            if not ((tmp_dir_path / "scenarios").exists() or (tmp_dir_path / "scenario").exists()):
-                raise ImportArchiveDoesntContainAnyScenario(zip_file_path)
-
-            if not (tmp_dir_path / "version").exists():
-                raise ImportScenarioDoesntHaveAVersion(zip_file_path)
-
-            # Import the version to check for compatibility
-            entity_managers["version"]._import(next((tmp_dir_path / "version").iterdir()), "")
-
-            valid_entity_folders = list(entity_managers.keys())
-            valid_data_folder = Config.core.storage_folder
-
-            imported_scenario = None
-            imported_entities: Dict[str, List] = {}
-
-            for entity_folder in tmp_dir_path.iterdir():
-                if not entity_folder.is_dir() or entity_folder.name not in valid_entity_folders + [valid_data_folder]:
-                    cls._logger.warning(f"{entity_folder} is not a valid Taipy folder and will not be imported.")
-                    continue
-
-            try:
-                for entity_type in valid_entity_folders:
-                    # Skip the version folder as it is already handled
-                    if entity_type == "version":
-                        continue
-
-                    entity_folder = tmp_dir_path / entity_type
-                    if not entity_folder.exists():
-                        continue
-
-                    manager = entity_managers[entity_type]
-                    imported_entities[entity_type] = []
-
-                    for entity_file in entity_folder.iterdir():
-                        # Check if the to-be-imported entity already exists
-                        entity_id = entity_file.stem
-                        if manager._exists(entity_id):
-                            if override:
-                                cls._logger.warning(f"{entity_id} already exists and will be overridden.")
-                            else:
-                                cls._logger.error(
-                                    f"{entity_id} already exists. Please use the 'override' parameter to override it."
-                                )
-                                raise EntitiesToBeImportAlredyExist(zip_file_path)
-
-                        # Import the entity
-                        imported_entity = manager._import(
-                            entity_file,
-                            version=_VersionManagerFactory._build_manager()._get_latest_version(),
-                            data_folder=tmp_dir_path / valid_data_folder,
-                        )
-
-                        imported_entities[entity_type].append(imported_entity.id)
-                        if entity_type in ["scenario", "scenarios"]:
-                            imported_scenario = imported_entity
-            except Exception as err:
-                cls._logger.error(f"An error occurred during the import: {err}. Rollback the import.")
-
-                # Rollback the import
-                for entity_type, entity_ids in list(imported_entities.items())[::-1]:
-                    manager = entity_managers[entity_type]
-                    for entity_id in entity_ids:
-                        if manager._exists(entity_id):
-                            manager._delete(entity_id)
-                raise err
-
-        cls._logger.info(f"Scenario {imported_scenario.id} has been successfully imported.")  # type: ignore[union-attr]
-        return imported_scenario
diff --git a/taipy/core/scenario/scenario.py b/taipy/core/scenario/scenario.py
index 6c9fd7e183..b7e7894b7d 100644
--- a/taipy/core/scenario/scenario.py
+++ b/taipy/core/scenario/scenario.py
@@ -10,7 +10,6 @@
 # specific language governing permissions and limitations under the License.
 
 from __future__ import annotations
-import pathlib
 import uuid
 from datetime import datetime
 from typing import Any, Callable, Dict, List, Optional, Set, Union
@@ -615,30 +614,6 @@ def submit(
         return _ScenarioManagerFactory._build_manager()._submit(self, callbacks, force, wait, timeout, **properties)
 
-    def export(
-        self,
-        folder_path: Union[str, pathlib.Path],
-        override: bool = False,
-        include_data: bool = False,
-    ):
-        """Export all related entities of this scenario to a folder.
-
-        Parameters:
-            folder_path (Union[str, pathlib.Path]): The folder path to export the scenario to.
-                If the path exists and the override parameter is False, an exception is raised.
-            override (bool): If True, the existing folder will be overridden. Default is False.
-            include_data (bool): If True, the file-based data nodes are exported as well.
-                This includes Pickle, CSV, Excel, Parquet, and JSON data nodes.
-                If the scenario has a data node that is not file-based, a warning will be logged, and the data node
-                will not be exported. The default value is False.
-
-        Raises:
-            ExportPathAlreadyExists^: If the `folder_path` already exists and the override parameter is False.
-        """
-        from ... import core as tp
-
-        return tp.export_scenario(self.id, folder_path, override, include_data)
-
     def set_primary(self):
         """Promote the scenario as the primary scenario of its cycle.
 
diff --git a/taipy/core/sequence/_sequence_manager.py b/taipy/core/sequence/_sequence_manager.py
index 78340d1b8f..7387387ce2 100644
--- a/taipy/core/sequence/_sequence_manager.py
+++ b/taipy/core/sequence/_sequence_manager.py
@@ -9,8 +9,6 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
-import json
-import pathlib
 from functools import partial
 from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
 
@@ -398,32 +396,6 @@ def _exists(cls, entity_id: str) -> bool:
         """
         return True if cls._get(entity_id) else False
-
-    @classmethod
-    def _export(cls, id: str, folder_path: Union[str, pathlib.Path], **kwargs) -> None:
-        """
-        Export a Sequence entity.
-        """
-        if isinstance(folder_path, str):
-            folder: pathlib.Path = pathlib.Path(folder_path)
-        else:
-            folder = folder_path
-
-        export_dir = folder / cls._model_name
-        if not export_dir.exists():
-            export_dir.mkdir(parents=True)
-
-        export_path = export_dir / f"{id}.json"
-        sequence_name, scenario_id = cls._breakdown_sequence_id(id)
-        sequence = {"id": id, "owner_id": scenario_id, "parent_ids": [scenario_id], "name": sequence_name}
-
-        scenario = _ScenarioManagerFactory._build_manager()._get(scenario_id)
-        if sequence_data := scenario._sequences.get(sequence_name, None):
-            sequence.update(sequence_data)
-            with open(export_path, "w", encoding="utf-8") as export_file:
-                export_file.write(json.dumps(sequence))
-        else:
-            raise ModelNotFound(cls._model_name, id)
 
     @classmethod
     def __log_error_entity_not_found(cls, sequence_id: Union[SequenceId, str]):
         cls._logger.error(f"{cls._ENTITY_NAME} not found: {str(sequence_id)}")
diff --git a/taipy/core/taipy.py b/taipy/core/taipy.py
index 6d33583c06..6786943716 100644
--- a/taipy/core/taipy.py
+++ b/taipy/core/taipy.py
@@ -9,19 +9,14 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
-import os
-import pathlib
-import shutil
-import tempfile
 from datetime import datetime
-from typing import Any, Callable, Dict, List, Literal, Optional, Set, Type, Union, overload
+from typing import Any, Callable, Dict, List, Literal, Optional, Set, Union, overload
 
 from taipy.config import Scope
 from taipy.logger._taipy_logger import _TaipyLogger
 
 from ._core import Core
 from ._entity._entity import _Entity
-from ._manager._manager import _Manager
 from ._version._version_manager_factory import _VersionManagerFactory
 from .common._check_instance import (
     _is_cycle,
@@ -43,7 +38,6 @@
 from .data.data_node_id import DataNodeId
 from .exceptions.exceptions import (
     DataNodeConfigIsNotGlobal,
-    ExportPathAlreadyExists,
     ModelNotFound,
     NonExistingVersion,
     VersionIsNotProductionVersion,
@@ -997,116 +991,6 @@ def clean_all_entities(version_number: str) -> bool:
     return True
 
 
-def export_scenario(
-    scenario_id: ScenarioId,
-    output_path: Union[str, pathlib.Path],
-    override: bool = False,
-    include_data: bool = False,
-):
-    """Export all related entities of a scenario to an archive zip file.
-
-    This function exports all related entities of the specified scenario to the
-    specified archive zip file.
-
-    Parameters:
-        scenario_id (ScenarioId): The ID of the scenario to export.
-        output_path (Union[str, pathlib.Path]): The path to export the scenario to.
-            The path should include the file name without the extension or with the `.zip` extension.
-            If the path exists and the override parameter is False, an exception is raised.
-        override (bool): If True, the existing folder will be overridden. The default value is False.
-        include_data (bool): If True, the file-based data nodes are exported as well.
-            This includes Pickle, CSV, Excel, Parquet, and JSON data nodes.
-            If the scenario has a data node that is not file-based, a warning will be logged, and the data node
-            will not be exported. The default value is False.
-
-    Raises:
-        ExportPathAlreadyExists^: If the `output_path` already exists and the override parameter is False.
-    """
-    manager = _ScenarioManagerFactory._build_manager()
-    scenario = manager._get(scenario_id)
-    entity_ids = manager._get_children_entity_ids(scenario)
-    entity_ids.scenario_ids = {scenario_id}
-    if scenario.cycle:
-        entity_ids.cycle_ids = {scenario.cycle.id}
-
-    output_filename = os.path.splitext(output_path)[0] if str(output_path).endswith(".zip") else str(output_path)
-    output_zip_path = pathlib.Path(output_filename + ".zip")
-
-    if output_zip_path.exists():
-        if override:
-            __logger.warning(f"Override the existing path '{output_zip_path}' to export scenario {scenario_id}.")
-            output_zip_path.unlink()
-        else:
-            raise ExportPathAlreadyExists(str(output_zip_path), scenario_id)
-
-    with tempfile.TemporaryDirectory() as tmp_dir:
-        for data_node_id in entity_ids.data_node_ids:
-            _DataManagerFactory._build_manager()._export(data_node_id, tmp_dir, include_data=include_data)
-        for task_id in entity_ids.task_ids:
-            _TaskManagerFactory._build_manager()._export(task_id, tmp_dir)
-        for sequence_id in entity_ids.sequence_ids:
-            _SequenceManagerFactory._build_manager()._export(sequence_id, tmp_dir)
-        for cycle_id in entity_ids.cycle_ids:
-            _CycleManagerFactory._build_manager()._export(cycle_id, tmp_dir)
-        for scenario_id in entity_ids.scenario_ids:
-            _ScenarioManagerFactory._build_manager()._export(scenario_id, tmp_dir)
-        for job_id in entity_ids.job_ids:
-            _JobManagerFactory._build_manager()._export(job_id, tmp_dir)
-        for submission_id in entity_ids.submission_ids:
-            _SubmissionManagerFactory._build_manager()._export(submission_id, tmp_dir)
-        _VersionManagerFactory._build_manager()._export(scenario.version, tmp_dir)
-
-        shutil.make_archive(output_filename, "zip", tmp_dir)
-
-
-def import_scenario(input_path: Union[str, pathlib.Path], override: bool = False) -> Optional[Scenario]:
-    """Import from an archive zip file containing an exported scenario into the current Taipy application.
-
-    The zip file should be created by the `taipy.export_scenario()^` method, which contains all related entities
-    of the scenario.
-    All entities should belong to the same version that is compatible with the current Taipy application version.
-
-    Parameters:
-        input_path (Union[str, pathlib.Path]): The path to the archive scenario to import.
-            If the path doesn't exist, an exception is raised.
-        override (bool): If True, override the entities if existed. The default value is False.
-
-    Return:
-        The imported scenario.
-
-    Raises:
-        FileNotFoundError: If the import path does not exist.
-        ImportArchiveDoesntContainAnyScenario: If the unzip folder doesn't contain any scenario.
-        ConflictedConfigurationError: If the configuration of the imported scenario is conflicted with the current one.
- """ - if isinstance(input_path, str): - zip_file_path: pathlib.Path = pathlib.Path(input_path) - else: - zip_file_path = input_path - - if not zip_file_path.exists(): - raise FileNotFoundError(f"The import archive path '{zip_file_path}' does not exist.") - - entity_managers: Dict[str, Type[_Manager]] = { - "cycles": _CycleManagerFactory._build_manager(), - "cycle": _CycleManagerFactory._build_manager(), - "data_nodes": _DataManagerFactory._build_manager(), - "data_node": _DataManagerFactory._build_manager(), - "tasks": _TaskManagerFactory._build_manager(), - "task": _TaskManagerFactory._build_manager(), - "scenarios": _ScenarioManagerFactory._build_manager(), - "scenario": _ScenarioManagerFactory._build_manager(), - "jobs": _JobManagerFactory._build_manager(), - "job": _JobManagerFactory._build_manager(), - "submission": _SubmissionManagerFactory._build_manager(), - "version": _VersionManagerFactory._build_manager(), - } - - return _ScenarioManagerFactory._build_manager()._import_scenario_and_children_entities( - zip_file_path, override, entity_managers - ) - - def get_parents( entity: Union[TaskId, DataNodeId, SequenceId, Task, DataNode, Sequence], parent_dict=None ) -> Dict[str, Set[_Entity]]: diff --git a/tests/core/sequence/test_sequence_manager.py b/tests/core/sequence/test_sequence_manager.py index 06934c3e16..0fa5a77b1e 100644 --- a/tests/core/sequence/test_sequence_manager.py +++ b/tests/core/sequence/test_sequence_manager.py @@ -9,8 +9,6 @@ # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the # specific language governing permissions and limitations under the License. -import json -from pathlib import Path from typing import Callable, Iterable, Optional from unittest import mock from unittest.mock import ANY @@ -737,54 +735,6 @@ def test_exists(): assert _SequenceManager._exists(scenario.sequences["sequence"]) -def test_export(tmpdir_factory): - path = tmpdir_factory.mktemp("data") - task = Task("task", {}, print, id=TaskId("task_id")) - scenario = Scenario( - "scenario", - {task}, - {}, - set(), - version="1.0", - sequences={"sequence_1": {}, "sequence_2": {"tasks": [task], "properties": {"xyz": "acb"}}}, - ) - _TaskManager._set(task) - _ScenarioManager._set(scenario) - - sequence_1 = scenario.sequences["sequence_1"] - sequence_2 = scenario.sequences["sequence_2"] - - _SequenceManager._export(sequence_1.id, Path(path)) - export_sequence_json_file_path = f"{path}/sequences/{sequence_1.id}.json" - with open(export_sequence_json_file_path, "rb") as f: - sequence_json_file = json.load(f) - expected_json = { - "id": sequence_1.id, - "owner_id": scenario.id, - "parent_ids": [scenario.id], - "name": "sequence_1", - "tasks": [], - "properties": {}, - "subscribers": [], - } - assert expected_json == sequence_json_file - - _SequenceManager._export(sequence_2.id, Path(path)) - export_sequence_json_file_path = f"{path}/sequences/{sequence_2.id}.json" - with open(export_sequence_json_file_path, "rb") as f: - sequence_json_file = json.load(f) - expected_json = { - "id": sequence_2.id, - "owner_id": scenario.id, - "parent_ids": [scenario.id], - "name": "sequence_2", - "tasks": [task.id], - "properties": {"xyz": "acb"}, - "subscribers": [], - } - assert expected_json == sequence_json_file - - def test_hard_delete_one_single_sequence_with_scenario_data_nodes(): dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing") dn_output_config = 
Config.configure_data_node("my_output", "in_memory", scope=Scope.SCENARIO) diff --git a/tests/core/test_taipy/test_export.py b/tests/core/test_taipy/test_export.py deleted file mode 100644 index d98ee09691..0000000000 --- a/tests/core/test_taipy/test_export.py +++ /dev/null @@ -1,221 +0,0 @@ -# Copyright 2021-2024 Avaiga Private Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on -# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. - -import os -import zipfile - -import pandas as pd -import pytest - -import taipy.core.taipy as tp -from taipy import Config, Frequency, Scope -from taipy.core.exceptions import ExportPathAlreadyExists - - -@pytest.fixture(scope="function", autouse=True) -def clean_export_zip_file(): - if os.path.exists("./tmp.zip"): - os.remove("./tmp.zip") - yield - if os.path.exists("./tmp.zip"): - os.remove("./tmp.zip") - - -def plus_1(x): - return x + 1 - - -def plus_1_dataframe(x): - return pd.DataFrame({"output": [x + 1]}) - - -def configure_test_scenario(input_data, frequency=None): - input_cfg = Config.configure_data_node( - id=f"i_{input_data}", storage_type="pickle", scope=Scope.SCENARIO, default_data=input_data - ) - csv_output_cfg = Config.configure_data_node(id=f"o_{input_data}_csv", storage_type="csv") - excel_output_cfg = Config.configure_data_node(id=f"o_{input_data}_excel", storage_type="excel") - parquet_output_cfg = Config.configure_data_node(id=f"o_{input_data}_parquet", storage_type="parquet") - json_output_cfg = Config.configure_data_node(id=f"o_{input_data}_json", storage_type="json") - - csv_task_cfg = Config.configure_task(f"t_{input_data}_csv", plus_1_dataframe, input_cfg, csv_output_cfg) - excel_task_cfg = Config.configure_task(f"t_{input_data}_excel", plus_1_dataframe, input_cfg, excel_output_cfg) - parquet_task_cfg = Config.configure_task(f"t_{input_data}_parquet", plus_1_dataframe, input_cfg, parquet_output_cfg) - json_task_cfg = Config.configure_task(f"t_{input_data}_json", plus_1, input_cfg, json_output_cfg) - scenario_cfg = Config.configure_scenario( - id=f"s_{input_data}", - task_configs=[csv_task_cfg, excel_task_cfg, parquet_task_cfg, json_task_cfg], - frequency=frequency, - ) - - return scenario_cfg - - -def test_export_scenario_with_and_without_zip_extension(tmp_path): - scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY) - - scenario = tp.create_scenario(scenario_cfg) - tp.submit(scenario) - - # Export without the .zip extension should create the tmp.zip file - tp.export_scenario(scenario.id, f"{tmp_path}/tmp") - assert os.path.exists(f"{tmp_path}/tmp.zip") - - os.remove(f"{tmp_path}/tmp.zip") - - # Export with the .zip extension should also create the tmp.zip file - tp.export_scenario(scenario.id, f"{tmp_path}/tmp.zip") - assert os.path.exists(f"{tmp_path}/tmp.zip") - - # Export with another extension should create the tmp..zip file - tp.export_scenario(scenario.id, f"{tmp_path}/tmp.tar.gz") - assert os.path.exists(f"{tmp_path}/tmp.tar.gz.zip") - - -def test_export_scenario_with_cycle(tmp_path): - scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY) - - scenario = 
tp.create_scenario(scenario_cfg) - submission = tp.submit(scenario) - jobs = submission.jobs - - # Export the submitted scenario - tp.export_scenario(scenario.id, "tmp.zip") - with zipfile.ZipFile("./tmp.zip") as zip_file: - zip_file.extractall(tmp_path) - - assert sorted(os.listdir(f"{tmp_path}/data_nodes")) == sorted( - [ - f"{scenario.i_1.id}.json", - f"{scenario.o_1_csv.id}.json", - f"{scenario.o_1_excel.id}.json", - f"{scenario.o_1_parquet.id}.json", - f"{scenario.o_1_json.id}.json", - ] - ) - assert sorted(os.listdir(f"{tmp_path}/tasks")) == sorted( - [ - f"{scenario.t_1_csv.id}.json", - f"{scenario.t_1_excel.id}.json", - f"{scenario.t_1_parquet.id}.json", - f"{scenario.t_1_json.id}.json", - ] - ) - assert sorted(os.listdir(f"{tmp_path}/scenarios")) == sorted([f"{scenario.id}.json"]) - assert sorted(os.listdir(f"{tmp_path}/jobs")) == sorted( - [f"{jobs[0].id}.json", f"{jobs[1].id}.json", f"{jobs[2].id}.json", f"{jobs[3].id}.json"] - ) - assert os.listdir(f"{tmp_path}/submission") == [f"{submission.id}.json"] - assert sorted(os.listdir(f"{tmp_path}/cycles")) == sorted([f"{scenario.cycle.id}.json"]) - - -def test_export_scenario_without_cycle(tmp_path): - scenario_cfg = configure_test_scenario(1) - - scenario = tp.create_scenario(scenario_cfg) - tp.submit(scenario) - - # Export the submitted scenario - tp.export_scenario(scenario.id, "tmp.zip") - with zipfile.ZipFile("./tmp.zip") as zip_file: - zip_file.extractall(tmp_path) - - assert os.path.exists(f"{tmp_path}/data_nodes") - assert os.path.exists(f"{tmp_path}/tasks") - assert os.path.exists(f"{tmp_path}/scenarios") - assert os.path.exists(f"{tmp_path}/jobs") - assert os.path.exists(f"{tmp_path}/submission") - assert not os.path.exists(f"{tmp_path}/cycles") # No cycle - - -def test_export_scenario_override_existing_files(tmp_path): - scenario_1_cfg = configure_test_scenario(1, frequency=Frequency.DAILY) - scenario_2_cfg = configure_test_scenario(2) - - scenario_1 = tp.create_scenario(scenario_1_cfg) - tp.submit(scenario_1) - - # Export the submitted scenario_1 - tp.export_scenario(scenario_1.id, "tmp.zip") - with zipfile.ZipFile("./tmp.zip") as zip_file: - zip_file.extractall(tmp_path / "scenario_1") - assert os.path.exists(f"{tmp_path}/scenario_1/data_nodes") - assert os.path.exists(f"{tmp_path}/scenario_1/tasks") - assert os.path.exists(f"{tmp_path}/scenario_1/scenarios") - assert os.path.exists(f"{tmp_path}/scenario_1/jobs") - assert os.path.exists(f"{tmp_path}/scenario_1/submission") - assert os.path.exists(f"{tmp_path}/scenario_1/cycles") - - scenario_2 = tp.create_scenario(scenario_2_cfg) - tp.submit(scenario_2) - - # Export the submitted scenario_2 to the same path should raise an error - with pytest.raises(ExportPathAlreadyExists): - tp.export_scenario(scenario_2.id, "tmp.zip") - - # Export the submitted scenario_2 without a cycle and override the existing files - tp.export_scenario(scenario_2.id, "tmp.zip", override=True) - with zipfile.ZipFile("./tmp.zip") as zip_file: - zip_file.extractall(tmp_path / "scenario_2") - assert os.path.exists(f"{tmp_path}/scenario_2/data_nodes") - assert os.path.exists(f"{tmp_path}/scenario_2/tasks") - assert os.path.exists(f"{tmp_path}/scenario_2/scenarios") - assert os.path.exists(f"{tmp_path}/scenario_2/jobs") - assert os.path.exists(f"{tmp_path}/scenario_2/submission") - # The cycles folder should not exists since the new scenario does not have a cycle - assert not os.path.exists(f"{tmp_path}/scenario_2/cycles") - - -def test_export_scenario_filesystem_with_data(tmp_path): - scenario_cfg = 
configure_test_scenario(1) - scenario = tp.create_scenario(scenario_cfg) - tp.submit(scenario) - - # Export scenario without data - tp.export_scenario(scenario.id, "tmp.zip") - with zipfile.ZipFile("./tmp.zip") as zip_file: - zip_file.extractall(tmp_path / "scenario_without_data") - assert not os.path.exists(f"{tmp_path}/scenario_without_data/user_data") - - # Export scenario with data - tp.export_scenario(scenario.id, "tmp.zip", include_data=True, override=True) - with zipfile.ZipFile("./tmp.zip") as zip_file: - zip_file.extractall(tmp_path / "scenario_with_data") - assert os.path.exists(f"{tmp_path}/scenario_with_data/user_data") - - data_files = [f for _, _, files in os.walk(f"{tmp_path}/scenario_with_data/user_data") for f in files] - assert sorted(data_files) == sorted( - [ - f"{scenario.i_1.id}.p", - f"{scenario.o_1_csv.id}.csv", - f"{scenario.o_1_excel.id}.xlsx", - f"{scenario.o_1_parquet.id}.parquet", - f"{scenario.o_1_json.id}.json", - ] - ) - - -def test_export_non_file_based_data_node_raise_warning(caplog): - input_cfg = Config.configure_data_node(id="i", storage_type="pickle", scope=Scope.SCENARIO, default_data=1) - csv_output_cfg = Config.configure_data_node(id="o_csv", storage_type="csv") - in_mem_output_cfg = Config.configure_data_node(id="o_mem", storage_type="in_memory") - - csv_task_cfg = Config.configure_task("t_csv", plus_1_dataframe, input_cfg, csv_output_cfg) - in_mem_task_cfg = Config.configure_task("t_mem", plus_1, input_cfg, in_mem_output_cfg) - scenario_cfg = Config.configure_scenario(id="s", task_configs=[csv_task_cfg, in_mem_task_cfg]) - - scenario = tp.create_scenario(scenario_cfg) - tp.submit(scenario) - - # Export scenario with in-memory data node - tp.export_scenario(scenario.id, "tmp.zip", include_data=True) - expected_warning = f"Data node {scenario.o_mem.id} is not a file-based data node and the data will not be exported" - assert expected_warning in caplog.text diff --git a/tests/core/test_taipy/test_export_with_sql_repo.py b/tests/core/test_taipy/test_export_with_sql_repo.py deleted file mode 100644 index 7ca34c7d61..0000000000 --- a/tests/core/test_taipy/test_export_with_sql_repo.py +++ /dev/null @@ -1,200 +0,0 @@ -# Copyright 2021-2024 Avaiga Private Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on -# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. 
-
-import os
-import zipfile
-
-import pandas as pd
-import pytest
-
-import taipy.core.taipy as tp
-from taipy import Config, Frequency, Scope
-from taipy.core.exceptions import ExportPathAlreadyExists
-
-
-@pytest.fixture(scope="function", autouse=True)
-def clean_export_zip_file():
-    if os.path.exists("./tmp.zip"):
-        os.remove("./tmp.zip")
-    yield
-    if os.path.exists("./tmp.zip"):
-        os.remove("./tmp.zip")
-
-
-def plus_1(x):
-    return x + 1
-
-
-def plus_1_dataframe(x):
-    return pd.DataFrame({"output": [x + 1]})
-
-
-def configure_test_scenario(input_data, frequency=None):
-    input_cfg = Config.configure_data_node(
-        id=f"i_{input_data}", storage_type="pickle", scope=Scope.SCENARIO, default_data=input_data
-    )
-    csv_output_cfg = Config.configure_data_node(id=f"o_{input_data}_csv", storage_type="csv")
-    excel_output_cfg = Config.configure_data_node(id=f"o_{input_data}_excel", storage_type="excel")
-    parquet_output_cfg = Config.configure_data_node(id=f"o_{input_data}_parquet", storage_type="parquet")
-    json_output_cfg = Config.configure_data_node(id=f"o_{input_data}_json", storage_type="json")
-
-    csv_task_cfg = Config.configure_task(f"t_{input_data}_csv", plus_1_dataframe, input_cfg, csv_output_cfg)
-    excel_task_cfg = Config.configure_task(f"t_{input_data}_excel", plus_1_dataframe, input_cfg, excel_output_cfg)
-    parquet_task_cfg = Config.configure_task(f"t_{input_data}_parquet", plus_1_dataframe, input_cfg, parquet_output_cfg)
-    json_task_cfg = Config.configure_task(f"t_{input_data}_json", plus_1, input_cfg, json_output_cfg)
-    scenario_cfg = Config.configure_scenario(
-        id=f"s_{input_data}",
-        task_configs=[csv_task_cfg, excel_task_cfg, parquet_task_cfg, json_task_cfg],
-        frequency=frequency,
-    )
-
-    return scenario_cfg
-
-
-def test_export_scenario_with_cycle(tmp_path, init_sql_repo):
-    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
-
-    scenario = tp.create_scenario(scenario_cfg)
-    submission = tp.submit(scenario)
-    jobs = submission.jobs
-
-    # Export the submitted scenario
-    tp.export_scenario(scenario.id, "tmp.zip")
-    with zipfile.ZipFile("./tmp.zip") as zip_file:
-        zip_file.extractall(tmp_path)
-
-    assert sorted(os.listdir(f"{tmp_path}/data_node")) == sorted(
-        [
-            f"{scenario.i_1.id}.json",
-            f"{scenario.o_1_csv.id}.json",
-            f"{scenario.o_1_excel.id}.json",
-            f"{scenario.o_1_parquet.id}.json",
-            f"{scenario.o_1_json.id}.json",
-        ]
-    )
-    assert sorted(os.listdir(f"{tmp_path}/task")) == sorted(
-        [
-            f"{scenario.t_1_csv.id}.json",
-            f"{scenario.t_1_excel.id}.json",
-            f"{scenario.t_1_parquet.id}.json",
-            f"{scenario.t_1_json.id}.json",
-        ]
-    )
-    assert sorted(os.listdir(f"{tmp_path}/scenario")) == sorted([f"{scenario.id}.json"])
-    assert sorted(os.listdir(f"{tmp_path}/job")) == sorted(
-        [f"{jobs[0].id}.json", f"{jobs[1].id}.json", f"{jobs[2].id}.json", f"{jobs[3].id}.json"]
-    )
-    assert os.listdir(f"{tmp_path}/submission") == [f"{submission.id}.json"]
-    assert sorted(os.listdir(f"{tmp_path}/cycle")) == sorted([f"{scenario.cycle.id}.json"])
-
-
-def test_export_scenario_without_cycle(tmp_path, init_sql_repo):
-    scenario_cfg = configure_test_scenario(1)
-
-    scenario = tp.create_scenario(scenario_cfg)
-    tp.submit(scenario)
-
-    # Export the submitted scenario
-    tp.export_scenario(scenario.id, "tmp.zip")
-    with zipfile.ZipFile("./tmp.zip") as zip_file:
-        zip_file.extractall(tmp_path)
-
-    assert os.path.exists(f"{tmp_path}/data_node")
-    assert os.path.exists(f"{tmp_path}/task")
-    assert os.path.exists(f"{tmp_path}/scenario")
-    assert os.path.exists(f"{tmp_path}/job")
-    assert os.path.exists(f"{tmp_path}/submission")
-    assert not os.path.exists(f"{tmp_path}/cycle")  # No cycle
-
-
-def test_export_scenario_override_existing_files(tmp_path, init_sql_repo):
-    scenario_1_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
-    scenario_2_cfg = configure_test_scenario(2)
-
-    scenario_1 = tp.create_scenario(scenario_1_cfg)
-    tp.submit(scenario_1)
-
-    # Export the submitted scenario_1
-    tp.export_scenario(scenario_1.id, "tmp.zip")
-    with zipfile.ZipFile("./tmp.zip") as zip_file:
-        zip_file.extractall(tmp_path / "scenario_1")
-    assert os.path.exists(f"{tmp_path}/scenario_1/data_node")
-    assert os.path.exists(f"{tmp_path}/scenario_1/task")
-    assert os.path.exists(f"{tmp_path}/scenario_1/scenario")
-    assert os.path.exists(f"{tmp_path}/scenario_1/job")
-    assert os.path.exists(f"{tmp_path}/scenario_1/submission")
-    assert os.path.exists(f"{tmp_path}/scenario_1/cycle")
-
-    scenario_2 = tp.create_scenario(scenario_2_cfg)
-    tp.submit(scenario_2)
-
-    # Export the submitted scenario_2 to the same folder should raise an error
-    with pytest.raises(ExportPathAlreadyExists):
-        tp.export_scenario(scenario_2.id, "tmp.zip")
-
-    # Export the submitted scenario_2 without a cycle and override the existing files
-    tp.export_scenario(scenario_2.id, "tmp.zip", override=True)
-    with zipfile.ZipFile("./tmp.zip") as zip_file:
-        zip_file.extractall(tmp_path / "scenario_2")
-    assert os.path.exists(f"{tmp_path}/scenario_2/data_node")
-    assert os.path.exists(f"{tmp_path}/scenario_2/task")
-    assert os.path.exists(f"{tmp_path}/scenario_2/scenario")
-    assert os.path.exists(f"{tmp_path}/scenario_2/job")
-    assert os.path.exists(f"{tmp_path}/scenario_2/submission")
-    # The cycles folder should not exists since the new scenario does not have a cycle
-    assert not os.path.exists(f"{tmp_path}/scenario_2/cycle")
-
-
-def test_export_scenario_sql_repo_with_data(tmp_path, init_sql_repo):
-    scenario_cfg = configure_test_scenario(1)
-    scenario = tp.create_scenario(scenario_cfg)
-    tp.submit(scenario)
-
-    # Export scenario without data
-    tp.export_scenario(scenario.id, "tmp.zip")
-    with zipfile.ZipFile("./tmp.zip") as zip_file:
-        zip_file.extractall(tmp_path / "scenario_without_data")
-    assert not os.path.exists(f"{tmp_path}/scenario_without_data/user_data")
-
-    # Export scenario with data
-    tp.export_scenario(scenario.id, "tmp.zip", include_data=True, override=True)
-    with zipfile.ZipFile("./tmp.zip") as zip_file:
-        zip_file.extractall(tmp_path / "scenario_with_data")
-    assert os.path.exists(f"{tmp_path}/scenario_with_data/user_data")
-
-    data_files = [f for _, _, files in os.walk(f"{tmp_path}/scenario_with_data/user_data") for f in files]
-    assert sorted(data_files) == sorted(
-        [
-            f"{scenario.i_1.id}.p",
-            f"{scenario.o_1_csv.id}.csv",
-            f"{scenario.o_1_excel.id}.xlsx",
-            f"{scenario.o_1_parquet.id}.parquet",
-            f"{scenario.o_1_json.id}.json",
-        ]
-    )
-
-
-def test_export_non_file_based_data_node_raise_warning(init_sql_repo, caplog):
-    input_cfg = Config.configure_data_node(id="i", storage_type="pickle", scope=Scope.SCENARIO, default_data=1)
-    csv_output_cfg = Config.configure_data_node(id="o_csv", storage_type="csv")
-    in_mem_output_cfg = Config.configure_data_node(id="o_mem", storage_type="in_memory")
-
-    csv_task_cfg = Config.configure_task("t_csv", plus_1_dataframe, input_cfg, csv_output_cfg)
-    in_mem_task_cfg = Config.configure_task("t_mem", plus_1, input_cfg, in_mem_output_cfg)
-    scenario_cfg = Config.configure_scenario(id="s", task_configs=[csv_task_cfg, in_mem_task_cfg])
-
-    scenario = tp.create_scenario(scenario_cfg)
-    tp.submit(scenario)
-
-    # Export scenario with in-memory data node
-    tp.export_scenario(scenario.id, "tmp.zip", include_data=True)
-    expected_warning = f"Data node {scenario.o_mem.id} is not a file-based data node and the data will not be exported"
-    assert expected_warning in caplog.text
diff --git a/tests/core/test_taipy/test_import.py b/tests/core/test_taipy/test_import.py
deleted file mode 100644
index f9a1ffcad6..0000000000
--- a/tests/core/test_taipy/test_import.py
+++ /dev/null
@@ -1,213 +0,0 @@
-# Copyright 2021-2024 Avaiga Private Limited
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
-# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations under the License.
-
-import os
-import shutil
-import zipfile
-
-import pandas as pd
-import pytest
-
-import taipy.core.taipy as tp
-from taipy import Config, Frequency, Scope
-from taipy.core._version._version_manager import _VersionManager
-from taipy.core.cycle._cycle_manager import _CycleManager
-from taipy.core.data._data_manager import _DataManager
-from taipy.core.exceptions.exceptions import (
-    ConflictedConfigurationError,
-    EntitiesToBeImportAlredyExist,
-    ImportArchiveDoesntContainAnyScenario,
-    ImportScenarioDoesntHaveAVersion,
-)
-from taipy.core.job._job_manager import _JobManager
-from taipy.core.scenario._scenario_manager import _ScenarioManager
-from taipy.core.submission._submission_manager import _SubmissionManager
-from taipy.core.task._task_manager import _TaskManager
-
-
-@pytest.fixture(scope="function", autouse=True)
-def clean_export_zip_file():
-    if os.path.exists("./tmp.zip"):
-        os.remove("./tmp.zip")
-    yield
-    if os.path.exists("./tmp.zip"):
-        os.remove("./tmp.zip")
-
-
-def plus_1(x):
-    return x + 1
-
-
-def plus_1_dataframe(x):
-    return pd.DataFrame({"output": [x + 1]})
-
-
-def configure_test_scenario(input_data, frequency=None):
-    input_cfg = Config.configure_data_node(
-        id=f"i_{input_data}", storage_type="pickle", scope=Scope.SCENARIO, default_data=input_data
-    )
-    csv_output_cfg = Config.configure_data_node(id=f"o_{input_data}_csv", storage_type="csv")
-    excel_output_cfg = Config.configure_data_node(id=f"o_{input_data}_excel", storage_type="excel")
-    parquet_output_cfg = Config.configure_data_node(id=f"o_{input_data}_parquet", storage_type="parquet")
-    json_output_cfg = Config.configure_data_node(id=f"o_{input_data}_json", storage_type="json")
-
-    csv_task_cfg = Config.configure_task(f"t_{input_data}_csv", plus_1_dataframe, input_cfg, csv_output_cfg)
-    excel_task_cfg = Config.configure_task(f"t_{input_data}_excel", plus_1_dataframe, input_cfg, excel_output_cfg)
-    parquet_task_cfg = Config.configure_task(f"t_{input_data}_parquet", plus_1_dataframe, input_cfg, parquet_output_cfg)
-    json_task_cfg = Config.configure_task(f"t_{input_data}_json", plus_1, input_cfg, json_output_cfg)
-    scenario_cfg = Config.configure_scenario(
-        id=f"s_{input_data}",
-        task_configs=[csv_task_cfg, excel_task_cfg, parquet_task_cfg, json_task_cfg],
-        frequency=frequency,
-    )
-
-    return scenario_cfg
-
-
-def export_test_scenario(scenario_cfg, export_path="tmp.zip", override=False, include_data=False):
-    scenario = tp.create_scenario(scenario_cfg)
-    tp.submit(scenario)
-
-    # Export the submitted scenario
-    tp.export_scenario(scenario.id, export_path, override, include_data)
-    return scenario
-
-
-def test_import_scenario_without_data(init_managers):
-    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
-    scenario = export_test_scenario(scenario_cfg)
-
-    init_managers()
-
-    assert _ScenarioManager._get_all() == []
-    imported_scenario = tp.import_scenario("tmp.zip")
-
-    # The imported scenario should be the same as the exported scenario
-    assert _ScenarioManager._get_all() == [imported_scenario]
-    assert imported_scenario == scenario
-
-    # All entities belonging to the scenario should be imported
-    assert len(_CycleManager._get_all()) == 1
-    assert len(_TaskManager._get_all()) == 4
-    assert len(_DataManager._get_all()) == 5
-    assert len(_JobManager._get_all()) == 4
-    assert len(_SubmissionManager._get_all()) == 1
-    assert len(_VersionManager._get_all()) == 1
-
-
-def test_import_scenario_with_data(init_managers):
-    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
-    export_test_scenario(scenario_cfg, include_data=True)
-
-    init_managers()
-
-    assert _ScenarioManager._get_all() == []
-    imported_scenario = tp.import_scenario("tmp.zip")
-
-    # All data of all data nodes should be imported
-    assert all(os.path.exists(dn.path) for dn in imported_scenario.data_nodes.values())
-
-
-def test_import_scenario_when_entities_are_already_existed_should_rollback(caplog):
-    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
-    export_test_scenario(scenario_cfg)
-
-    caplog.clear()
-
-    _CycleManager._delete_all()
-    _TaskManager._delete_all()
-    _DataManager._delete_all()
-    _JobManager._delete_all()
-    _ScenarioManager._delete_all()
-
-    assert len(_CycleManager._get_all()) == 0
-    assert len(_TaskManager._get_all()) == 0
-    assert len(_DataManager._get_all()) == 0
-    assert len(_JobManager._get_all()) == 0
-    assert len(_SubmissionManager._get_all()) == 1  # Keep the submission entity to test the rollback
-    submission_id = _SubmissionManager._get_all()[0].id
-    assert len(_ScenarioManager._get_all()) == 0
-
-    # Import the scenario when the old entities still exist should raise an error
-    with pytest.raises(EntitiesToBeImportAlredyExist):
-        tp.import_scenario("tmp.zip")
-    assert all(log.levelname in ["ERROR", "INFO"] for log in caplog.records)
-    assert "An error occurred during the import" in caplog.text
-    assert f"{submission_id} already exists. Please use the 'override' parameter to override it" in caplog.text
-
-    # No entity should be imported and the old entities should be kept
-    assert len(_CycleManager._get_all()) == 0
-    assert len(_TaskManager._get_all()) == 0
-    assert len(_DataManager._get_all()) == 0
-    assert len(_JobManager._get_all()) == 0
-    assert len(_SubmissionManager._get_all()) == 1  # Keep the submission entity to test the rollback
-    assert len(_ScenarioManager._get_all()) == 0
-
-    caplog.clear()
-
-    # Import with override flag
-    tp.import_scenario("tmp.zip", override=True)
-    assert all(log.levelname in ["WARNING", "INFO"] for log in caplog.records)
-    assert f"{submission_id} already exists and will be overridden" in caplog.text
-
-    # The scenario is imported and overridden the old one
-    assert len(_ScenarioManager._get_all()) == 1
-    assert len(_CycleManager._get_all()) == 1
-    assert len(_TaskManager._get_all()) == 4
-    assert len(_DataManager._get_all()) == 5
-    assert len(_JobManager._get_all()) == 4
-    assert len(_SubmissionManager._get_all()) == 1
-    assert len(_VersionManager._get_all()) == 1
-
-
-def test_import_incompatible_scenario(init_managers):
-    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
-    export_test_scenario(scenario_cfg)
-
-    Config.unblock_update()
-
-    # Configure a new dn to make the exported version incompatible
-    Config.configure_data_node("new_dn")
-
-    with pytest.raises(ConflictedConfigurationError):
-        tp.import_scenario("tmp.zip")
-
-
-def test_import_a_non_exists_folder():
-    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
-    export_test_scenario(scenario_cfg)
-
-    with pytest.raises(FileNotFoundError):
-        tp.import_scenario("non_exists_folder")
-
-
-def test_import_an_empty_archive(tmpdir_factory):
-    empty_folder = tmpdir_factory.mktemp("empty_folder").strpath
-    shutil.make_archive("tmp", "zip", empty_folder)
-
-    with pytest.raises(ImportArchiveDoesntContainAnyScenario):
-        tp.import_scenario("tmp.zip")
-
-
-def test_import_with_no_version(tmp_path):
-    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
-    export_test_scenario(scenario_cfg)
-
-    # Extract the zip,
-    with zipfile.ZipFile("./tmp.zip") as zip_file:
-        zip_file.extractall(tmp_path)
-    # remove the version,
-    shutil.rmtree(f"{tmp_path}/version")
-    # and archive the scenario without the version again
-    shutil.make_archive("tmp", "zip", tmp_path)
-
-    with pytest.raises(ImportScenarioDoesntHaveAVersion):
-        tp.import_scenario("tmp.zip")
diff --git a/tests/core/test_taipy/test_import_with_sql_repo.py b/tests/core/test_taipy/test_import_with_sql_repo.py
deleted file mode 100644
index 22f6001ab5..0000000000
--- a/tests/core/test_taipy/test_import_with_sql_repo.py
+++ /dev/null
@@ -1,174 +0,0 @@
-# Copyright 2021-2024 Avaiga Private Limited
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
-# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations under the License.
-
-import os
-
-import pandas as pd
-import pytest
-
-import taipy.core.taipy as tp
-from taipy import Config, Frequency, Scope
-from taipy.core._version._version_manager import _VersionManager
-from taipy.core.cycle._cycle_manager import _CycleManager
-from taipy.core.data._data_manager import _DataManager
-from taipy.core.exceptions.exceptions import ConflictedConfigurationError, EntitiesToBeImportAlredyExist
-from taipy.core.job._job_manager import _JobManager
-from taipy.core.scenario._scenario_manager import _ScenarioManager
-from taipy.core.submission._submission_manager import _SubmissionManager
-from taipy.core.task._task_manager import _TaskManager
-
-
-@pytest.fixture(scope="function", autouse=True)
-def clean_export_zip_file():
-    if os.path.exists("./tmp.zip"):
-        os.remove("./tmp.zip")
-    yield
-    if os.path.exists("./tmp.zip"):
-        os.remove("./tmp.zip")
-
-
-def plus_1(x):
-    return x + 1
-
-
-def plus_1_dataframe(x):
-    return pd.DataFrame({"output": [x + 1]})
-
-
-def configure_test_scenario(input_data, frequency=None):
-    input_cfg = Config.configure_data_node(
-        id=f"i_{input_data}", storage_type="pickle", scope=Scope.SCENARIO, default_data=input_data
-    )
-    csv_output_cfg = Config.configure_data_node(id=f"o_{input_data}_csv", storage_type="csv")
-    excel_output_cfg = Config.configure_data_node(id=f"o_{input_data}_excel", storage_type="excel")
-    parquet_output_cfg = Config.configure_data_node(id=f"o_{input_data}_parquet", storage_type="parquet")
-    json_output_cfg = Config.configure_data_node(id=f"o_{input_data}_json", storage_type="json")
-
-    csv_task_cfg = Config.configure_task(f"t_{input_data}_csv", plus_1_dataframe, input_cfg, csv_output_cfg)
-    excel_task_cfg = Config.configure_task(f"t_{input_data}_excel", plus_1_dataframe, input_cfg, excel_output_cfg)
-    parquet_task_cfg = Config.configure_task(f"t_{input_data}_parquet", plus_1_dataframe, input_cfg, parquet_output_cfg)
-    json_task_cfg = Config.configure_task(f"t_{input_data}_json", plus_1, input_cfg, json_output_cfg)
-    scenario_cfg = Config.configure_scenario(
-        id=f"s_{input_data}",
-        task_configs=[csv_task_cfg, excel_task_cfg, parquet_task_cfg, json_task_cfg],
-        frequency=frequency,
-    )
-
-    return scenario_cfg
-
-
-def export_test_scenario(scenario_cfg, export_path="tmp.zip", override=False, include_data=False):
-    scenario = tp.create_scenario(scenario_cfg)
-    tp.submit(scenario)
-
-    # Export the submitted scenario
-    tp.export_scenario(scenario.id, export_path, override, include_data)
-    return scenario
-
-
-def test_import_scenario_without_data(init_sql_repo, init_managers):
-    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
-    scenario = export_test_scenario(scenario_cfg)
-
-    init_managers()
-
-    assert _ScenarioManager._get_all() == []
-    imported_scenario = tp.import_scenario("tmp.zip")
-
-    # The imported scenario should be the same as the exported scenario
-    assert _ScenarioManager._get_all() == [imported_scenario]
-    assert imported_scenario == scenario
-
-    # All entities belonging to the scenario should be imported
-    assert len(_CycleManager._get_all()) == 1
-    assert len(_TaskManager._get_all()) == 4
-    assert len(_DataManager._get_all()) == 5
-    assert len(_JobManager._get_all()) == 4
-    assert len(_SubmissionManager._get_all()) == 1
-    assert len(_VersionManager._get_all()) == 1
-
-
-def test_import_scenario_with_data(init_sql_repo, init_managers):
-    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
-    export_test_scenario(scenario_cfg, include_data=True)
-
-    init_managers()
-
-    assert _ScenarioManager._get_all() == []
-    imported_scenario = tp.import_scenario("tmp.zip")
-
-    # All data of all data nodes should be imported
-    assert all(os.path.exists(dn.path) for dn in imported_scenario.data_nodes.values())
-
-
-def test_import_scenario_when_entities_are_already_existed_should_rollback(init_sql_repo, caplog):
-    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
-    export_test_scenario(scenario_cfg)
-
-    caplog.clear()
-
-    _CycleManager._delete_all()
-    _TaskManager._delete_all()
-    _DataManager._delete_all()
-    _JobManager._delete_all()
-    _ScenarioManager._delete_all()
-
-    assert len(_CycleManager._get_all()) == 0
-    assert len(_TaskManager._get_all()) == 0
-    assert len(_DataManager._get_all()) == 0
-    assert len(_JobManager._get_all()) == 0
-    assert len(_SubmissionManager._get_all()) == 1  # Keep the submission entity to test the rollback
-    submission_id = _SubmissionManager._get_all()[0].id
-    assert len(_ScenarioManager._get_all()) == 0
-
-    # Import the scenario when the old entities still exist should raise an error
-    with pytest.raises(EntitiesToBeImportAlredyExist):
-        tp.import_scenario("tmp.zip")
-    assert all(log.levelname in ["ERROR", "INFO"] for log in caplog.records)
-    assert "An error occurred during the import" in caplog.text
-    assert f"{submission_id} already exists. Please use the 'override' parameter to override it" in caplog.text
-
-    # No entity should be imported and the old entities should be kept
-    assert len(_CycleManager._get_all()) == 0
-    assert len(_TaskManager._get_all()) == 0
-    assert len(_DataManager._get_all()) == 0
-    assert len(_JobManager._get_all()) == 0
-    assert len(_SubmissionManager._get_all()) == 1  # Keep the submission entity to test the rollback
-    assert len(_ScenarioManager._get_all()) == 0
-
-    caplog.clear()
-
-    # Import with override flag
-    tp.import_scenario("tmp.zip", override=True)
-    assert all(log.levelname in ["WARNING", "INFO"] for log in caplog.records)
-    assert f"{submission_id} already exists and will be overridden" in caplog.text
-
-    # The scenario is imported and overridden the old one
-    assert len(_ScenarioManager._get_all()) == 1
-    assert len(_CycleManager._get_all()) == 1
-    assert len(_TaskManager._get_all()) == 4
-    assert len(_DataManager._get_all()) == 5
-    assert len(_JobManager._get_all()) == 4
-    assert len(_SubmissionManager._get_all()) == 1
-    assert len(_VersionManager._get_all()) == 1
-
-
-def test_import_incompatible_scenario(init_sql_repo, init_managers):
-    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
-    export_test_scenario(scenario_cfg)
-
-    Config.unblock_update()
-
-    # Configure a new dn to make the exported version incompatible
-    Config.configure_data_node("new_dn")
-
-    with pytest.raises(ConflictedConfigurationError):
-        tp.import_scenario("tmp.zip")