Skip to content

Commit

Permalink
extend job input parsing to support bbox and measurement structures
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Oct 27, 2023
1 parent 50b45b4 commit 5cb80b5
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 31 deletions.
9 changes: 8 additions & 1 deletion weaver/processes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@
# Type aliases naming each category of WPS input/output definition.
WPS_Literal_Type = Literal["literal"]  # pylint: disable=C0103
WPS_Reference_Type = Literal["reference"]  # pylint: disable=C0103
WPS_Complex_Type = Literal["ComplexData"]  # pylint: disable=C0103
WPS_BoundingBox_Type = Literal["BoundingBoxData"]  # pylint: disable=C0103
# Union of every category above (single definition, including the bounding box category).
WPS_Category_Type = Union[  # pylint: disable=C0103
    WPS_Literal_Type,
    WPS_Reference_Type,
    WPS_Complex_Type,
    WPS_BoundingBox_Type,
]
# Plain string value carried by each literal type alias, for runtime comparisons.
WPS_LITERAL = get_args(WPS_Literal_Type)[0]
WPS_REFERENCE = get_args(WPS_Reference_Type)[0]
WPS_COMPLEX_DATA = get_args(WPS_Complex_Type)[0]
WPS_BOUNDINGBOX_DATA = get_args(WPS_BoundingBox_Type)[0]
# Literal data type names, grouped as indicated by each constant's name.
WPS_LITERAL_DATA_BOOLEAN = frozenset(["bool", "boolean"])
WPS_LITERAL_DATA_DATETIME = frozenset(["date", "time", "dateTime"])
WPS_LITERAL_DATA_FLOAT = frozenset(["scale", "angle", "float", "double"])
Expand Down
126 changes: 96 additions & 30 deletions weaver/processes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from celery.utils.debug import ps as get_celery_process
from celery.utils.log import get_task_logger
from owslib.util import clean_ows_url
from owslib.wps import ComplexDataInput
from owslib.wps import BoundingBoxDataInput, ComplexDataInput
from pyramid.httpexceptions import HTTPBadRequest, HTTPNotAcceptable
from pyramid_celery import celery_app as app

