Skip to content

Commit

Permalink
[Fixes #257] Add RemoteResourceHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
mattiagiupponi committed Jul 23, 2024
1 parent 12fc9f4 commit fd46d56
Show file tree
Hide file tree
Showing 18 changed files with 759 additions and 173 deletions.
71 changes: 42 additions & 29 deletions importer/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from importer.orchestrator import orchestrator
from oauth2_provider.contrib.rest_framework import OAuth2Authentication
from rest_framework.authentication import BasicAuthentication, SessionAuthentication
from rest_framework.parsers import FileUploadParser, MultiPartParser
from rest_framework.parsers import FileUploadParser, MultiPartParser, JSONParser
from rest_framework.permissions import IsAuthenticatedOrReadOnly
from rest_framework.response import Response
from geonode.assets.handlers import asset_handler_registry
Expand All @@ -57,7 +57,7 @@ class ImporterViewSet(DynamicModelViewSet):
API endpoint that allows uploads to be viewed or edited.
"""

parser_class = [FileUploadParser, MultiPartParser]
parser_class = [JSONParser, FileUploadParser, MultiPartParser]

authentication_classes = [
BasicAuthentication,
Expand Down Expand Up @@ -129,46 +129,40 @@ def create(self, request, *args, **kwargs):

handler = orchestrator.get_handler(_data)

if _file and handler:
# not file but handler means that is a remote resource
if (_file and handler) or (not _file and handler):
asset = None
files = []
try:
# cloning data into a local folder
extracted_params, _data = handler.extract_params_from_data(_data)
if storage_manager is None:
# means that the storage manager is not initialized yet, so
# the file is not a zip
storage_manager = StorageManager(remote_files=_data)
storage_manager.clone_remote_files(
cloning_directory=asset_dir, create_tempdir=False
if _file:
storage_manager, asset, files = self._handle_files(
request, asset_dir, storage_manager, _data, handler
)
# get filepath
asset, files = self.generate_asset_and_retrieve_paths(
request, storage_manager, handler
)

upload_validator = UploadLimitValidator(request.user)
upload_validator.validate_parallelism_limit_per_user()
upload_validator.validate_files_sum_of_sizes(
storage_manager.data_retriever
)

action = ExecutionRequestAction.IMPORT.value

input_params = {
**{"files": files, "handler_module_path": str(handler)},
**extracted_params,
}

if asset:
input_params.update(
{
"asset_id": asset.id,
"asset_module_path": f"{asset.__module__}.{asset.__class__.__name__}",
}
)

execution_id = orchestrator.create_execution_request(
user=request.user,
func_name=next(iter(handler.get_task_list(action=action))),
step=_(next(iter(handler.get_task_list(action=action)))),
input_params={
**{"files": files, "handler_module_path": str(handler)},
**extracted_params,
**{
"asset_id": asset.id,
"asset_module_path": f"{asset.__module__}.{asset.__class__.__name__}",
},
},
legacy_upload_name=_file.name,
input_params=input_params,
action=action,
name=_file.name,
name=_file.name if _file else extracted_params.get("title", None),
source=extracted_params.get("source"),
)

Expand All @@ -194,6 +188,25 @@ def create(self, request, *args, **kwargs):

raise ImportException(detail="No handlers found for this dataset type")

def _handle_files(self, request, asset_dir, storage_manager, _data, handler):
    """
    Clone the uploaded files locally, build the asset for them and enforce
    the per-user upload limits.

    Returns a ``(storage_manager, asset, files)`` tuple so the caller keeps
    a reference to the (possibly newly created) storage manager.
    """
    if storage_manager is None:
        # No storage manager yet means the payload was not a zip, so the
        # remote files still have to be cloned into the asset directory.
        storage_manager = StorageManager(remote_files=_data)
        storage_manager.clone_remote_files(
            cloning_directory=asset_dir, create_tempdir=False
        )

    # Resolve the local file paths and create the asset that tracks them.
    asset, files = self.generate_asset_and_retrieve_paths(
        request, storage_manager, handler
    )

    validator = UploadLimitValidator(request.user)
    validator.validate_parallelism_limit_per_user()
    validator.validate_files_sum_of_sizes(storage_manager.data_retriever)

    return storage_manager, asset, files

def generate_asset_and_retrieve_paths(self, request, storage_manager, handler):
asset_handler = asset_handler_registry.get_default_handler()
_files = storage_manager.get_retrieved_paths()
Expand Down
17 changes: 11 additions & 6 deletions importer/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def create_geonode_resource(
layer_name: Optional[str] = None,
alternate: Optional[str] = None,
handler_module_path: str = None,
action: str = None,
action: str = exa.IMPORT.value,
**kwargs,
):
"""
Expand Down Expand Up @@ -329,11 +329,16 @@ def create_geonode_resource(

_files = _exec.input_params.get("files")

_asset = (
import_string(_exec.input_params.get("asset_module_path"))
.objects.filter(id=_exec.input_params.get("asset_id"))
.first()
)
if not _files:
_asset = None
else:
_asset = (
import_string(_exec.input_params.get("asset_module_path"))
.objects.filter(id=_exec.input_params.get("asset_id"))
.first()
)

handler_module_path = handler_module_path or _exec.input_params.get('handler_module_path')

handler = import_string(handler_module_path)()
_overwrite = _exec.input_params.get("overwrite_existing_layer")
Expand Down
2 changes: 1 addition & 1 deletion importer/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def input_is_valid(self):
"""
Perform basic validation steps
"""
return self.handler.is_valid(self.files, self.user)
return self.handler.is_valid(self.files, self.user, self.execution_id)

def prepare_import(self, **kwargs):
"""
Expand Down
89 changes: 87 additions & 2 deletions importer/handlers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from geonode.resource.enumerator import ExecutionRequestAction as exa
from geonode.layers.models import Dataset
from importer.api.exception import ImportException
from importer.utils import ImporterRequestAction as ira
from importer.utils import ImporterRequestAction as ira, find_key_recursively
from django_celery_results.models import TaskResult
from django.db.models import Q
from geonode.resource.models import ExecutionRequest
from geonode.base.models import ResourceBase

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -89,7 +91,7 @@ def can_handle_sld_file(self) -> bool:
return True

@staticmethod
def is_valid(files, user):
def is_valid(files, user, _execid=None):
"""
Define basic validation steps
"""
Expand Down Expand Up @@ -257,3 +259,86 @@ def delete_resource(self, instance):
Base function to delete the resource with all the dependencies (example: dynamic model)
"""
return NotImplementedError

def _get_execution_request_object(self, execution_id: str):
    """Return the ExecutionRequest matching *execution_id*, or None if absent."""
    matches = ExecutionRequest.objects.filter(exec_id=execution_id)
    return matches.first()

def overwrite_resourcehandlerinfo(
    self,
    handler_module_path: str,
    resource: Dataset,
    execution_id: ExecutionRequest,
    **kwargs,
):
    """
    Overwrite the ResourceHandlerInfo attached to *resource*.

    If the resource has no handler info yet, a new one is created instead.
    """
    if not resource.resourcehandlerinfo_set.exists():
        # Nothing to overwrite: fall back to creating a fresh entry.
        return self.create_resourcehandlerinfo(
            handler_module_path, resource, execution_id, **kwargs
        )
    # Update every existing handler-info row in place.
    resource.resourcehandlerinfo_set.update(
        handler_module_path=handler_module_path,
        resource=resource,
        execution_request=execution_id,
        kwargs=kwargs.get("kwargs", {}) or kwargs,
    )
    return

def rollback(
    self, exec_id, rollback_from_step, action_to_rollback, *args, **kwargs
):
    """
    Undo the steps already completed for the given execution, newest first.

    Looks up the task list for *action_to_rollback*, takes every step up to
    (and including) *rollback_from_step*, and invokes the matching
    ``_<step>_rollback`` method on this handler for each one, in reverse
    order. Unknown actions or steps are logged and skipped.
    """
    steps = self.ACTIONS.get(action_to_rollback)
    # Guard against an unknown action (``.get`` returns None) as well as a
    # step that is not part of the action's task list; ``in None`` would
    # otherwise raise a TypeError.
    if not steps or rollback_from_step not in steps:
        logger.info(f"Step not found {rollback_from_step}, skipping")
        return
    step_index = steps.index(rollback_from_step)
    # The first step (start_import, start_copy, ...) only kicks off the
    # flow, so there is nothing to roll back for it.
    steps_to_rollback = steps[1 : step_index + 1]  # noqa
    if not steps_to_rollback:
        return
    # Reverse the steps so the rollback walks backwards through the flow.
    reversed_steps = steps_to_rollback[::-1]
    instance_name = None
    try:
        # The alternate generated during import identifies the resource to
        # clean up; fall back to the positional argument when it is absent.
        instance_name = (
            find_key_recursively(kwargs, "new_dataset_alternate") or args[3]
        )
    except Exception:
        pass

    logger.warning(
        f"Starting rollback for execid: {exec_id} resource published was: {instance_name}"
    )

    for step in reversed_steps:
        normalized_step_name = step.split(".")[-1]
        # Only steps that define a ``_<name>_rollback`` hook are undone.
        if getattr(self, f"_{normalized_step_name}_rollback", None):
            function = getattr(self, f"_{normalized_step_name}_rollback")
            function(exec_id, instance_name, *args, **kwargs)

    logger.warning(
        f"Rollback for execid: {exec_id} resource published was: {instance_name} completed"
    )

def _create_geonode_resource_rollback(
    self, exec_id, istance_name=None, *args, **kwargs
):
    """
    The handler will remove the resource from geonode
    """
    logger.info(
        f"Rollback geonode step in progress for execid: {exec_id} resource created was: {istance_name}"
    )
    # Delete every resource whose alternate contains the published name.
    matching = ResourceBase.objects.filter(alternate__icontains=istance_name)
    if matching.exists():
        matching.delete()

def _copy_dynamic_model_rollback(self, exec_id, istance_name=None, *args, **kwargs):
    # Rolling back a dynamic-model copy is the same cleanup as rolling back
    # an import, so delegate to that step's rollback.
    self._import_resource_rollback(exec_id, istance_name=istance_name)

def _copy_geonode_resource_rollback(
    self, exec_id, istance_name=None, *args, **kwargs
):
    # Rolling back a resource copy reuses the create-resource rollback: the
    # copied GeoNode resource just has to be deleted again.
    self._create_geonode_resource_rollback(exec_id, istance_name=istance_name)
61 changes: 1 addition & 60 deletions importer/handlers/common/raster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import pyproj
from importer.publisher import DataPublisher
from importer.utils import find_key_recursively
import json
import logging
from pathlib import Path
Expand Down Expand Up @@ -55,7 +54,7 @@ def get_geoserver_store_name(default=None):
return default, False

@staticmethod
def is_valid(files, user):
def is_valid(files, user, _execid=None):
"""
Define basic validation steps
"""
Expand Down Expand Up @@ -496,43 +495,6 @@ def copy_original_file(dataset):
"""
return storage_manager.copy(dataset)

def rollback(
    self, exec_id, rollback_from_step, action_to_rollback, *args, **kwargs
):
    """
    Undo the steps already completed for the given execution, newest first.

    Looks up the task list for *action_to_rollback*, takes every step up to
    (and including) *rollback_from_step*, and invokes the matching
    ``_<step>_rollback`` method on this handler for each one, in reverse
    order. Unknown steps are logged and skipped.
    """
    steps = self.ACTIONS.get(action_to_rollback)
    if rollback_from_step not in steps:
        logger.info(f"Step not found {rollback_from_step}, skipping")
        return
    step_index = steps.index(rollback_from_step)
    # The first step (start_import, start_copy, ...) only kicks off the
    # flow, so there is nothing to roll back for it.
    steps_to_rollback = steps[1 : step_index + 1]  # noqa
    if not steps_to_rollback:
        return
    # Reverse the steps so the rollback walks backwards through the flow.
    reversed_steps = steps_to_rollback[::-1]
    istance_name = None
    try:
        # The alternate generated during import identifies the resource to
        # clean up; fall back to the positional argument when it is absent.
        istance_name = (
            find_key_recursively(kwargs, "new_dataset_alternate") or args[3]
        )
    except Exception:
        pass

    logger.warning(
        f"Starting rollback for execid: {exec_id} resource published was: {istance_name}"
    )

    for step in reversed_steps:
        normalized_step_name = step.split(".")[-1]
        # Only steps that define a ``_<name>_rollback`` hook are undone.
        if getattr(self, f"_{normalized_step_name}_rollback", None):
            function = getattr(self, f"_{normalized_step_name}_rollback")
            function(exec_id, istance_name, *args, **kwargs)

    logger.warning(
        f"Rollback for execid: {exec_id} resource published was: {istance_name} completed"
    )

def _import_resource_rollback(self, exec_id, istance_name=None, *args, **kwargs):
"""
In the raster, this step just generate the alternate, no real action
Expand All @@ -552,27 +514,6 @@ def _publish_resource_rollback(self, exec_id, istance_name=None, *args, **kwargs
publisher = DataPublisher(handler_module_path=handler_module_path)
publisher.delete_resource(istance_name)

def _create_geonode_resource_rollback(
    self, exec_id, istance_name=None, *args, **kwargs
):
    """
    The handler will remove the resource from geonode
    """
    logger.info(
        f"Rollback geonode step in progress for execid: {exec_id} resource created was: {istance_name}"
    )
    # Delete every resource whose alternate contains the published name.
    resource = ResourceBase.objects.filter(alternate__icontains=istance_name)
    if resource.exists():
        resource.delete()

def _copy_dynamic_model_rollback(self, exec_id, istance_name=None, *args, **kwargs):
    # Rolling back a dynamic-model copy is the same cleanup as rolling back
    # an import, so delegate to that step's rollback.
    self._import_resource_rollback(exec_id, istance_name=istance_name)

def _copy_geonode_resource_rollback(
    self, exec_id, istance_name=None, *args, **kwargs
):
    # Rolling back a resource copy reuses the create-resource rollback: the
    # copied GeoNode resource just has to be deleted again.
    self._create_geonode_resource_rollback(exec_id, istance_name=istance_name)


@importer_app.task(
base=ErrorBaseTaskClass,
Expand Down
Loading

0 comments on commit fd46d56

Please sign in to comment.