From 5cb80b5a3582e56ad394feb6ab00ec0f3bd7cebb Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 26 Oct 2023 21:59:56 -0400 Subject: [PATCH] extend job input parsing to support bbox and measurement structures --- weaver/processes/constants.py | 9 ++- weaver/processes/execution.py | 126 ++++++++++++++++++++++++++-------- 2 files changed, 104 insertions(+), 31 deletions(-) diff --git a/weaver/processes/constants.py b/weaver/processes/constants.py index 3e59c4dca..cc14b7d71 100644 --- a/weaver/processes/constants.py +++ b/weaver/processes/constants.py @@ -16,10 +16,17 @@ WPS_Literal_Type = Literal["literal"] # pylint: disable=C0103 WPS_Reference_Type = Literal["reference"] # pylint: disable=C0103 WPS_Complex_Type = Literal["ComplexData"] # pylint: disable=C0103 -WPS_Category_Type = Union[WPS_Literal_Type, WPS_Reference_Type, WPS_Complex_Type] # pylint: disable=C0103 +WPS_BoundingBox_Type = Literal["BoundingBoxData"] # pylint: disable=C0103 +WPS_Category_Type = Union[ # pylint: disable=C0103 + WPS_Literal_Type, + WPS_Reference_Type, + WPS_Complex_Type, + WPS_BoundingBox_Type, +] WPS_LITERAL = get_args(WPS_Literal_Type)[0] WPS_REFERENCE = get_args(WPS_Reference_Type)[0] WPS_COMPLEX_DATA = get_args(WPS_Complex_Type)[0] +WPS_BOUNDINGBOX_DATA = get_args(WPS_BoundingBox_Type)[0] WPS_LITERAL_DATA_BOOLEAN = frozenset(["bool", "boolean"]) WPS_LITERAL_DATA_DATETIME = frozenset(["date", "time", "dateTime"]) WPS_LITERAL_DATA_FLOAT = frozenset(["scale", "angle", "float", "double"]) diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 99f404afe..e1d14d559 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -9,7 +9,7 @@ from celery.utils.debug import ps as get_celery_process from celery.utils.log import get_task_logger from owslib.util import clean_ows_url -from owslib.wps import ComplexDataInput +from owslib.wps import BoundingBoxDataInput, ComplexDataInput from pyramid.httpexceptions import HTTPBadRequest, HTTPNotAcceptable from pyramid_celery import celery_app as app @@ -20,7 +20,7 @@ from weaver.notify import map_job_subscribers, notify_job_subscribers from weaver.owsexceptions import OWSInvalidParameterValue, OWSNoApplicableCode from weaver.processes import wps_package -from weaver.processes.constants import WPS_COMPLEX_DATA, JobInputsOutputsSchema +from weaver.processes.constants import WPS_BOUNDINGBOX_DATA, WPS_COMPLEX_DATA, JobInputsOutputsSchema from weaver.processes.convert import ( convert_input_values_schema, convert_output_params_schema, @@ -67,7 +67,7 @@ from celery.app.task import Task from pyramid.request import Request - from pywps.inout.inputs import ComplexInput + from pywps.inout.inputs import BoundingBoxInput, ComplexInput from weaver.datatype import Job from weaver.processes.convert import OWS_Input_Type, ProcessOWS @@ -77,6 +77,7 @@ AnyProcessRef, AnyResponseType, AnyServiceRef, + AnyValueType, CeleryResult, HeaderCookiesType, HeadersType, @@ -394,15 +395,89 @@ def fetch_wps_process(job, wps_url, headers, settings): return wps_process +def parse_wps_input_complex(input_value, input_info): + # type: (Union[str, JSON], JSON) -> ComplexDataInput + """ + Parse the input data details into a complex input. + """ + # if provided, pass down specified input format to allow validation against supported formats + c_enc = ctype = schema = None + schema_vars = ["reference", "$schema"] + if isinstance(input_value, dict): + ctype = ( + get_field(input_value, "type", default=None) or + get_field(input_value, "mime_type", search_variations=True, default=None) + ) + c_enc = get_field(input_value, "encoding", search_variations=True, default=None) + schema = get_field(input_value, "schema", search_variations=True, default=None, extra_variations=schema_vars) + input_value = get_any_value(input_value) + if not ctype: + ctype = get_field(input_info, "mime_type", search_variations=True, default=None) + c_enc = get_field(input_info, "encoding", search_variations=True, default=None) + media_format = get_field(input_info, "format", default=None) + if isinstance(media_format, dict): + ctype = ctype or get_field(input_info, "mime_type", search_variations=True, default=None) + c_enc = c_enc or get_field(input_info, "encoding", search_variations=True, default=None) + if isinstance(schema, dict): + schema = get_field(schema, "$ref", default=None, extra_variations=schema_vars) + # need to support 'file://' scheme, but PyWPS doesn't like them, so remove the 'file://' part + if str(input_value).startswith("file://"): + input_value = input_value[7:] + return ComplexDataInput(input_value, mimeType=ctype, encoding=c_enc, schema=schema) + + +def parse_wps_input_bbox(input_value, input_info): + # type: (Union[str, JSON], JSON) -> BoundingBoxDataInput + """ + Parse the input data details into a bounding box input. + """ + bbox_crs = None + bbox_val = input_value + if isinstance(input_value, dict): + bbox_crs = input_value.get("crs") + bbox_val = input_value.get("bbox") + if not bbox_crs: + bbox_crs = input_info.get("bbox", {}).get("default") or None + bbox_val = bbox_val.split(",") if isinstance(bbox_val, str) else bbox_val + bbox_dim = len(bbox_val) // 2 + return BoundingBoxDataInput(bbox_val, crs=bbox_crs, dimensions=bbox_dim) + + +def parse_wps_input_literal(input_value): + # type: (Union[AnyValueType, JSON]) -> Optional[str] + """ + Parse the input data details into a literal input. + """ + # if JSON 'null' was given, the execution content should simply omit the optional input + # cannot distinguish directly between empty string and 'null' in XML representation + if input_value is None: + return None + + # measurement structure + # however, owslib does not care about the UoM specified as input (no way to provide it) + if isinstance(input_value, dict): + val = get_any_value(input_value, file=False, default=input_value) # in case it was nested twice under 'value' + if isinstance(val, dict): + val = get_field(val, "measure", search_variations=True, default=val) + if val is not None: + input_value = val + + # need to use literal string for any data type + return str(input_value) + + def parse_wps_inputs(wps_process, job): # type: (ProcessOWS, Job) -> List[Tuple[str, OWS_Input_Type]] """ Parses expected WPS process inputs against submitted job input values considering supported process definitions. """ complex_inputs = {} # type: Dict[str, ComplexInput] + bbox_inputs = {} # type: Dict[str, BoundingBoxInput] for process_input in wps_process.dataInputs: - if WPS_COMPLEX_DATA in process_input.dataType: + if process_input.dataType == WPS_COMPLEX_DATA: complex_inputs[process_input.identifier] = process_input + elif process_input.dataType == WPS_BOUNDINGBOX_DATA: + bbox_inputs[process_input.identifier] = process_input try: wps_inputs = [] @@ -416,7 +491,11 @@ def parse_wps_inputs(wps_process, job): else: input_id = get_any_id(job_input) input_val = get_any_value(job_input) - # in case of array inputs, must repeat (id,value) + + # FIXME: handle minOccurs>=2 vs single-value inputs + # - https://github.com/opengeospatial/ogcapi-processes/issues/373 + # - https://github.com/crim-ca/weaver/issues/579 + # in case of array inputs, must repeat (id, value) if isinstance(input_val, list): input_values = input_val input_details = input_val # each value has its own metadata @@ -424,33 +503,20 @@ def parse_wps_inputs(wps_process, job): input_values = [input_val] input_details = [job_input] # metadata directly in definition, not nested per array value - # we need to support file:// scheme but PyWPS doesn't like them so remove the scheme file:// - input_values = [ - # when value is an array of dict that each contain a file reference - (get_any_value(val)[7:] if str(get_any_value(val)).startswith("file://") else get_any_value(val)) - if isinstance(val, dict) else - # when value is directly a single dict with file reference - (val[7:] if str(val).startswith("file://") else val) - for val in input_values - ] - - for input_value, input_detail in zip(input_values, input_details): - # need to use ComplexDataInput structure for complex input + for input_value, input_info in zip(input_values, input_details): if input_id in complex_inputs: - # if provided, pass down specified data input format to allow validation against supported formats - ctype = get_field(input_detail, "type", default=None) - encoding = None - if not ctype: - media_format = get_field(input_detail, "format", default=None) - if isinstance(media_format, dict): - ctype = get_field(input_detail, "mime_type", search_variations=True, default=None) - encoding = get_field(input_detail, "encoding", search_variations=True, default=None) - wps_inputs.append((input_id, ComplexDataInput(input_value, mimeType=ctype, encoding=encoding))) - # need to use literal String for anything else than complex - # FIXME: pre-validate allowed literal values? - # TODO: BoundingBox not supported + input_data = parse_wps_input_complex(input_value, input_info) + elif input_id in bbox_inputs: + input_data = parse_wps_input_bbox(input_value, input_info) + else: + input_data = parse_wps_input_literal(input_value) + if input_data is None: + job.save_log( + message=f"Removing [{input_id}] data input from execution request, value was 'null'.", + logger=LOGGER, level=logging.WARNING, + ) else: - wps_inputs.append((input_id, str(input_value))) + wps_inputs.append((input_id, input_data)) except KeyError: wps_inputs = [] return wps_inputs