Expand All @@ -20,7 +20,7 @@
from weaver.notify import map_job_subscribers, notify_job_subscribers
from weaver.owsexceptions import OWSInvalidParameterValue, OWSNoApplicableCode
from weaver.processes import wps_package
from weaver.processes.constants import WPS_COMPLEX_DATA, JobInputsOutputsSchema
from weaver.processes.constants import WPS_BOUNDINGBOX_DATA, WPS_COMPLEX_DATA, JobInputsOutputsSchema
from weaver.processes.convert import (
convert_input_values_schema,
convert_output_params_schema,
Expand Down Expand Up @@ -67,7 +67,7 @@

from celery.app.task import Task
from pyramid.request import Request
from pywps.inout.inputs import ComplexInput
from pywps.inout.inputs import BoundingBoxInput, ComplexInput

from weaver.datatype import Job
from weaver.processes.convert import OWS_Input_Type, ProcessOWS
Expand All @@ -77,6 +77,7 @@
AnyProcessRef,
AnyResponseType,
AnyServiceRef,
AnyValueType,
CeleryResult,
HeaderCookiesType,
HeadersType,
Expand Down Expand Up @@ -394,15 +395,89 @@ def fetch_wps_process(job, wps_url, headers, settings):
return wps_process


def parse_wps_input_complex(input_value, input_info):
    # type: (Union[str, JSON], JSON) -> ComplexDataInput
    """
    Parse the input data details into a complex input.

    The media-type, encoding and schema are searched in the submitted value itself when it is a mapping.
    If no media-type was found there, the media-type and encoding are searched in the input details, and
    then in the ``format`` definition nested under those details.

    :param input_value: Submitted data or reference, either directly or as a mapping nesting it with its metadata.
    :param input_info: Details of the input, used as fallback source of the format metadata.
    :returns: Complex data input with the resolved value and whichever format metadata was found.
    """
    # if provided, pass down specified input format to allow validation against supported formats
    c_enc = ctype = schema = None
    schema_vars = ["reference", "$schema"]
    if isinstance(input_value, dict):
        ctype = (
            get_field(input_value, "type", default=None) or
            get_field(input_value, "mime_type", search_variations=True, default=None)
        )
        c_enc = get_field(input_value, "encoding", search_variations=True, default=None)
        schema = get_field(input_value, "schema", search_variations=True, default=None, extra_variations=schema_vars)
        input_value = get_any_value(input_value)
    if not ctype:
        ctype = get_field(input_info, "mime_type", search_variations=True, default=None)
        c_enc = get_field(input_info, "encoding", search_variations=True, default=None)
        media_format = get_field(input_info, "format", default=None)
        if isinstance(media_format, dict):
            # search the nested format definition itself,
            # since repeating the lookup on the input details could only return the same result as above
            ctype = ctype or get_field(media_format, "mime_type", search_variations=True, default=None)
            c_enc = c_enc or get_field(media_format, "encoding", search_variations=True, default=None)
    if isinstance(schema, dict):
        # schema provided as an object, keep only the reference it points to
        schema = get_field(schema, "$ref", default=None, extra_variations=schema_vars)
    # need to support 'file://' scheme, but PyWPS doesn't like them, so remove the 'file://' part
    if str(input_value).startswith("file://"):
        input_value = input_value[7:]
    return ComplexDataInput(input_value, mimeType=ctype, encoding=c_enc, schema=schema)


def parse_wps_input_bbox(input_value, input_info):
    # type: (Union[str, JSON], JSON) -> BoundingBoxDataInput
    """
    Parse the input data details into a bounding box input.

    :param input_value:
        Bounding box given as a comma-separated string, as a sequence of coordinates,
        or as a mapping with the coordinates under ``bbox`` and an optional ``crs``.
    :param input_info:
        Details of the input. When it is a mapping whose ``bbox`` entry is itself a mapping,
        the ``default`` of that entry is used as CRS if the value did not provide one.
    :returns: Bounding box input whose dimensions count is half the number of coordinates.
    :raises TypeError: If no bounding box coordinates could be found in the value.
    """
    bbox_crs = None
    bbox_val = input_value
    if isinstance(input_value, dict):
        bbox_crs = input_value.get("crs")
        bbox_val = input_value.get("bbox")
    if not bbox_crs:
        # The details can be the submitted value itself (e.g.: a plain string for array items),
        # and their 'bbox' entry can hold the coordinates rather than a CRS definition.
        # Only look for the default CRS when the structures are actually mappings.
        bbox_def = input_info.get("bbox") if isinstance(input_info, dict) else None
        bbox_crs = (bbox_def.get("default") if isinstance(bbox_def, dict) else None) or None
    if isinstance(bbox_val, str):
        bbox_val = bbox_val.split(",")
    if bbox_val is None:
        # same exception type as the failing 'len(None)' would raise, but with an explicit cause
        raise TypeError("Bounding box input does not provide any 'bbox' coordinates.")
    bbox_dim = len(bbox_val) // 2
    return BoundingBoxDataInput(bbox_val, crs=bbox_crs, dimensions=bbox_dim)


def parse_wps_input_literal(input_value):
    # type: (Union[AnyValueType, JSON]) -> Optional[str]
    """
    Parse the input data details into a literal input.

    :param input_value: Submitted literal, either directly or as a mapping (e.g.: a measurement structure).
    :returns: String form of the literal, or ``None`` when the value was ``None`` and the input must be omitted.
    """
    # A JSON 'null' means the optional input is simply left out of the execution content,
    # because the XML representation cannot tell an empty string apart from 'null'.
    if input_value is None:
        return None

    # anything that is not a mapping is directly the literal, which is always passed as a string
    if not isinstance(input_value, dict):
        return str(input_value)

    # Mapping: possibly a value nested a second time under 'value', or a measurement structure.
    # Only the measure is kept, since owslib offers no way to provide the UoM specified as input.
    resolved = get_any_value(input_value, file=False, default=input_value)
    if isinstance(resolved, dict):
        resolved = get_field(resolved, "measure", search_variations=True, default=resolved)
    return str(input_value if resolved is None else resolved)


def parse_wps_inputs(wps_process, job):
# type: (ProcessOWS, Job) -> List[Tuple[str, OWS_Input_Type]]
"""
Parses expected WPS process inputs against submitted job input values considering supported process definitions.
"""
complex_inputs = {} # type: Dict[str, ComplexInput]
bbox_inputs = {} # type: Dict[str, BoundingBoxInput]
for process_input in wps_process.dataInputs:
if WPS_COMPLEX_DATA in process_input.dataType:
if process_input.dataType == WPS_COMPLEX_DATA:
complex_inputs[process_input.identifier] = process_input
elif process_input.dataType == WPS_BOUNDINGBOX_DATA:
bbox_inputs[process_input.identifier] = process_input

try:
wps_inputs = []
Expand All @@ -416,41 +491,32 @@ def parse_wps_inputs(wps_process, job):
else:
input_id = get_any_id(job_input)
input_val = get_any_value(job_input)
# in case of array inputs, must repeat (id,value)

# FIXME: handle minOccurs>=2 vs single-value inputs
# - https://github.com/opengeospatial/ogcapi-processes/issues/373
# - https://github.com/crim-ca/weaver/issues/579
# in case of array inputs, must repeat (id, value)
if isinstance(input_val, list):
input_values = input_val
input_details = input_val # each value has its own metadata
else:
input_values = [input_val]
input_details = [job_input] # metadata directly in definition, not nested per array value

# we need to support file:// scheme but PyWPS doesn't like them so remove the scheme file://
input_values = [
# when value is an array of dict that each contain a file reference
(get_any_value(val)[7:] if str(get_any_value(val)).startswith("file://") else get_any_value(val))
if isinstance(val, dict) else
# when value is directly a single dict with file reference
(val[7:] if str(val).startswith("file://") else val)
for val in input_values
]

for input_value, input_detail in zip(input_values, input_details):
# need to use ComplexDataInput structure for complex input
for input_value, input_info in zip(input_values, input_details):
if input_id in complex_inputs:
# if provided, pass down specified data input format to allow validation against supported formats
ctype = get_field(input_detail, "type", default=None)
encoding = None
if not ctype:
media_format = get_field(input_detail, "format", default=None)
if isinstance(media_format, dict):
ctype = get_field(input_detail, "mime_type", search_variations=True, default=None)
encoding = get_field(input_detail, "encoding", search_variations=True, default=None)
wps_inputs.append((input_id, ComplexDataInput(input_value, mimeType=ctype, encoding=encoding)))
# need to use literal String for anything else than complex
# FIXME: pre-validate allowed literal values?
# TODO: BoundingBox not supported
input_data = parse_wps_input_complex(input_value, input_info)
elif input_id in bbox_inputs:
input_data = parse_wps_input_bbox(input_value, input_info)
else:
input_data = parse_wps_input_literal(input_value)
if input_data is None:
job.save_log(
message=f"Removing [{input_id}] data input from execution request, value was 'null'.",
logger=LOGGER, level=logging.WARNING,
)
else:
wps_inputs.append((input_id, str(input_value)))
wps_inputs.append((input_id, input_data))
except KeyError:
wps_inputs = []
return wps_inputs
Expand Down

0 comments on commit 5cb80b5

Please sign in to comment.