diff --git a/openeo_driver/ProcessGraphDeserializer.py b/openeo_driver/ProcessGraphDeserializer.py
index 3317bed9..fa5bb4c1 100644
--- a/openeo_driver/ProcessGraphDeserializer.py
+++ b/openeo_driver/ProcessGraphDeserializer.py
@@ -517,6 +517,11 @@ def _extract_load_parameters(env: EvalEnv, source_id: tuple) -> LoadParameters:
 
     params = LoadParameters()
     params.temporal_extent = constraints.get("temporal_extent", ["1970-01-01", "2070-01-01"])
+    # Push a filter_labels condition on the time dimension down into the load
+    # parameters, so backends can apply it at load_collection time.
+    labels_args = constraints.get("filter_labels", {})
+    if "dimension" in labels_args and labels_args["dimension"] == "t":
+        params.filter_temporal_labels = labels_args.get("condition")
     params.spatial_extent = constraints.get("spatial_extent", {})
     params.global_extent = global_extent
     params.bands = constraints.get("bands", None)
@@ -1166,6 +1171,23 @@ def filter_temporal(args: dict, env: EvalEnv) -> DriverDataCube:
     extent = _extract_temporal_extent(args, field="extent", process_id="filter_temporal")
     return cube.filter_temporal(start=extent[0], end=extent[1])
 
+
+@process_registry_100.add_function(spec=read_spec("openeo-processes/1.x/proposals/filter_labels.json"))
+@process_registry_2xx.add_function(spec=read_spec("openeo-processes/2.x/proposals/filter_labels.json"))
+def filter_labels(args: dict, env: EvalEnv) -> DriverDataCube:
+    cube = extract_arg(args, 'data')
+    if not isinstance(cube, DriverDataCube):
+        raise ProcessParameterInvalidException(
+            parameter="data", process="filter_labels",
+            reason=f"Invalid data type {type(cube)!r} expected cube."
+        )
+    return cube.filter_labels(
+        condition=extract_arg(args, "condition"),
+        dimension=extract_arg(args, "dimension"),
+        context=args.get("context"),
+        env=env,
+    )
+
 
 def _extract_bbox_extent(args: dict, field="extent", process_id="filter_bbox", handle_geojson=False) -> dict:
     extent = extract_arg(args, name=field, process_id=process_id)
diff --git a/openeo_driver/backend.py b/openeo_driver/backend.py
index 30a5b5c9..dc98a455 100644
--- a/openeo_driver/backend.py
+++ b/openeo_driver/backend.py
@@ -162,6 +162,7 @@ class LoadParameters(dict):
     temporal_extent = dict_item(default=(None, None))
     spatial_extent = dict_item(default={})
     global_extent = dict_item(default={})
+    filter_temporal_labels = dict_item(default=None)
     bands = dict_item(default=None)
     properties = dict_item(default={})
     # TODO: rename this to filter_spatial_geometries (because it is used for load_collection-time filtering)?
diff --git a/openeo_driver/datacube.py b/openeo_driver/datacube.py
index b3cb417c..110d592c 100644
--- a/openeo_driver/datacube.py
+++ b/openeo_driver/datacube.py
@@ -83,6 +83,9 @@ def filter_spatial(self, geometries) -> 'DriverDataCube':
     def filter_bands(self, bands) -> 'DriverDataCube':
         self._not_implemented()
 
+    def filter_labels(self, condition: dict, dimension: str, context: Optional[dict] = None, env: EvalEnv = None) -> 'DriverDataCube':
+        self._not_implemented()
+
     def apply(self, process: dict, *, context: Optional[dict] = None, env: EvalEnv) -> "DriverDataCube":
         self._not_implemented()
 
diff --git a/openeo_driver/dry_run.py b/openeo_driver/dry_run.py
index 35c82079..6e6c653a 100644
--- a/openeo_driver/dry_run.py
+++ b/openeo_driver/dry_run.py
@@ -391,7 +391,7 @@ def get_source_constraints(self, merge=True) -> List[SourceConstraint]:
 
         for op in [
             "temporal_extent", "spatial_extent", "_weak_spatial_extent", "bands", "aggregate_spatial",
-            "sar_backscatter", "process_type", "custom_cloud_mask", "properties", "filter_spatial"
+            "sar_backscatter", "process_type", "custom_cloud_mask", "properties", "filter_spatial", "filter_labels"
         ]:
             # 1 some processes can not be skipped when pushing filters down,
             # so find the subgraph that no longer contains these blockers
@@ -517,6 +517,10 @@ def save_result(self, filename: str, format: str, format_options: dict = None) -
         # TODO: this method should be deprecated (limited to single asset) in favor of write_assets (supports multiple assets)
         return self._process("save_result", {"format": format, "options": format_options})
 
+    def filter_labels(self, condition: dict, dimension: str, context: Optional[dict] = None, env: EvalEnv = None) -> 'DryRunDataCube':
+        # `env` is accepted for signature compatibility with DriverDataCube.filter_labels but is not recorded in the trace.
+        return self._process("filter_labels", arguments=dict(condition=condition, dimension=dimension, context=context))
+
     def mask(self, mask: 'DryRunDataCube', replacement=None) -> 'DryRunDataCube':
         # TODO: if mask cube has no temporal or bbox extent: copy from self?
         # TODO: or add reference to the self trace to the mask trace and vice versa?