diff --git a/importer/api/views.py b/importer/api/views.py
index dcaf4140..fd10ea98 100644
--- a/importer/api/views.py
+++ b/importer/api/views.py
@@ -43,7 +43,7 @@
 from importer.orchestrator import orchestrator
 from oauth2_provider.contrib.rest_framework import OAuth2Authentication
 from rest_framework.authentication import BasicAuthentication, SessionAuthentication
-from rest_framework.parsers import FileUploadParser, MultiPartParser
+from rest_framework.parsers import FileUploadParser, MultiPartParser, JSONParser
 from rest_framework.permissions import IsAuthenticatedOrReadOnly
 from rest_framework.response import Response
 from geonode.assets.handlers import asset_handler_registry
@@ -57,7 +57,7 @@ class ImporterViewSet(DynamicModelViewSet):
     API endpoint that allows uploads to be viewed or edited.
     """

-    parser_class = [FileUploadParser, MultiPartParser]
+    parser_class = [JSONParser, FileUploadParser, MultiPartParser]

     authentication_classes = [
         BasicAuthentication,
@@ -129,46 +129,40 @@ def create(self, request, *args, **kwargs):
         handler = orchestrator.get_handler(_data)

-        if _file and handler:
+        # a handler without a file means the payload points to a remote resource
+        if (_file and handler) or (not _file and handler):
             asset = None
+            files = []
             try:
                 # cloning data into a local folder
                 extracted_params, _data = handler.extract_params_from_data(_data)
-                if storage_manager is None:
-                    # means that the storage manager is not initialized yet, so
-                    # the file is not a zip
-                    storage_manager = StorageManager(remote_files=_data)
-                    storage_manager.clone_remote_files(
-                        cloning_directory=asset_dir, create_tempdir=False
+                if _file:
+                    storage_manager, asset, files = self._handle_files(
+                        request, asset_dir, storage_manager, _data, handler
                     )
-                # get filepath
-                asset, files = self.generate_asset_and_retrieve_paths(
-                    request, storage_manager, handler
-                )
-
-                upload_validator = UploadLimitValidator(request.user)
-                upload_validator.validate_parallelism_limit_per_user()
-                upload_validator.validate_files_sum_of_sizes(
-                    storage_manager.data_retriever
-                )

                 action = ExecutionRequestAction.IMPORT.value

+                input_params = {
+                    **{"files": files, "handler_module_path": str(handler)},
+                    **extracted_params,
+                }
+
+                if asset:
+                    input_params.update(
+                        {
+                            "asset_id": asset.id,
+                            "asset_module_path": f"{asset.__module__}.{asset.__class__.__name__}",
+                        }
+                    )
+
                 execution_id = orchestrator.create_execution_request(
                     user=request.user,
                     func_name=next(iter(handler.get_task_list(action=action))),
                     step=_(next(iter(handler.get_task_list(action=action)))),
-                    input_params={
-                        **{"files": files, "handler_module_path": str(handler)},
-                        **extracted_params,
-                        **{
-                            "asset_id": asset.id,
-                            "asset_module_path": f"{asset.__module__}.{asset.__class__.__name__}",
-                        },
-                    },
-                    legacy_upload_name=_file.name,
+                    input_params=input_params,
                     action=action,
-                    name=_file.name,
+                    name=_file.name if _file else extracted_params.get("title", None),
                     source=extracted_params.get("source"),
                 )
@@ -194,6 +188,25 @@ def create(self, request, *args, **kwargs):
         raise ImportException(detail="No handlers found for this dataset type")

+    def _handle_files(self, request, asset_dir, storage_manager, _data, handler):
+        if storage_manager is None:
+            # means that the storage manager is not initialized yet, so
+            # the file is not a zip
+            storage_manager = StorageManager(remote_files=_data)
+            storage_manager.clone_remote_files(
+                cloning_directory=asset_dir, create_tempdir=False
+            )
+        # get filepath
+        asset, files = self.generate_asset_and_retrieve_paths(
+            request, storage_manager, handler
+        )
+
+        upload_validator = UploadLimitValidator(request.user)
+        upload_validator.validate_parallelism_limit_per_user()
+        upload_validator.validate_files_sum_of_sizes(storage_manager.data_retriever)
+
+        return storage_manager, asset, files
+
     def generate_asset_and_retrieve_paths(self, request, storage_manager, handler):
         asset_handler = asset_handler_registry.get_default_handler()
         _files = storage_manager.get_retrieved_paths()
diff --git a/importer/celery_tasks.py b/importer/celery_tasks.py
index a86d3da4..ac79fe8b 100644
--- a/importer/celery_tasks.py
+++ b/importer/celery_tasks.py
@@ -299,7 +299,7 @@ def create_geonode_resource(
     layer_name: Optional[str] = None,
     alternate: Optional[str] = None,
     handler_module_path: str = None,
-    action: str = None,
+    action: str = exa.IMPORT.value,
     **kwargs,
 ):
     """
@@ -329,11 +329,16 @@ def create_geonode_resource(
     _files = _exec.input_params.get("files")

-    _asset = (
-        import_string(_exec.input_params.get("asset_module_path"))
-        .objects.filter(id=_exec.input_params.get("asset_id"))
-        .first()
-    )
+    if not _files:
+        _asset = None
+    else:
+        _asset = (
+            import_string(_exec.input_params.get("asset_module_path"))
+            .objects.filter(id=_exec.input_params.get("asset_id"))
+            .first()
+        )
+
+    handler_module_path = handler_module_path or _exec.input_params.get('handler_module_path')

     handler = import_string(handler_module_path)()
     _overwrite = _exec.input_params.get("overwrite_existing_layer")
diff --git a/importer/datastore.py b/importer/datastore.py
index d472031c..d1533ba1 100644
--- a/importer/datastore.py
+++ b/importer/datastore.py
@@ -24,7 +24,7 @@ def input_is_valid(self):
         """
         Perform basic validation steps
         """
-        return self.handler.is_valid(self.files, self.user)
+        return self.handler.is_valid(self.files, self.user, self.execution_id)

     def prepare_import(self, **kwargs):
         """
diff --git a/importer/handlers/base.py b/importer/handlers/base.py
index 6c74ae8b..68296e49 100644
--- a/importer/handlers/base.py
+++ b/importer/handlers/base.py
@@ -5,9 +5,11 @@
 from geonode.resource.enumerator import ExecutionRequestAction as exa
 from geonode.layers.models import Dataset
 from importer.api.exception import ImportException
-from importer.utils import ImporterRequestAction as ira
+from importer.utils import ImporterRequestAction as ira, find_key_recursively
 from django_celery_results.models import TaskResult
 from django.db.models import Q
+from geonode.resource.models import ExecutionRequest
+from geonode.base.models import ResourceBase

 logger = logging.getLogger(__name__)

@@ -89,7 +91,7 @@ def can_handle_sld_file(self) -> bool:
         return True

     @staticmethod
-    def is_valid(files, user):
+    def is_valid(files, user, _execid=None):
         """
         Define basic validation steps
         """
@@ -257,3 +259,86 @@ def delete_resource(self, instance):
         Base function to delete the resource with all the dependencies (example: dynamic model)
         """
         return NotImplementedError
+
+    def _get_execution_request_object(self, execution_id: str):
+        return ExecutionRequest.objects.filter(exec_id=execution_id).first()
+
+    def overwrite_resourcehandlerinfo(
+        self,
+        handler_module_path: str,
+        resource: Dataset,
+        execution_id: ExecutionRequest,
+        **kwargs,
+    ):
+        """
+        Overwrite the ResourceHandlerInfo
+        """
+        if resource.resourcehandlerinfo_set.exists():
+            resource.resourcehandlerinfo_set.update(
+                handler_module_path=handler_module_path,
+                resource=resource,
+                execution_request=execution_id,
+                kwargs=kwargs.get("kwargs", {}) or kwargs,
+            )
+            return
+        return self.create_resourcehandlerinfo(
+            handler_module_path, resource, execution_id, **kwargs
+        )
+
+    def rollback(
+        self, exec_id, rollback_from_step, action_to_rollback, *args, **kwargs
+    ):
+        steps = self.ACTIONS.get(action_to_rollback)
+        if rollback_from_step not in steps:
+            logger.info(f"Step not found {rollback_from_step}, skipping")
+            return
+        step_index = steps.index(rollback_from_step)
+        # the start_import, start_copy etc. don't do anything as a step, it is just the start
+        # so there is nothing to rollback
+        steps_to_rollback = steps[1 : step_index + 1]  # noqa
+        if not steps_to_rollback:
+            return
+        # reversing the tuple to go backwards with the rollback
+        reversed_steps = steps_to_rollback[::-1]
+        instance_name = None
+        try:
+            instance_name = (
+                find_key_recursively(kwargs, "new_dataset_alternate") or args[3]
+            )
+        except Exception:
+            pass
+
+        logger.warning(
+            f"Starting rollback for execid: {exec_id} resource published was: {instance_name}"
+        )
+
+        for step in reversed_steps:
+            normalized_step_name = step.split(".")[-1]
+            if getattr(self, f"_{normalized_step_name}_rollback", None):
+                function = getattr(self, f"_{normalized_step_name}_rollback")
+                function(exec_id, instance_name, *args, **kwargs)
+
+        logger.warning(
+            f"Rollback for execid: {exec_id} resource published was: {instance_name} completed"
+        )
+
+    def _create_geonode_resource_rollback(
+        self, exec_id, istance_name=None, *args, **kwargs
+    ):
+        """
+        The handler will remove the resource from geonode
+        """
+        logger.info(
+            f"Rollback geonode step in progress for execid: {exec_id} resource created was: {istance_name}"
+        )
+        resource = ResourceBase.objects.filter(alternate__icontains=istance_name)
+        if resource.exists():
+            resource.delete()
+
+    def _copy_dynamic_model_rollback(self, exec_id, istance_name=None, *args, **kwargs):
+        self._import_resource_rollback(exec_id, istance_name=istance_name)
+
+    def _copy_geonode_resource_rollback(
+        self, exec_id, istance_name=None, *args, **kwargs
+    ):
+        self._create_geonode_resource_rollback(exec_id, istance_name=istance_name)
diff --git a/importer/handlers/common/raster.py b/importer/handlers/common/raster.py
index 2990c4c5..3c008c2f 100644
--- a/importer/handlers/common/raster.py
+++ b/importer/handlers/common/raster.py
@@ -1,6 +1,5 @@
 import pyproj
 from importer.publisher import DataPublisher
-from importer.utils import find_key_recursively
 import json
 import logging
 from pathlib import Path
@@ -55,7 +54,7 @@ def get_geoserver_store_name(default=None):
         return default, False

     @staticmethod
-    def is_valid(files, user):
+    def is_valid(files, user, _execid=None):
         """
         Define basic validation steps
         """
@@ -496,43 +495,6 @@ def copy_original_file(dataset):
         """
         return storage_manager.copy(dataset)

-    def rollback(
-        self, exec_id, rollback_from_step, action_to_rollback, *args, **kwargs
-    ):
-        steps = self.ACTIONS.get(action_to_rollback)
-        if rollback_from_step not in steps:
-            logger.info(f"Step not found {rollback_from_step}, skipping")
-            return
-        step_index = steps.index(rollback_from_step)
-        # the start_import, start_copy etc.. dont do anything as step, is just the start
-        # so there is nothing to rollback
-        steps_to_rollback = steps[1 : step_index + 1]  # noqa
-        if not steps_to_rollback:
-            return
-        # reversing the tuple to going backwards with the rollback
-        reversed_steps = steps_to_rollback[::-1]
-        istance_name = None
-        try:
-            istance_name = (
-                find_key_recursively(kwargs, "new_dataset_alternate") or args[3]
-            )
-        except Exception:
-            pass
-
-        logger.warning(
-            f"Starting rollback for execid: {exec_id} resource published was: {istance_name}"
-        )
-
-        for step in reversed_steps:
-            normalized_step_name = step.split(".")[-1]
-            if getattr(self, f"_{normalized_step_name}_rollback", None):
-                function = getattr(self, f"_{normalized_step_name}_rollback")
-                function(exec_id, istance_name, *args, **kwargs)
-
-        logger.warning(
-            f"Rollback for execid: {exec_id} resource published was: {istance_name} completed"
-        )
-
     def _import_resource_rollback(self, exec_id, istance_name=None, *args, **kwargs):
         """
         In the raster, this step just generate the alternate, no real action
@@ -552,27 +514,6 @@ def _publish_resource_rollback(self, exec_id, istance_name=None, *args, **kwargs
         publisher = DataPublisher(handler_module_path=handler_module_path)
         publisher.delete_resource(istance_name)

-    def _create_geonode_resource_rollback(
-        self, exec_id, istance_name=None, *args, **kwargs
-    ):
-        """
-        The handler will remove the resource from geonode
-        """
-        logger.info(
-            f"Rollback geonode step in progress for execid: {exec_id} resource created was: {istance_name}"
-        )
-        resource = ResourceBase.objects.filter(alternate__icontains=istance_name)
-        if resource.exists():
-            resource.delete()
-
-    def _copy_dynamic_model_rollback(self, exec_id, istance_name=None, *args, **kwargs):
-        self._import_resource_rollback(exec_id, istance_name=istance_name)
-
-    def _copy_geonode_resource_rollback(
-        self, exec_id, istance_name=None, *args, **kwargs
-    ):
-        self._create_geonode_resource_rollback(exec_id, istance_name=istance_name)
-
 @importer_app.task(
     base=ErrorBaseTaskClass,
diff --git a/importer/handlers/common/remote.py b/importer/handlers/common/remote.py
new file mode 100755
index 00000000..359e1ae4
--- /dev/null
+++ b/importer/handlers/common/remote.py
@@ -0,0 +1,221 @@
+import json
+import logging
+import os
+
+import requests
+from geonode.layers.models import Dataset
+from geonode.resource.enumerator import ExecutionRequestAction as exa
+from importer.api.exception import ImportException
+from importer.handlers.base import BaseHandler
+from importer.handlers.common.serializer import RemoteResourceSerializer
+from importer.models import ResourceHandlerInfo
+from importer.orchestrator import orchestrator
+from importer.celery_tasks import import_orchestrator
+from importer.handlers.utils import create_alternate
+from importer.utils import ImporterRequestAction as ira
+from geonode.base.models import ResourceBase, Link
+from urllib.parse import urlparse
+from geonode.base.enumerations import SOURCE_TYPE_REMOTE
+from geonode.resource.manager import resource_manager
+from geonode.resource.models import ExecutionRequest
+
+logger = logging.getLogger(__name__)
+
+
+class BaseRemoteResourceHandler(BaseHandler):
+    """
+    Handler to import remote resource files into the GeoNode data db.
+    It must provide the task_lists required to complete the upload.
+    As a first implementation, only remote 3dtiles are supported.
+    """
+
+    ACTIONS = {
+        exa.IMPORT.value: (
+            "start_import",
+            "importer.import_resource",
+            "importer.create_geonode_resource",
+        ),
+        exa.COPY.value: (
"start_copy", + "importer.copy_geonode_resource", + ), + ira.ROLLBACK.value: ( + "start_rollback", + "importer.rollback", + ), + } + + @staticmethod + def has_serializer(data) -> bool: + if "url" in data: + return RemoteResourceSerializer + return False + + @staticmethod + def can_handle(_data) -> bool: + """ + This endpoint will return True or False if with the info provided + the handler is able to handle the file or not + """ + if "url" in _data: + return True + return True + + @staticmethod + def is_valid(files, user, _execid=None): + """ + We mark it as valid if the urls is reachable + and if the url is valid + """ + try: + url = orchestrator.get_execution_object(_execid)\ + .input_params\ + .get('url') + r = requests.get(url, timeout=10) + r.raise_for_status() + except requests.exceptions.Timeout: + ImportException("Timed out") + + return True + + @staticmethod + def extract_params_from_data(_data, action=None): + """ + Remove from the _data the params that needs to save into the executionRequest object + all the other are returned + """ + if action == exa.COPY.value: + title = json.loads(_data.get("defaults")) + return {"title": title.pop("title")}, _data + + return { + "source": _data.pop("source", "upload"), + "title": _data.pop("title", None), + "url": _data.pop("url", None), + "type": _data.pop("type", None), + }, _data + + def import_resource(self, files: dict, execution_id: str, **kwargs) -> str: + """ + Main function to import the resource. + Internally will call the steps required to import the + data inside the geonode_data database + """ + # for the moment we skip the dyanamic model creation + logger.info("Total number of layers available: 1") + _exec = self._get_execution_request_object(execution_id) + _input = {**_exec.input_params, **{"total_layers": 1}} + orchestrator.update_execution_request_status( + execution_id=str(execution_id), input_params=_input + ) + + try: + params = _exec.input_params.copy() + url = params.get("url") + title = params.get("title", os.path.basename(urlparse(url).path)) + + # start looping on the layers available + layer_name = self.fixup_name(title) + + should_be_overwritten = _exec.input_params.get("overwrite_existing_layer") + + user_datasets = ResourceBase.objects.filter( + owner=_exec.user, alternate=layer_name + ) + + dataset_exists = user_datasets.exists() + + if dataset_exists and should_be_overwritten: + layer_name, alternate = ( + layer_name, + user_datasets.first().alternate.split(":")[-1], + ) + elif not dataset_exists: + alternate = layer_name + else: + alternate = create_alternate(layer_name, execution_id) + + import_orchestrator.apply_async( + ( + files, + execution_id, + str(self), + "importer.import_resource", + layer_name, + alternate, + exa.IMPORT.value, + ) + ) + return layer_name, alternate, execution_id + + except Exception as e: + logger.error(e) + raise e + + def create_geonode_resource( + self, + layer_name: str, + alternate: str, + execution_id: str, + resource_type: Dataset = ..., + asset=None, + ): + ''' + Creating geonode base resource + We ignore the params, we use the function as a interface to keep the same + importer flow. 
+        We create a standard ResourceBase
+        '''
+        _exec = orchestrator.get_execution_object(execution_id)
+        params = _exec.input_params.copy()
+        subtype = params.get("type")
+
+        resource = resource_manager.create(
+            None,
+            resource_type=ResourceBase,
+            defaults=dict(
+                resource_type="dataset",
+                subtype=subtype,
+                sourcetype=SOURCE_TYPE_REMOTE,
+                alternate=alternate,
+                dirty_state=True,
+                title=layer_name,
+                owner=_exec.user,
+            ),
+        )
+        resource_manager.set_thumbnail(None, instance=resource)
+
+        resource = self.create_link(resource, params, alternate)
+        ResourceBase.objects.filter(alternate=alternate).update(dirty_state=False)
+
+        return resource
+
+    def create_link(self, resource, params: dict, name):
+        link = Link(
+            resource=resource,
+            extension=params.get("type"),
+            url=params.get("url"),
+            link_type="remote",
+            name=name,
+        )
+        link.save()
+        return resource
+
+    def create_resourcehandlerinfo(
+        self,
+        handler_module_path: str,
+        resource: Dataset,
+        execution_id: ExecutionRequest,
+        **kwargs,
+    ):
+        """
+        Create relation between the GeonodeResource and the handler used
+        to create/copy it
+        """
+
+        ResourceHandlerInfo.objects.create(
+            handler_module_path=handler_module_path,
+            resource=resource,
+            execution_request=execution_id,
+            kwargs=kwargs.get("kwargs", {}) or kwargs,
+        )
diff --git a/importer/handlers/common/serializer.py b/importer/handlers/common/serializer.py
new file mode 100644
index 00000000..e92058b7
--- /dev/null
+++ b/importer/handlers/common/serializer.py
@@ -0,0 +1,21 @@
+from rest_framework import serializers
+from dynamic_rest.serializers import DynamicModelSerializer
+from geonode.upload.models import Upload
+
+
+class RemoteResourceSerializer(DynamicModelSerializer):
+    class Meta:
+        ref_name = "RemoteResourceSerializer"
+        model = Upload
+        view_name = "importer_upload"
+        fields = (
+            "url",
+            "title",
+            "type",
+            "source",
+        )
+
+    url = serializers.URLField(required=True)
+    title = serializers.CharField(required=False)
+    type = serializers.CharField(required=True)
+    source = serializers.CharField(required=False, default="upload")
diff --git a/importer/handlers/common/test_remote.py b/importer/handlers/common/test_remote.py
new file mode 100644
index 00000000..4d78d52e
--- /dev/null
+++ b/importer/handlers/common/test_remote.py
@@ -0,0 +1,360 @@
+import os
+import shutil
+import uuid
+from celery.canvas import Signature
+from celery import group
+from django.test import TestCase
+from mock import MagicMock, patch
+from importer.handlers.common.vector import BaseVectorFileHandler, import_with_ogr2ogr
+from django.contrib.auth import get_user_model
+from importer import project_dir
+from importer.handlers.gpkg.handler import GPKGFileHandler
+from importer.orchestrator import orchestrator
+from geonode.base.populate_test_data import create_single_dataset
+from geonode.resource.models import ExecutionRequest
+from dynamic_models.models import ModelSchema
+from osgeo import ogr
+from django.test.utils import override_settings
+
+
+class BaseRemoteResourceHandler(TestCase):
+    databases = ("default", "datastore")
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+        cls.handler = BaseRemoteResourceHandler()
+        cls.valid_gpkg = f"{project_dir}/tests/fixture/valid.gpkg"
+        cls.invalid_gpkg = f"{project_dir}/tests/fixture/invalid.gpkg"
+        cls.no_crs_gpkg = f"{project_dir}/tests/fixture/noCrsTable.gpkg"
+        cls.user, _ = get_user_model().objects.get_or_create(username="admin")
+        cls.invalid_files = {"base_file": cls.invalid_gpkg}
"/tmp/valid.gpkg"} + cls.owner = get_user_model().objects.first() + cls.layer = create_single_dataset( + name="stazioni_metropolitana", owner=cls.owner + ) + + def setUp(self) -> None: + shutil.copy(self.valid_gpkg, "/tmp") + super().setUp() + + def test_create_error_log(self): + """ + Should return the formatted way for the log of the handler + """ + actual = self.handler.create_error_log( + Exception("my exception"), + "foo_task_name", + *["exec_id", "layer_name", "alternate"], + ) + expected = "Task: foo_task_name raised an error during actions for layer: alternate: my exception" + self.assertEqual(expected, actual) + + def test_create_dynamic_model_fields(self): + try: + # Prepare the test + exec_id = orchestrator.create_execution_request( + user=get_user_model().objects.first(), + func_name="funct1", + step="step", + input_params={"files": self.valid_files, "skip_existing_layer": True}, + ) + schema, _ = ModelSchema.objects.get_or_create( + name="test_handler", db_name="datastore" + ) + layers = ogr.Open(self.valid_gpkg) + + # starting the tests + dynamic_model, celery_group = self.handler.create_dynamic_model_fields( + layer=[x for x in layers][0], + dynamic_model_schema=schema, + overwrite=False, + execution_id=str(exec_id), + layer_name="stazioni_metropolitana", + ) + + self.assertIsNotNone(dynamic_model) + self.assertIsInstance(celery_group, group) + self.assertEqual(1, len(celery_group.tasks)) + self.assertEqual( + "importer.create_dynamic_structure", celery_group.tasks[0].name + ) + finally: + if schema: + schema.delete() + if exec_id: + ExecutionRequest.objects.filter(exec_id=exec_id).delete() + + def test_setup_dynamic_model_no_dataset_no_modelschema(self): + self._assert_test_result() + + def test_setup_dynamic_model_no_dataset_no_modelschema_overwrite_true(self): + self._assert_test_result(overwrite=True) + + def test_setup_dynamic_model_with_dataset_no_modelschema_overwrite_false(self): + create_single_dataset(name="stazioni_metropolitana", owner=self.user) + self._assert_test_result(overwrite=False) + + def test_setup_dynamic_model_with_dataset_no_modelschema_overwrite_True(self): + create_single_dataset(name="stazioni_metropolitana", owner=self.user) + self._assert_test_result(overwrite=True) + + def test_setup_dynamic_model_no_dataset_with_modelschema_overwrite_false(self): + ModelSchema.objects.get_or_create( + name="stazioni_metropolitana", db_name="datastore" + ) + self._assert_test_result(overwrite=False) + + def test_setup_dynamic_model_with_dataset_with_modelschema_overwrite_false(self): + create_single_dataset(name="stazioni_metropolitana", owner=self.user) + ModelSchema.objects.create( + name="stazioni_metropolitana", db_name="datastore", managed=True + ) + self._assert_test_result(overwrite=False) + + def _assert_test_result(self, overwrite=False): + try: + # Prepare the test + exec_id = orchestrator.create_execution_request( + user=self.user, + func_name="funct1", + step="step", + input_params={"files": self.valid_files, "skip_existing_layer": True}, + ) + + layers = ogr.Open(self.valid_gpkg) + + # starting the tests + dynamic_model, layer_name, celery_group = self.handler.setup_dynamic_model( + layer=[x for x in layers][0], + execution_id=str(exec_id), + should_be_overwritten=overwrite, + username=self.user, + ) + + self.assertIsNotNone(dynamic_model) + + # check if the uuid has been added to the model name + self.assertIsNotNone(layer_name) + + self.assertIsInstance(celery_group, group) + self.assertEqual(1, len(celery_group.tasks)) + self.assertEqual( + 
"importer.create_dynamic_structure", celery_group.tasks[0].name + ) + finally: + if exec_id: + ExecutionRequest.objects.filter(exec_id=exec_id).delete() + + @patch("importer.handlers.common.vector.BaseVectorFileHandler.get_ogr2ogr_driver") + @patch("importer.handlers.common.vector.chord") + def test_import_resource_should_not_be_imported(self, celery_chord, ogr2ogr_driver): + """ + If the resource exists and should be skept, the celery task + is not going to be called and the layer is skipped + """ + exec_id = None + try: + # create the executionId + exec_id = orchestrator.create_execution_request( + user=get_user_model().objects.first(), + func_name="funct1", + step="step", + input_params={"files": self.valid_files, "skip_existing_layer": True}, + ) + + with self.assertRaises(Exception) as exception: + # start the resource import + self.handler.import_resource( + files=self.valid_files, execution_id=str(exec_id) + ) + self.assertIn( + "No valid layers found", + exception.exception.args[0], + "No valid layers found.", + ) + + celery_chord.assert_not_called() + finally: + if exec_id: + ExecutionRequest.objects.filter(exec_id=exec_id).delete() + + @patch("importer.handlers.common.vector.BaseVectorFileHandler.get_ogr2ogr_driver") + @patch("importer.handlers.common.vector.chord") + def test_import_resource_should_work(self, celery_chord, ogr2ogr_driver): + try: + ogr2ogr_driver.return_value = ogr.GetDriverByName("GPKG") + exec_id = orchestrator.create_execution_request( + user=get_user_model().objects.first(), + func_name="funct1", + step="step", + input_params={"files": self.valid_files}, + ) + + # start the resource import + self.handler.import_resource( + files=self.valid_files, execution_id=str(exec_id) + ) + + celery_chord.assert_called_once() + finally: + if exec_id: + ExecutionRequest.objects.filter(exec_id=exec_id).delete() + + def test_get_ogr2ogr_task_group(self): + _uuid = uuid.uuid4() + + actual = self.handler.get_ogr2ogr_task_group( + str(_uuid), + files=self.valid_files, + layer="dataset", + should_be_overwritten=True, + alternate="abc", + ) + self.assertIsInstance(actual, (Signature,)) + self.assertEqual("importer.import_with_ogr2ogr", actual.task) + + @patch("importer.handlers.common.vector.Popen") + def test_import_with_ogr2ogr_without_errors_should_call_the_right_command( + self, _open + ): + _uuid = uuid.uuid4() + + comm = MagicMock() + comm.communicate.return_value = b"", b"" + _open.return_value = comm + + _task, alternate, execution_id = import_with_ogr2ogr( + execution_id=str(_uuid), + files=self.valid_files, + original_name="dataset", + handler_module_path=str(self.handler), + ovverwrite_layer=False, + alternate="alternate", + ) + + self.assertEqual("ogr2ogr", _task) + self.assertEqual(alternate, "alternate") + self.assertEqual(str(_uuid), execution_id) + + _open.assert_called_once() + _open.assert_called_with( + "/usr/bin/ogr2ogr --config PG_USE_COPY YES -f PostgreSQL PG:\" dbname='test_geonode_data' host=" + + os.getenv("DATABASE_HOST", "localhost") + + " port=5432 user='geonode_data' password='geonode_data' \" \"" + + self.valid_files.get("base_file") + + '" -nln alternate "dataset"', + stdout=-1, + stderr=-1, + shell=True, # noqa + ) + + @patch("importer.handlers.common.vector.Popen") + def test_import_with_ogr2ogr_with_errors_should_raise_exception(self, _open): + _uuid = uuid.uuid4() + + comm = MagicMock() + comm.communicate.return_value = b"", b"ERROR: some error here" + _open.return_value = comm + + with self.assertRaises(Exception): + import_with_ogr2ogr( + 
+                execution_id=str(_uuid),
+                files=self.valid_files,
+                original_name="dataset",
+                handler_module_path=str(self.handler),
+                ovverwrite_layer=False,
+                alternate="alternate",
+            )
+
+        _open.assert_called_once()
+        _open.assert_called_with(
+            "/usr/bin/ogr2ogr --config PG_USE_COPY YES -f PostgreSQL PG:\" dbname='test_geonode_data' host="
+            + os.getenv("DATABASE_HOST", "localhost")
+            + " port=5432 user='geonode_data' password='geonode_data' \" \""
+            + self.valid_files.get("base_file")
+            + '" -nln alternate "dataset"',
+            stdout=-1,
+            stderr=-1,
+            shell=True,  # noqa
+        )
+
+    @patch.dict(os.environ, {"OGR2OGR_COPY_WITH_DUMP": "True"}, clear=True)
+    @patch("importer.handlers.common.vector.Popen")
+    def test_import_with_ogr2ogr_without_errors_should_call_the_right_command_if_dump_is_enabled(
+        self, _open
+    ):
+        _uuid = uuid.uuid4()
+
+        comm = MagicMock()
+        comm.communicate.return_value = b"", b""
+        _open.return_value = comm
+
+        _task, alternate, execution_id = import_with_ogr2ogr(
+            execution_id=str(_uuid),
+            files=self.valid_files,
+            original_name="dataset",
+            handler_module_path=str(self.handler),
+            ovverwrite_layer=False,
+            alternate="alternate",
+        )
+
+        self.assertEqual("ogr2ogr", _task)
+        self.assertEqual(alternate, "alternate")
+        self.assertEqual(str(_uuid), execution_id)
+
+        _open.assert_called_once()
+        _call_as_string = _open.mock_calls[0][1][0]
+
+        self.assertTrue("-f PGDump /vsistdout/" in _call_as_string)
+        self.assertTrue("psql -d" in _call_as_string)
+        self.assertFalse("-f PostgreSQL PG" in _call_as_string)
+
+    def test_select_valid_layers(self):
+        """
+        The function should return only the datasets with a geometry;
+        the others are discarded
+        """
+        all_layers = GPKGFileHandler().get_ogr2ogr_driver().Open(self.no_crs_gpkg)
+
+        with self.assertLogs(level="ERROR") as _log:
+            valid_layer = GPKGFileHandler()._select_valid_layers(all_layers)
+
+        self.assertIn(
+            "The following layer layer_styles does not have a Coordinate Reference System (CRS) and will be skipped.",
+            [x.message for x in _log.records],
+        )
+        self.assertEqual(1, len(valid_layer))
+        self.assertEqual("mattia_test", valid_layer[0].GetName())
+
+    @override_settings(MEDIA_ROOT="/tmp")
+    def test_perform_last_step(self):
+        """
+        Output params in perform_last_step should return the detail_url and the ID
+        of the resource created
+        """
+        handler = GPKGFileHandler()
+        # creating exec_id for the import
+        exec_id = orchestrator.create_execution_request(
+            user=get_user_model().objects.first(),
+            func_name="funct1",
+            step="step",
+            input_params={"files": self.valid_files, "store_spatial_file": True},
+        )
+
+        # create_geonode_resource
+        resource = handler.create_geonode_resource(
+            "layer_name",
+            "layer_alternate",
+            str(exec_id),
+        )
+        exec_obj = orchestrator.get_execution_object(str(exec_id))
+        handler.create_resourcehandlerinfo(str(handler), resource, exec_obj)
+        # calling the last_step
+        handler.perform_last_step(str(exec_id))
+        expected_output = {
+            "resources": [{"id": resource.pk, "detail_url": resource.detail_url}]
+        }
+        exec_obj.refresh_from_db()
+        self.assertDictEqual(expected_output, exec_obj.output_params)
diff --git a/importer/handlers/common/vector.py b/importer/handlers/common/vector.py
index b60ea731..f30417aa 100644
--- a/importer/handlers/common/vector.py
+++ b/importer/handlers/common/vector.py
@@ -1,7 +1,7 @@
 import ast
 from django.db import connections
 from importer.publisher import DataPublisher
-from importer.utils import call_rollback_function, find_key_recursively
+from importer.utils import call_rollback_function
 import json
 import logging
 import os
@@ -63,7 +63,7 @@ def get_geoserver_store_name(default=None):
         return os.environ.get("GEONODE_GEODATABASE", "geonode_data"), True

     @staticmethod
-    def is_valid(files, user):
+    def is_valid(files, user, _execid=None):
         """
         Define basic validation steps
         """
@@ -778,43 +778,6 @@ def _get_type(self, _type: str):
         """
         return STANDARD_TYPE_MAPPING.get(ogr.FieldDefn.GetTypeName(_type))

-    def rollback(
-        self, exec_id, rollback_from_step, action_to_rollback, *args, **kwargs
-    ):
-        steps = self.ACTIONS.get(action_to_rollback)
-        if rollback_from_step not in steps:
-            logger.info(f"Step not found {rollback_from_step}, skipping")
-            return
-        step_index = steps.index(rollback_from_step)
-        # the start_import, start_copy etc.. dont do anything as step, is just the start
-        # so there is nothing to rollback
-        steps_to_rollback = steps[1 : step_index + 1]  # noqa
-        if not steps_to_rollback:
-            return
-        # reversing the tuple to going backwards with the rollback
-        reversed_steps = steps_to_rollback[::-1]
-        instance_name = None
-        try:
-            instance_name = (
-                find_key_recursively(kwargs, "new_dataset_alternate") or args[3]
-            )
-        except Exception:
-            pass
-
-        logger.warning(
-            f"Starting rollback for execid: {exec_id} resource published was: {instance_name}"
-        )
-
-        for step in reversed_steps:
-            normalized_step_name = step.split(".")[-1]
-            if getattr(self, f"_{normalized_step_name}_rollback", None):
-                function = getattr(self, f"_{normalized_step_name}_rollback")
-                function(exec_id, instance_name, *args, **kwargs)
-
-        logger.warning(
-            f"Rollback for execid: {exec_id} resource published was: {instance_name} completed"
-        )
-
     def _import_resource_rollback(self, exec_id, instance_name=None, *args, **kwargs):
         """
         We use the schema editor directly, because the model itself is not managed
@@ -859,29 +822,6 @@ def _publish_resource_rollback(self, exec_id, instance_name=None, *args, **kwarg
         publisher = DataPublisher(handler_module_path=handler_module_path)
         publisher.delete_resource(instance_name)

-    def _create_geonode_resource_rollback(
-        self, exec_id, instance_name=None, *args, **kwargs
-    ):
-        """
-        The handler will remove the resource from geonode
-        """
-        logger.info(
-            f"Rollback geonode step in progress for execid: {exec_id} resource created was: {instance_name}"
-        )
-        resource = ResourceBase.objects.filter(alternate__icontains=instance_name)
-        if resource.exists():
-            resource.delete()
-
-    def _copy_dynamic_model_rollback(
-        self, exec_id, instance_name=None, *args, **kwargs
-    ):
-        self._import_resource_rollback(exec_id, instance_name=instance_name)
-
-    def _copy_geonode_resource_rollback(
-        self, exec_id, instance_name=None, *args, **kwargs
-    ):
-        self._create_geonode_resource_rollback(exec_id, instance_name=instance_name)
-
 @importer_app.task(
     base=ErrorBaseTaskClass,
diff --git a/importer/handlers/csv/handler.py b/importer/handlers/csv/handler.py
index f1433ed2..8ad96e96 100644
--- a/importer/handlers/csv/handler.py
+++ b/importer/handlers/csv/handler.py
@@ -74,7 +74,7 @@ def can_handle(_data) -> bool:
         )

     @staticmethod
-    def is_valid(files, user):
+    def is_valid(files, user, _execid=None):
         BaseVectorFileHandler.is_valid(files, user)
         # getting the upload limit validation
         upload_validator = UploadLimitValidator(user)
diff --git a/importer/handlers/geojson/handler.py b/importer/handlers/geojson/handler.py
index ea79ea0d..b1e8d8d6 100644
--- a/importer/handlers/geojson/handler.py
+++ b/importer/handlers/geojson/handler.py
@@ -78,7 +78,7 @@ def can_handle(_data) -> bool:
         return False

     @staticmethod
-    def is_valid(files, user):
+    def is_valid(files, user, _execid=None):
         """
         Define basic validation steps:
         """
diff --git a/importer/handlers/geotiff/handler.py b/importer/handlers/geotiff/handler.py
index 5c6ae0db..49561f3c 100644
--- a/importer/handlers/geotiff/handler.py
+++ b/importer/handlers/geotiff/handler.py
@@ -59,7 +59,7 @@ def can_handle(_data) -> bool:
         return ext in ["tiff", "geotiff", "tif", "geotif"]

     @staticmethod
-    def is_valid(files, user):
+    def is_valid(files, user, _execid=None):
         """
         Define basic validation steps:
         """
diff --git a/importer/handlers/gpkg/handler.py b/importer/handlers/gpkg/handler.py
index bd6db8ac..014ec86b 100644
--- a/importer/handlers/gpkg/handler.py
+++ b/importer/handlers/gpkg/handler.py
@@ -73,7 +73,7 @@ def can_handle(_data) -> bool:
         )

     @staticmethod
-    def is_valid(files, user):
+    def is_valid(files, user, _execid=None):
         """
         Define basic validation steps:
         Upload limit:
diff --git a/importer/handlers/kml/handler.py b/importer/handlers/kml/handler.py
index 00941594..943e9da8 100644
--- a/importer/handlers/kml/handler.py
+++ b/importer/handlers/kml/handler.py
@@ -73,7 +73,7 @@ def can_handle(_data) -> bool:
         )

     @staticmethod
-    def is_valid(files, user):
+    def is_valid(files, user, _execid=None):
         """
         Define basic validation steps:
         Upload limit:
diff --git a/importer/handlers/shapefile/handler.py b/importer/handlers/shapefile/handler.py
index 2ace0bf0..08d044b8 100644
--- a/importer/handlers/shapefile/handler.py
+++ b/importer/handlers/shapefile/handler.py
@@ -97,7 +97,7 @@ def extract_params_from_data(_data, action=None):
         return additional_params, _data

     @staticmethod
-    def is_valid(files, user):
+    def is_valid(files, user, _execid=None):
         """
         Define basic validation steps:
         """
diff --git a/importer/handlers/sld/handler.py b/importer/handlers/sld/handler.py
index c93856b0..f7852154 100644
--- a/importer/handlers/sld/handler.py
+++ b/importer/handlers/sld/handler.py
@@ -30,7 +30,7 @@ def can_handle(_data) -> bool:
         )

     @staticmethod
-    def is_valid(files, user):
+    def is_valid(files, user, _execid=None):
         """
         Define basic validation steps
         """
diff --git a/importer/handlers/tiles3d/handler.py b/importer/handlers/tiles3d/handler.py
index 1a9ff92a..6367663b 100755
--- a/importer/handlers/tiles3d/handler.py
+++ b/importer/handlers/tiles3d/handler.py
@@ -66,7 +66,7 @@ def can_handle(_data) -> bool:
         return False

     @staticmethod
-    def is_valid(files, user):
+    def is_valid(files, user, _execid=None):
         """
         Define basic validation steps:
         """
diff --git a/importer/orchestrator.py b/importer/orchestrator.py
index afd18f8c..d74abc99 100644
--- a/importer/orchestrator.py
+++ b/importer/orchestrator.py
@@ -12,7 +12,6 @@
 from geonode.base.enumerations import STATE_INVALID, STATE_PROCESSED, STATE_RUNNING
 from geonode.resource.models import ExecutionRequest
 from geonode.upload.models import Upload
-from geonode.storage.manager import storage_manager
 from rest_framework import serializers

 from importer.api.exception import ImportException
@@ -179,10 +178,11 @@ def set_as_failed(self, execution_id, reason=None, delete_file=True):
         if delete_file:
             exec_obj = self.get_execution_object(execution_id)
             # cleanup asset in case of fail
-            asset_handler = import_string(exec_obj.input_params["asset_module_path"])
-            asset = asset_handler.objects.filter(pk=exec_obj.input_params["asset_id"])
-            if asset.exists():
-                asset.first().delete()
+            if exec_obj.input_params.get("asset_module_path", None):
+                asset_handler = import_string(exec_obj.input_params["asset_module_path"])
+                asset = asset_handler.objects.filter(pk=exec_obj.input_params["asset_id"])
+                if asset.exists():
+                    asset.first().delete()

     def set_as_partially_failed(self, execution_id, reason=None):
         """
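
Note (illustrative, not part of the patch): a minimal client-side sketch of how the new JSON/remote-resource path added to ImporterViewSet could be exercised. The endpoint path, the credentials and the "3dtiles" type value are assumptions and may differ per deployment; the payload fields mirror RemoteResourceSerializer above.

import requests

# Hypothetical payload for registering a remote resource (fields from RemoteResourceSerializer).
payload = {
    "url": "https://example.com/tileset/tileset.json",  # remote resource to register (example URL)
    "title": "Remote 3D Tiles example",  # optional; import_resource falls back to the URL basename
    "type": "3dtiles",  # stored as the ResourceBase subtype (assumed value)
    "source": "upload",  # optional; defaults to "upload"
}

# JSONParser is now enabled on the viewset, so a plain JSON body is accepted;
# BasicAuthentication is one of the configured authentication classes.
response = requests.post(
    "https://geonode.example.org/api/v2/uploads/upload",  # assumed importer upload endpoint
    json=payload,
    auth=("admin", "admin"),  # example credentials
    timeout=30,
)
response.raise_for_status()
print(response.json())  # contains the execution id used to track the import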