From 51fed2a30ad039df546ce77b1abea897e194b027 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=BDiga=20Luk=C5=A1i=C4=8D?= <31988337+zigaLuksic@users.noreply.github.com> Date: Thu, 7 Sep 2023 12:04:55 +0200 Subject: [PATCH] Revert "Patch for eolearn 1.5.0 (#264)" This reverts commit 4ed5dc653d3abc00b15018d97fb9e9bc6b87b51f. --- eogrow/pipelines/batch_to_eopatch.py | 17 +- eogrow/pipelines/download.py | 39 +- eogrow/pipelines/export_maps.py | 15 +- eogrow/pipelines/features.py | 48 ++- eogrow/pipelines/import_tiff.py | 11 +- eogrow/pipelines/import_vector.py | 6 +- eogrow/pipelines/merge_samples.py | 13 +- eogrow/pipelines/prediction.py | 23 +- eogrow/pipelines/rasterize.py | 29 +- eogrow/pipelines/sampling.py | 31 +- eogrow/pipelines/split_grid.py | 21 +- eogrow/pipelines/testing.py | 4 +- eogrow/pipelines/zipmap.py | 25 +- eogrow/tasks/batch_to_eopatch.py | 2 +- eogrow/tasks/common.py | 60 +-- eogrow/tasks/features.py | 9 +- eogrow/tasks/prediction.py | 3 +- eogrow/tasks/spatial.py | 15 +- eogrow/tasks/testing.py | 3 +- eogrow/types.py | 5 +- eogrow/utils/filter.py | 46 ++- eogrow/utils/testing.py | 3 +- eogrow/utils/validators.py | 3 +- pyproject.toml | 21 +- tests/pipelines/test_download.py | 4 +- tests/pipelines/test_features.py | 1 + tests/pipelines/test_merge_samples.py | 14 +- tests/pipelines/test_testing.py | 2 +- tests/pipelines/test_zipmap.py | 2 +- tests/tasks/test_common.py | 55 --- tests/tasks/test_testing.py | 6 +- .../download_and_batch/download_l1c_q1.json | 18 + .../download_and_batch/download_l2a.json | 16 + .../download_and_batch/download_q3.json | 2 +- ...m_collection.json => download_season.json} | 0 .../features/features_interpolation.json | 22 ++ .../download_and_batch/download_l1c_q1.json | 326 ++++++++++++++++ .../download_and_batch/download_l2a.json | 352 ++++++++++++++++++ ...m_collection.json => download_season.json} | 0 .../sampling/sampling_fraction_erosion.json | 110 ++---- tests/utils/test_filter.py | 40 +- tests/utils/test_general.py | 4 +- tests/utils/test_validators.py | 3 +- 43 files changed, 1066 insertions(+), 363 deletions(-) delete mode 100644 tests/tasks/test_common.py create mode 100644 tests/test_config_files/download_and_batch/download_l1c_q1.json create mode 100644 tests/test_config_files/download_and_batch/download_l2a.json rename tests/test_config_files/download_and_batch/{download_custom_collection.json => download_season.json} (100%) create mode 100644 tests/test_config_files/features/features_interpolation.json create mode 100644 tests/test_stats/download_and_batch/download_l1c_q1.json create mode 100644 tests/test_stats/download_and_batch/download_l2a.json rename tests/test_stats/download_and_batch/{download_custom_collection.json => download_season.json} (100%) diff --git a/eogrow/pipelines/batch_to_eopatch.py b/eogrow/pipelines/batch_to_eopatch.py index f5c420a6..33e69dcf 100644 --- a/eogrow/pipelines/batch_to_eopatch.py +++ b/eogrow/pipelines/batch_to_eopatch.py @@ -18,14 +18,13 @@ RenameFeatureTask, SaveTask, ) -from eolearn.core.types import Feature +from eolearn.features import LinearFunctionTask from eolearn.io import ImportFromTiffTask from ..core.pipeline import Pipeline from ..core.schemas import BaseSchema from ..tasks.batch_to_eopatch import DeleteFilesTask, FixImportedTimeDependentFeatureTask, LoadUserDataTask -from ..tasks.common import LinearFunctionTask -from ..types import ExecKwargs, PatchList, RawSchemaDict +from ..types import ExecKwargs, Feature, FeatureSpec, PatchList, RawSchemaDict from ..utils.filter import get_patches_with_missing_features from ..utils.validators import ensure_storage_key_presence, optional_field_validator, parse_dtype @@ -97,15 +96,18 @@ def filter_patch_list(self, patch_list: PatchList) -> PatchList: self.storage.get_folder(self.config.output_folder_key), patch_list, self._get_output_features(), - check_timestamps=self.config.userdata_timestamp_reader is not None, ) - def _get_output_features(self) -> list[Feature]: + def _get_output_features(self) -> list[FeatureSpec]: """Lists all features that the pipeline outputs.""" - features = [feature_mapping.feature for feature_mapping in self.config.mapping] + features: list[FeatureSpec] = [FeatureType.BBOX] + features.extend(feature_mapping.feature for feature_mapping in self.config.mapping) if self.config.userdata_feature_name: - features.append((FeatureType.META_INFO, self.config.userdata_feature_name)) + features.append(FeatureType.META_INFO) + + if self.config.userdata_timestamp_reader: + features.append(FeatureType.TIMESTAMPS) return features @@ -139,6 +141,7 @@ def build_workflow(self) -> EOWorkflow: path=self.storage.get_folder(self.config.output_folder_key), filesystem=self.storage.filesystem, features=self._get_output_features(), + compress_level=1, overwrite_permission=OverwritePermission.OVERWRITE_FEATURES, ) save_node = EONode(save_task, inputs=([processing_node] if processing_node else [])) diff --git a/eogrow/pipelines/download.py b/eogrow/pipelines/download.py index a593945d..d4777cf0 100644 --- a/eogrow/pipelines/download.py +++ b/eogrow/pipelines/download.py @@ -12,7 +12,7 @@ from pydantic import Field from eolearn.core import EONode, EOWorkflow, FeatureType, OverwritePermission, SaveTask -from eolearn.core.types import Feature +from eolearn.features import LinearFunctionTask from eolearn.io import SentinelHubDemTask, SentinelHubEvalscriptTask, SentinelHubInputTask from sentinelhub import ( Band, @@ -28,8 +28,7 @@ from ..core.pipeline import Pipeline from ..core.schemas import BaseSchema -from ..tasks.common import LinearFunctionTask -from ..types import ExecKwargs, PatchList, ProcessingType, TimePeriod +from ..types import ExecKwargs, Feature, FeatureSpec, PatchList, ProcessingType, TimePeriod from ..utils.filter import get_patches_with_missing_features from ..utils.validators import ( ensure_exactly_one_defined, @@ -82,6 +81,7 @@ class Schema(Pipeline.Schema): ) _ensure_output_folder_key = ensure_storage_key_presence("output_folder_key") + compress_level: int = Field(1, description="Level of compression used in saving EOPatches") threads_per_worker: Optional[int] = Field( description=( "Maximum number of parallel threads used during download by each worker. If set to None it will use " @@ -95,21 +95,19 @@ class Schema(Pipeline.Schema): def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) - self.download_node_uid = "" + self.download_node_uid: str | None = None def filter_patch_list(self, patch_list: PatchList) -> PatchList: """EOPatches are filtered according to existence of specified output features""" - output_features = self._get_output_features() return get_patches_with_missing_features( self.storage.filesystem, self.storage.get_folder(self.config.output_folder_key), patch_list, - output_features, - check_timestamps=any(ftype.is_temporal() for ftype, _ in output_features), + self._get_output_features(), ) @abc.abstractmethod - def _get_output_features(self) -> list[Feature]: + def _get_output_features(self) -> list[FeatureSpec]: """Lists all features that are to be saved upon the pipeline completion""" @abc.abstractmethod @@ -151,6 +149,7 @@ def build_workflow(self, session_loader: SessionLoaderType) -> EOWorkflow: self.storage.get_folder(self.config.output_folder_key), filesystem=self.storage.filesystem, features=self._get_output_features(), + compress_level=self.config.compress_level, overwrite_permission=OverwritePermission.OVERWRITE_FEATURES, ), inputs=[postprocessing_node or download_node], @@ -165,7 +164,9 @@ def get_execution_arguments(self, workflow: EOWorkflow, patch_list: PatchList) - """ exec_args = super().get_execution_arguments(workflow, patch_list) - download_node = workflow.get_node_with_uid(self.download_node_uid, fail_if_missing=True) + download_node = workflow.get_node_with_uid(self.download_node_uid) + if download_node is None: + return exec_args for patch_name, bbox in patch_list: exec_args[patch_name][download_node] = {"bbox": bbox} @@ -246,8 +247,14 @@ class Schema(BaseDownloadPipeline.Schema, CommonDownloadFields, TimeDependantFie config: Schema - def _get_output_features(self) -> list[Feature]: - return [(FeatureType.DATA, self.config.bands_feature_name), *self.config.additional_data] + def _get_output_features(self) -> list[FeatureSpec]: + features: list[FeatureSpec] = [ + (FeatureType.DATA, self.config.bands_feature_name), + FeatureType.BBOX, + FeatureType.TIMESTAMPS, + ] + features.extend(self.config.additional_data) + return features def _get_download_node(self, session_loader: SessionLoaderType) -> EONode: time_diff = None if self.config.time_difference is None else dt.timedelta(minutes=self.config.time_difference) @@ -293,8 +300,10 @@ class Schema(BaseDownloadPipeline.Schema, CommonDownloadFields, TimeDependantFie config: Schema - def _get_output_features(self) -> list[Feature]: - return self.config.features + def _get_output_features(self) -> list[FeatureSpec]: + features: list[FeatureSpec] = [FeatureType.BBOX, FeatureType.TIMESTAMPS] + features.extend(self.config.features) + return features def _get_download_node(self, session_loader: SessionLoaderType) -> EONode: evalscript = read_data(self.config.evalscript_path, data_format=MimeType.TXT) @@ -326,8 +335,8 @@ class Schema(BaseDownloadPipeline.Schema, CommonDownloadFields): config: Schema - def _get_output_features(self) -> list[Feature]: - return [(FeatureType.DATA_TIMELESS, self.config.feature_name)] + def _get_output_features(self) -> list[FeatureSpec]: + return [(FeatureType.DATA_TIMELESS, self.config.feature_name), FeatureType.BBOX] def _get_download_node(self, session_loader: SessionLoaderType) -> EONode: download_task = SentinelHubDemTask( diff --git a/eogrow/pipelines/export_maps.py b/eogrow/pipelines/export_maps.py index 6bdb0ba9..ceab5a3a 100644 --- a/eogrow/pipelines/export_maps.py +++ b/eogrow/pipelines/export_maps.py @@ -17,18 +17,17 @@ from pydantic import Field from tqdm.auto import tqdm -from eolearn.core import EOPatch, EOTask, EOWorkflow, LoadTask, linearly_connect_tasks -from eolearn.core.types import Feature +from eolearn.core import EOPatch, EOTask, EOWorkflow, FeatureType, LoadTask, linearly_connect_tasks from eolearn.core.utils.fs import get_full_path, pickle_fs, unpickle_fs from eolearn.core.utils.parallelize import parallelize +from eolearn.features import LinearFunctionTask from eolearn.io import ExportToTiffTask from sentinelhub import CRS, MimeType from eogrow.core.config import RawConfig from ..core.pipeline import Pipeline -from ..tasks.common import LinearFunctionTask -from ..types import ExecKwargs, PatchList +from ..types import ExecKwargs, Feature, PatchList from ..utils.eopatch_list import group_by_crs from ..utils.map import CogifyResamplingOptions, WarpResamplingOptions, cogify_inplace, extract_bands, merge_tiffs from ..utils.validators import ensure_storage_key_presence @@ -176,7 +175,7 @@ def build_workflow(self) -> EOWorkflow: load_task = LoadTask( self.storage.get_folder(self.config.input_folder_key), filesystem=self.storage.filesystem, - features=[self.config.feature], + features=[self.config.feature, FeatureType.BBOX], ) task_list: list[EOTask] = [load_task] @@ -273,10 +272,10 @@ def get_extraction_path(input_path: str, time: dt.datetime) -> str: def _extract_num_bands_and_timestamps(self, eopatch_name: str) -> tuple[int, list[dt.datetime]]: """Loads an eopatch to get information about number of bands and the timestamps.""" path = fs.path.join(self.storage.get_folder(self.config.input_folder_key), eopatch_name) - patch = EOPatch.load(path, [self.config.feature], filesystem=self.storage.filesystem) + patch = EOPatch.load(path, (FeatureType.TIMESTAMPS, self.config.feature), filesystem=self.storage.filesystem) if self.config.band_indices is not None: - return len(self.config.band_indices), patch.get_timestamps() - return patch[self.config.feature].shape[-1], patch.get_timestamps() + return len(self.config.band_indices), patch.timestamps + return patch[self.config.feature].shape[-1], patch.timestamps @staticmethod def _execute_split_jobs(jobs: list[SplitTiffsJob]) -> None: diff --git a/eogrow/pipelines/features.py b/eogrow/pipelines/features.py index 2f1b8578..fb8e0a03 100644 --- a/eogrow/pipelines/features.py +++ b/eogrow/pipelines/features.py @@ -17,8 +17,7 @@ OverwritePermission, SaveTask, ) -from eolearn.core.types import Feature -from eolearn.features import NormalizedDifferenceIndexTask, SimpleFilterTask +from eolearn.features import LinearInterpolationTask, NormalizedDifferenceIndexTask, SimpleFilterTask from eolearn.mask import JoinMasksTask from ..core.pipeline import Pipeline @@ -30,7 +29,7 @@ ValidDataFractionPredicate, join_valid_and_cloud_masks, ) -from ..types import PatchList, TimePeriod +from ..types import Feature, FeatureSpec, PatchList, TimePeriod from ..utils.filter import get_patches_with_missing_features from ..utils.validators import ( ensure_storage_key_presence, @@ -82,6 +81,7 @@ class Schema(Pipeline.Schema): dtype: Optional[np.dtype] = Field(description="The dtype under which the concatenated features should be saved") _parse_dtype = optional_field_validator("dtype", parse_dtype, pre=True) output_feature_name: str = Field(description="Name of output data feature encompassing bands and NDIs") + compress_level: int = Field(1, description="Level of compression used in saving eopatches") config: Schema @@ -92,12 +92,11 @@ def filter_patch_list(self, patch_list: PatchList) -> PatchList: self.storage.get_folder(self.config.output_folder_key), patch_list, self._get_output_features(), - check_timestamps=True, ) - def _get_output_features(self) -> list[Feature]: + def _get_output_features(self) -> list[FeatureSpec]: """Lists all features that are to be saved upon the pipeline completion""" - return [(FeatureType.DATA, self.config.output_feature_name)] + return [(FeatureType.DATA, self.config.output_feature_name), FeatureType.BBOX, FeatureType.TIMESTAMPS] def _get_bands_feature(self) -> Feature: return FeatureType.DATA, self.config.bands_feature_name @@ -123,6 +122,7 @@ def build_workflow(self) -> EOWorkflow: self.storage.get_folder(self.config.output_folder_key), filesystem=self.storage.filesystem, features=self._get_output_features(), + compress_level=self.config.compress_level, overwrite_permission=OverwritePermission.OVERWRITE_FEATURES, ) save_node = EONode(save_task, inputs=[postprocessing_node]) @@ -185,6 +185,40 @@ def get_postprocessing_node(self, previous_node: EONode) -> EONode: return EONode(merge_task, inputs=[previous_node]) +class InterpolationSpecifications(BaseSchema): + time_period: TimePeriod + _parse_time_period = field_validator("time_period", parse_time_period, pre=True) + resampling_period: int + + +class InterpolationFeaturesPipeline(FeaturesPipeline): + """A pipeline to calculate and prepare features for ML including interpolation""" + + class Schema(FeaturesPipeline.Schema): + interpolation: Optional[InterpolationSpecifications] = Field( + description=( + "Fine-tuning of interpolation parameters. If not set, the interpolation will work on current timestamps" + ) + ) + + config: Schema + + def get_temporal_regularization_node(self, previous_node: EONode) -> EONode: + resample_range = None + if self.config.interpolation: + start, end = self.config.interpolation.time_period + start_time, end_time = start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d") + resample_range = (start_time, end_time, self.config.interpolation.resampling_period) + + interpolation_task = LinearInterpolationTask( + feature=self._get_bands_feature(), + mask_feature=self._get_valid_data_feature(), + resample_range=resample_range, + bounds_error=False, + ) + return EONode(interpolation_task, inputs=[previous_node]) + + class MosaickingSpecifications(BaseSchema): time_period: TimePeriod _parse_time_period = field_validator("time_period", parse_time_period, pre=True) @@ -243,7 +277,7 @@ def get_temporal_regularization_node(self, previous_node: EONode) -> EONode: ) mosaicking_node = EONode(mosaicking_task, inputs=[previous_node]) return EONode( - CopyTask(features=[self._get_bands_feature()]), + CopyTask(features=[self._get_bands_feature(), FeatureType.BBOX, FeatureType.TIMESTAMPS]), inputs=[mosaicking_node], name="Remove non-mosaicked features", ) diff --git a/eogrow/pipelines/import_tiff.py b/eogrow/pipelines/import_tiff.py index d964e12a..8b93067b 100644 --- a/eogrow/pipelines/import_tiff.py +++ b/eogrow/pipelines/import_tiff.py @@ -7,15 +7,14 @@ import numpy as np from pydantic import Field -from eolearn.core import CreateEOPatchTask, EONode, EOWorkflow, OverwritePermission, SaveTask -from eolearn.core.types import Feature +from eolearn.core import CreateEOPatchTask, EONode, EOWorkflow, FeatureType, OverwritePermission, SaveTask from eolearn.features.feature_manipulation import SpatialResizeTask from eolearn.features.utils import ResizeLib, ResizeMethod, ResizeParam from eolearn.io import ImportFromTiffTask from ..core.pipeline import Pipeline from ..core.schemas import BaseSchema -from ..types import PatchList +from ..types import Feature, PatchList from ..utils.filter import get_patches_with_missing_features from ..utils.validators import ensure_storage_key_presence, optional_field_validator, parse_dtype @@ -70,8 +69,7 @@ def filter_patch_list(self, patch_list: PatchList) -> PatchList: self.storage.filesystem, self.storage.get_folder(self.config.output_folder_key), patch_list, - [self.config.output_feature], - check_timestamps=False, + [self.config.output_feature, FeatureType.BBOX], ) def build_workflow(self) -> EOWorkflow: @@ -107,9 +105,10 @@ def build_workflow(self) -> EOWorkflow: save_task = SaveTask( self.storage.get_folder(self.config.output_folder_key), filesystem=self.storage.filesystem, + compress_level=1, overwrite_permission=OverwritePermission.OVERWRITE_FEATURES, config=self.sh_config, - features=[self.config.output_feature], + features=[self.config.output_feature, FeatureType.BBOX], ) save_node = EONode(save_task, inputs=[resize_node or import_node]) diff --git a/eogrow/pipelines/import_vector.py b/eogrow/pipelines/import_vector.py index c70d053d..ba1f2f4b 100644 --- a/eogrow/pipelines/import_vector.py +++ b/eogrow/pipelines/import_vector.py @@ -5,11 +5,10 @@ from pydantic import Field from eolearn.core import EONode, EOWorkflow, FeatureType, OverwritePermission, SaveTask -from eolearn.core.types import Feature from eolearn.io import VectorImportTask from ..core.pipeline import Pipeline -from ..types import ExecKwargs, PatchList +from ..types import ExecKwargs, Feature, PatchList from ..utils.validators import ensure_storage_key_presence, field_validator, restrict_types @@ -56,7 +55,8 @@ def build_workflow(self) -> EOWorkflow: save_task = SaveTask( path=self.storage.get_folder(self.config.output_folder_key), filesystem=self.storage.filesystem, - features=[self.config.output_feature], + features=[FeatureType.BBOX, self.config.output_feature], + compress_level=1, overwrite_permission=OverwritePermission.OVERWRITE_FEATURES, ) save_node = EONode(save_task, inputs=[import_node]) diff --git a/eogrow/pipelines/merge_samples.py b/eogrow/pipelines/merge_samples.py index 27ce5a01..33e88fa1 100644 --- a/eogrow/pipelines/merge_samples.py +++ b/eogrow/pipelines/merge_samples.py @@ -9,10 +9,10 @@ from pydantic import Field from eolearn.core import EOPatch, EOWorkflow, FeatureType, LoadTask, OutputTask, linearly_connect_tasks -from eolearn.core.types import Feature from eolearn.core.utils.fs import get_full_path from ..core.pipeline import Pipeline +from ..types import Feature, FeatureSpec from ..utils.validators import ensure_storage_key_presence LOGGER = logging.getLogger(__name__) @@ -69,11 +69,12 @@ def run_procedure(self) -> tuple[list[str], list[str]]: def build_workflow(self) -> EOWorkflow: """Creates a workflow that outputs the requested features""" + features_to_load: list[FeatureSpec] = [FeatureType.TIMESTAMPS] if self.config.include_timestamp else [] + features_to_load.extend(self.config.features_to_merge) load_task = LoadTask( self.storage.get_folder(self.config.input_folder_key), filesystem=self.storage.filesystem, - features=self.config.features_to_merge, - load_timestamps=True if self.config.include_timestamp else "auto", + features=features_to_load, ) output_task = OutputTask(name=self._OUTPUT_NAME) return EOWorkflow(linearly_connect_tasks(load_task, output_task)) @@ -102,7 +103,7 @@ def merge_and_save_features(self, patches: list[EOPatch]) -> None: if self.config.include_timestamp: arrays = [] for patch, sample_num in zip(patches, patch_sample_nums): - arrays.append(np.tile(np.array(patch.get_timestamps()), (sample_num, 1))) + arrays.append(np.tile(np.array(patch.timestamps), (sample_num, 1))) patch.timestamps = [] self._save_array(np.concatenate(arrays, axis=0), "TIMESTAMPS") @@ -122,6 +123,10 @@ def _collect_and_remove_feature(patch: EOPatch, feature: Feature) -> np.ndarray: feature_array = patch[feature] feature_type, _ = feature + if feature_type is FeatureType.TIMESTAMPS: + patch.timestamps = [] + return np.array(feature_array) + del patch[feature] axis = feature_type.ndim() - 2 # type: ignore[operator] diff --git a/eogrow/pipelines/prediction.py b/eogrow/pipelines/prediction.py index ca9058a8..c39ca7d3 100644 --- a/eogrow/pipelines/prediction.py +++ b/eogrow/pipelines/prediction.py @@ -9,11 +9,10 @@ from pydantic import Field from eolearn.core import EONode, EOWorkflow, FeatureType, LoadTask, MergeEOPatchesTask, OverwritePermission, SaveTask -from eolearn.core.types import Feature from ..core.pipeline import Pipeline from ..tasks.prediction import ClassificationPredictionTask, RegressionPredictionTask -from ..types import PatchList +from ..types import Feature, FeatureSpec, PatchList from ..utils.filter import get_patches_with_missing_features from ..utils.validators import ( ensure_defined_together, @@ -58,11 +57,12 @@ class Schema(Pipeline.Schema): description="The storage manager key pointing to the folder of the model used in the prediction pipeline." ) _ensure_model_folder_key = ensure_storage_key_presence("model_folder_key") + compress_level: int = Field(1, description="Level of compression used in saving EOPatches") config: Schema @abc.abstractmethod - def _get_output_features(self) -> list[Feature]: + def _get_output_features(self) -> list[FeatureSpec]: """Lists all features that are to be saved upon the pipeline completion""" @property @@ -72,13 +72,11 @@ def _is_mp_lock_needed(self) -> bool: def filter_patch_list(self, patch_list: PatchList) -> PatchList: """EOPatches are filtered according to existence of specified output features""" - output_features = self._get_output_features() return get_patches_with_missing_features( self.storage.filesystem, self.storage.get_folder(self.config.output_folder_key), patch_list, - output_features, - check_timestamps=False, + self._get_output_features(), ) def build_workflow(self) -> EOWorkflow: @@ -97,11 +95,11 @@ def _get_data_preparation_node(self) -> EONode: LoadTask( self.storage.get_folder(self.config.input_folder_key), filesystem=self.storage.filesystem, - features=self.config.input_features, + features=[FeatureType.BBOX, FeatureType.TIMESTAMPS, *self.config.input_features], ) ) - if not self.config.prediction_mask_folder_key or not self.config.prediction_mask_feature_name: + if not self.config.prediction_mask_folder_key: return features_load_node mask_load_node = EONode( @@ -125,6 +123,7 @@ def _get_saving_node(self, previous_node: EONode) -> EONode: filesystem=self.storage.filesystem, features=self._get_output_features(), overwrite_permission=OverwritePermission.OVERWRITE_FEATURES, + compress_level=self.config.compress_level, ) return EONode(save_task, inputs=[previous_node]) @@ -140,8 +139,8 @@ class Schema(BasePredictionPipeline.Schema): config: Schema - def _get_output_features(self) -> list[Feature]: - return [(FeatureType.DATA_TIMELESS, self.config.output_feature_name)] + def _get_output_features(self) -> list[FeatureSpec]: + return [FeatureType.BBOX, (FeatureType.DATA_TIMELESS, self.config.output_feature_name)] def _get_prediction_node(self, previous_node: EONode) -> EONode: model_path = fs.path.join(self.storage.get_folder(self.config.model_folder_key), self.config.model_filename) @@ -172,8 +171,8 @@ class Schema(BasePredictionPipeline.Schema): config: Schema - def _get_output_features(self) -> list[Feature]: - features = [(FeatureType.MASK_TIMELESS, self.config.output_feature_name)] + def _get_output_features(self) -> list[FeatureSpec]: + features: list[FeatureSpec] = [FeatureType.BBOX, (FeatureType.MASK_TIMELESS, self.config.output_feature_name)] if self.config.output_probability_feature_name: features.append((FeatureType.DATA_TIMELESS, self.config.output_probability_feature_name)) return features diff --git a/eogrow/pipelines/rasterize.py b/eogrow/pipelines/rasterize.py index d3669c4a..fe1acf1c 100644 --- a/eogrow/pipelines/rasterize.py +++ b/eogrow/pipelines/rasterize.py @@ -13,14 +13,13 @@ from pydantic import Field, validator from eolearn.core import CreateEOPatchTask, EONode, EOWorkflow, FeatureType, LoadTask, OverwritePermission, SaveTask -from eolearn.core.types import Feature from eolearn.geometry import VectorToRasterTask from eolearn.io import VectorImportTask from sentinelhub import CRS from ..core.pipeline import Pipeline from ..core.schemas import BaseSchema -from ..types import PatchList +from ..types import Feature, FeatureSpec, PatchList from ..utils.filter import get_patches_with_missing_features from ..utils.fs import LocalFile from ..utils.validators import ensure_exactly_one_defined, ensure_storage_key_presence, field_validator, parse_dtype @@ -82,6 +81,7 @@ class Schema(Pipeline.Schema): ) overlap_value: Optional[float] = Field(description="Value to write over the areas where polygons overlap.") no_data_value: int = Field(0, description="The no_data_value argument to be passed to VectorToRasterTask") + compress_level: int = Field(1, description="Level of compression used in saving EOPatches") @validator("vector_input") def _check_vector_input(cls, vector_input: Feature | str) -> Feature | str: @@ -115,18 +115,16 @@ def __init__(self, *args: Any, **kwargs: Any): if not isinstance(self.config.vector_input, str): self.vector_feature = self.config.vector_input else: - ftype = FeatureType.VECTOR if self.config.output_feature[0].is_temporal() else FeatureType.VECTOR_TIMELESS + ftype = FeatureType.VECTOR if self._is_temporal(self.config.output_feature) else FeatureType.VECTOR_TIMELESS self.filename = self.config.vector_input self.vector_feature = (ftype, f"TEMP_{uuid.uuid4().hex}") def filter_patch_list(self, patch_list: PatchList) -> PatchList: - output_features = self._get_output_features() return get_patches_with_missing_features( self.storage.filesystem, self.storage.get_folder(self.config.output_folder_key), patch_list, - output_features, - check_timestamps=any(ftype.is_temporal() for ftype, _ in output_features), + self._get_output_features(), ) def run_procedure(self) -> tuple[list[str], list[str]]: @@ -175,10 +173,13 @@ def build_workflow(self) -> EOWorkflow: ) data_preparation_node = EONode(import_task, inputs=[create_node]) else: + features = [self.vector_feature, FeatureType.BBOX] + if self._is_temporal(self.vector_feature): + features.append(FeatureType.TIMESTAMPS) input_task = LoadTask( self.storage.get_folder(self.config.input_folder_key), filesystem=self.storage.filesystem, - features=[self.vector_feature], + features=features, ) data_preparation_node = EONode(input_task) @@ -191,6 +192,7 @@ def build_workflow(self) -> EOWorkflow: filesystem=self.storage.filesystem, features=self._get_output_features(), overwrite_permission=OverwritePermission.OVERWRITE_FEATURES, + compress_level=self.config.compress_level, ) save_node = EONode(save_task, inputs=[postprocess_node]) @@ -238,6 +240,15 @@ def _get_dataset_path(self, filename: str) -> str: return fs.path.combine(folder, filename) - def _get_output_features(self) -> list[Feature]: + def _get_output_features(self) -> list[FeatureSpec]: """Lists all features that are to be saved upon the pipeline completion""" - return [self.config.output_feature] + features: list[FeatureSpec] = [self.config.output_feature, FeatureType.BBOX] + if self._is_temporal(self.config.output_feature): + features.append(FeatureType.TIMESTAMPS) + + return features + + @staticmethod + def _is_temporal(feature: Feature) -> bool: + f_type, _ = feature + return f_type.is_temporal() diff --git a/eogrow/pipelines/sampling.py b/eogrow/pipelines/sampling.py index 1e17a17f..608b7166 100644 --- a/eogrow/pipelines/sampling.py +++ b/eogrow/pipelines/sampling.py @@ -8,7 +8,6 @@ from pydantic import Field from eolearn.core import EONode, EOWorkflow, FeatureType, LoadTask, MergeEOPatchesTask, OverwritePermission, SaveTask -from eolearn.core.types import Feature from eolearn.geometry import MorphologicalOperations, MorphologicalStructFactory from eolearn.ml_tools import BlockSamplingTask, FractionSamplingTask, GridSamplingTask @@ -16,7 +15,7 @@ from ..core.pipeline import Pipeline from ..tasks.common import ClassFilterTask -from ..types import ExecKwargs, PatchList +from ..types import ExecKwargs, Feature, FeatureSpec, PatchList from ..utils.filter import get_patches_with_missing_features @@ -44,18 +43,17 @@ class Schema(Pipeline.Schema): "saved as FEATURES_SAMPLED." ) ) + compress_level: int = Field(1, description="Level of compression used in saving eopatches") config: Schema def filter_patch_list(self, patch_list: PatchList) -> PatchList: """Filter output EOPatches that have already been processed""" - output_features = self._get_output_features() return get_patches_with_missing_features( self.storage.filesystem, self.storage.get_folder(self.config.output_folder_key), patch_list, - output_features, - check_timestamps=any(ftype.is_temporal() for ftype, _ in output_features), + self._get_output_features(), ) def build_workflow(self) -> EOWorkflow: @@ -74,6 +72,7 @@ def build_workflow(self) -> EOWorkflow: self.storage.get_folder(self.config.output_folder_key), filesystem=self.storage.filesystem, features=self._get_output_features(), + compress_level=self.config.compress_level, overwrite_permission=OverwritePermission.OVERWRITE_FEATURES, ) @@ -84,7 +83,7 @@ def _get_loading_node(self) -> EONode: load_nodes = [] for folder_name, features in self.config.apply_to.items(): - load_features = [] + load_features: list[FeatureSpec] = [] for feature_type_str, feature_names in features.items(): feature_type = FeatureType(feature_type_str) @@ -95,6 +94,10 @@ def _get_loading_node(self) -> EONode: for feature_name in feature_names: load_features.append((feature_type, feature_name)) + load_features.append(FeatureType.BBOX) + if any(FeatureType(feature_type).is_temporal() for feature_type in features): + load_features.append(FeatureType.TIMESTAMPS) + load_task = LoadTask( self.storage.get_folder(folder_name), filesystem=self.storage.filesystem, @@ -134,14 +137,20 @@ def _get_mask_of_samples_feature(self) -> Feature | None: return FeatureType.MASK_TIMELESS, self.config.mask_of_samples_name return None - def _get_output_features(self) -> list[Feature]: + def _get_output_features(self) -> list[FeatureSpec]: """Get a list of features that will be saved as an output of the pipeline""" - output_features = [(ftype, output_name) for ftype, _, output_name in self._get_features_to_sample()] + output_features: list[FeatureSpec] = [FeatureType.BBOX] + features_to_sample = self._get_features_to_sample() + + for feature_type, _, sampled_feature_name in features_to_sample: + output_features.append((feature_type, sampled_feature_name)) mask_of_samples_feature = self._get_mask_of_samples_feature() if mask_of_samples_feature: output_features.append(mask_of_samples_feature) + if any(feature_type.is_temporal() for feature_type, _, _ in features_to_sample): + output_features.append(FeatureType.TIMESTAMPS) return output_features @@ -157,13 +166,15 @@ class Schema(BaseSamplingPipeline.Schema): def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) - self._sampling_node_uid: str = "" + self._sampling_node_uid: str | None = None def get_execution_arguments(self, workflow: EOWorkflow, patch_list: PatchList) -> ExecKwargs: """Extends the basic method for adding execution arguments by adding seed arguments a sampling task""" exec_args = super().get_execution_arguments(workflow, patch_list) - sampling_node = workflow.get_node_with_uid(self._sampling_node_uid, fail_if_missing=True) + sampling_node = workflow.get_node_with_uid(self._sampling_node_uid) + if sampling_node is None: + return exec_args generator = np.random.default_rng(seed=self.config.seed) diff --git a/eogrow/pipelines/split_grid.py b/eogrow/pipelines/split_grid.py index d1651043..8ac3f6b8 100644 --- a/eogrow/pipelines/split_grid.py +++ b/eogrow/pipelines/split_grid.py @@ -10,8 +10,7 @@ import geopandas as gpd from pydantic import Field -from eolearn.core import EONode, EOWorkflow, LoadTask, OverwritePermission -from eolearn.core.types import Feature +from eolearn.core import EONode, EOWorkflow, FeatureType, LoadTask, OverwritePermission from sentinelhub import CRS, BBox from sentinelhub.geometry import Geometry @@ -20,7 +19,7 @@ from ..core.pipeline import Pipeline from ..tasks.common import SkippableSaveTask from ..tasks.spatial import SpatialSliceTask -from ..types import ExecKwargs +from ..types import ExecKwargs, Feature, FeatureSpec from ..utils.fs import LocalFile from ..utils.grid import split_bbox from ..utils.validators import ensure_storage_key_presence @@ -143,19 +142,21 @@ def _get_buffer(self) -> tuple[float, float]: ) def build_workflow(self) -> EOWorkflow: + features = self._get_features() + input_path = self.storage.get_folder(self.config.input_folder_key) - load_node = EONode(LoadTask(input_path, filesystem=self.storage.filesystem, features=self.config.features)) + load_node = EONode(LoadTask(input_path, filesystem=self.storage.filesystem, features=features)) processing_nodes = [] output_path = self.storage.get_folder(self.config.eopatch_output_folder_key) for _ in range(self.config.split_x * self.config.split_y): - slice_task = SpatialSliceTask(self.config.features, raise_misaligned=self.config.raise_misaligned) + slice_task = SpatialSliceTask(features, raise_misaligned=self.config.raise_misaligned) slice_node = EONode(slice_task, inputs=[load_node]) save_task = SkippableSaveTask( output_path, filesystem=self.storage.filesystem, - features=self.config.features, + features=features, overwrite_permission=OverwritePermission.OVERWRITE_FEATURES, ) save_node = EONode(save_task, inputs=[slice_node]) @@ -185,6 +186,14 @@ def get_execution_arguments( # type: ignore[override] return exec_args + def _get_features(self) -> list[FeatureSpec]: + """Provides features that will be transformed by the pipeline.""" + meta_features = [FeatureType.BBOX] + if any(f_type.is_temporal() for f_type, _ in self.config.features): + meta_features += [FeatureType.TIMESTAMPS] + + return self.config.features + meta_features + def save_new_grid(self, bbox_splits: list[tuple[NamedBBox, list[NamedBBox]]]) -> None: """Organizes BBoxes into multiple GeoDataFrames that are then saved as layers of a GPKG file.""" crs_groups = defaultdict(list) diff --git a/eogrow/pipelines/testing.py b/eogrow/pipelines/testing.py index 0de81e76..78475c8a 100644 --- a/eogrow/pipelines/testing.py +++ b/eogrow/pipelines/testing.py @@ -8,7 +8,6 @@ from pydantic import Field from eolearn.core import CreateEOPatchTask, EONode, EOWorkflow, OverwritePermission, SaveTask -from eolearn.core.types import Feature from ..core.config import RawConfig, recursive_config_join from ..core.pipeline import Pipeline @@ -19,7 +18,7 @@ NormalDistribution, UniformDistribution, ) -from ..types import ExecKwargs, PatchList, TimePeriod +from ..types import ExecKwargs, Feature, PatchList, TimePeriod from ..utils.validators import ensure_storage_key_presence, field_validator, parse_dtype, parse_time_period Self = TypeVar("Self", bound="TestPipeline") @@ -136,7 +135,6 @@ def build_workflow(self) -> EOWorkflow: self.storage.get_folder(self.config.output_folder_key), filesystem=self.storage.filesystem, overwrite_permission=OverwritePermission.OVERWRITE_FEATURES, - save_timestamps=self.config.timestamps is not None, ) save_node = EONode(save_task, inputs=[previous_node]) diff --git a/eogrow/pipelines/zipmap.py b/eogrow/pipelines/zipmap.py index d58d3ade..7d8057e1 100644 --- a/eogrow/pipelines/zipmap.py +++ b/eogrow/pipelines/zipmap.py @@ -9,17 +9,17 @@ from eolearn.core import ( EONode, EOWorkflow, + FeatureType, LoadTask, MergeEOPatchesTask, OverwritePermission, SaveTask, ZipFeatureTask, ) -from eolearn.core.types import Feature from ..core.pipeline import Pipeline from ..core.schemas import BaseSchema -from ..types import PatchList +from ..types import Feature, FeatureSpec, PatchList from ..utils.filter import get_patches_with_missing_features from ..utils.meta import import_object @@ -29,6 +29,10 @@ class InputFeatureSchema(BaseSchema): feature: Feature = Field(description="Which features to load from folder.") folder_key: str = Field(description="The storage manager key pointing to the folder from which to load data.") + include_bbox_and_timestamp = Field( + True, + description="Auto loads BBOX and (if the features is temporal) TIMESTAMP.", + ) class ZipMapPipeline(Pipeline): @@ -64,23 +68,31 @@ def parse_params(cls, v: dict[str, Any], values: dict[str, Any]) -> dict[str, An ) output_feature: Feature + compress_level: int = Field(1, description="Level of compression used in saving eopatches.") + config: Schema def filter_patch_list(self, patch_list: PatchList) -> PatchList: """EOPatches are filtered according to existence of new features""" + # Note: does not catch missing BBox or Timestamp return get_patches_with_missing_features( self.storage.filesystem, self.storage.get_folder(self.config.output_folder_key), patch_list, [self.config.output_feature], - check_timestamps=self.config.output_feature[0].is_temporal(), ) def get_load_nodes(self) -> list[EONode]: """Prepare all nodes with load tasks.""" - load_schema: defaultdict[str, set[Feature]] = defaultdict(set) + load_schema: defaultdict[str, set[FeatureSpec]] = defaultdict(set) for input_feature in self.config.input_features: - load_schema[input_feature.folder_key].add(input_feature.feature) + features_to_load = load_schema[input_feature.folder_key] + features_to_load.add(input_feature.feature) + + if input_feature.include_bbox_and_timestamp: + features_to_load.add(FeatureType.BBOX) + if input_feature.feature[0].is_temporal(): + features_to_load.add(FeatureType.TIMESTAMPS) load_nodes = [] for folder_key, features in load_schema.items(): @@ -112,7 +124,8 @@ def build_workflow(self) -> EOWorkflow: save_task = SaveTask( save_path, config=self.sh_config, - features=[self.config.output_feature], + features=[self.config.output_feature, FeatureType.BBOX, FeatureType.TIMESTAMPS], + compress_level=self.config.compress_level, overwrite_permission=OverwritePermission.OVERWRITE_FEATURES, ) save_node = EONode(save_task, inputs=[mapping_node]) diff --git a/eogrow/tasks/batch_to_eopatch.py b/eogrow/tasks/batch_to_eopatch.py index f6940c81..b5a38a06 100644 --- a/eogrow/tasks/batch_to_eopatch.py +++ b/eogrow/tasks/batch_to_eopatch.py @@ -9,10 +9,10 @@ from fs.base import FS from eolearn.core import EOPatch, EOTask -from eolearn.core.types import Feature from eolearn.core.utils.fs import pickle_fs, unpickle_fs from sentinelhub import parse_time +from ..types import Feature from ..utils.meta import import_object diff --git a/eogrow/tasks/common.py b/eogrow/tasks/common.py index 570d1ae4..73a1a3d6 100644 --- a/eogrow/tasks/common.py +++ b/eogrow/tasks/common.py @@ -1,17 +1,16 @@ """Common tasks shared between pipelines.""" from __future__ import annotations -from functools import partial -from typing import Any +from typing import Callable -import cv2 import numpy as np -from eolearn.core import EOPatch, EOTask, MapFeatureTask, SaveTask -from eolearn.core.types import Feature, FeaturesSpecification +from eolearn.core import EOPatch, EOTask, SaveTask from eolearn.core.utils.parsing import parse_renamed_feature from eolearn.geometry import MorphologicalOperations +from ..types import Feature + class ClassFilterTask(EOTask): """Run class specific morphological operation.""" @@ -20,7 +19,7 @@ def __init__( self, feature: Feature, labels: list[int], - morph_operation: MorphologicalOperations, + morph_operation: MorphologicalOperations | Callable, struct_elem: np.ndarray | None = None, ): """Perform a morphological operation on a given feature mask @@ -35,17 +34,19 @@ def __init__( self.renamed_feature = parse_renamed_feature(feature) self.labels = labels - self.morph_operation = MorphologicalOperations.get_operation(morph_operation) + if isinstance(morph_operation, MorphologicalOperations): + self.morph_operation = MorphologicalOperations.get_operation(morph_operation) + else: + self.morph_operation = morph_operation self.struct_elem = struct_elem def execute(self, eopatch: EOPatch) -> EOPatch: feature_type, feature_name, new_feature_name = self.renamed_feature mask = eopatch[(feature_type, feature_name)].copy() - morp_func = partial(cv2.morphologyEx, kernel=self.struct_elem, op=self.morph_operation) for label in self.labels: - label_mask = np.squeeze((mask == label).astype(np.uint8), axis=-1) - mask_mod = morp_func(label_mask) * label + label_mask = np.squeeze((mask == label), axis=-1) + mask_mod = self.morph_operation(label_mask, self.struct_elem) * label mask_mod = mask_mod[..., np.newaxis] mask[mask == label] = mask_mod[mask == label] @@ -53,45 +54,10 @@ def execute(self, eopatch: EOPatch) -> EOPatch: return eopatch -class LinearFunctionTask(MapFeatureTask): - """Applies a linear function to the values of input features. - - Each value in the feature is modified as `x -> x * slope + intercept`. The `dtype` of the result can be customized. - """ - - def __init__( - self, - input_features: FeaturesSpecification, - output_features: FeaturesSpecification | None = None, - slope: float = 1, - intercept: float = 0, - dtype: str | type | np.dtype | None = None, - ): - """ - :param input_features: Feature or features on which the function is used. - :param output_features: Feature or features for saving the result. If not provided the input_features are - overwritten. - :param slope: Slope of the function i.e. the multiplication factor. - :param intercept: Intercept of the function i.e. the value added. - :param dtype: Numpy dtype of the output feature. If not provided the dtype is determined by Numpy, so it is - recommended to set manually. - """ - if output_features is None: - output_features = input_features - self.dtype = dtype if dtype is None else np.dtype(dtype) - - super().__init__(input_features, output_features, slope=slope, intercept=intercept) - - def map_method(self, feature: np.ndarray, slope: float, intercept: float) -> np.ndarray: # type:ignore[override] - """A method where feature is multiplied by a slope""" - rescaled_feature = feature * slope + intercept - return rescaled_feature if self.dtype is None else rescaled_feature.astype(self.dtype) - - class SkippableSaveTask(SaveTask): """Same as `SaveTask` but can be skipped if the `eopatch_folder` is set to `None`.""" - def execute(self, eopatch: EOPatch, *, eopatch_folder: str | None = "", **kwargs: Any) -> EOPatch: + def execute(self, eopatch: EOPatch, *, eopatch_folder: str | None = "") -> EOPatch: if eopatch_folder is None: return eopatch - return super().execute(eopatch, eopatch_folder=eopatch_folder, **kwargs) + return super().execute(eopatch, eopatch_folder=eopatch_folder) diff --git a/eogrow/tasks/features.py b/eogrow/tasks/features.py index bcf29060..0ab1f1e1 100644 --- a/eogrow/tasks/features.py +++ b/eogrow/tasks/features.py @@ -8,9 +8,10 @@ import numpy as np from eolearn.core import EOPatch, EOTask, FeatureType, MapFeatureTask -from eolearn.core.types import Feature from eolearn.core.utils.parsing import parse_renamed_feature +from ..types import Feature + def join_valid_and_cloud_masks(valid_mask: np.ndarray, cloud_mask: np.ndarray) -> np.ndarray: """Used to zip together information about valid data and clouds into a combined validity mask""" @@ -120,7 +121,7 @@ def execute(self, eopatch: EOPatch) -> EOPatch: feature_type, _, new_feature_name = self.parsed_feature output_patch = EOPatch(bbox=eopatch.bbox, timestamps=self.compute_mosaic_dates()) - eopatch.timestamps = [ts.replace(tzinfo=None) for ts in eopatch.get_timestamps()] + eopatch.timestamps = [ts.replace(tzinfo=None) for ts in eopatch.timestamps] output_patch[feature_type, new_feature_name] = self.compute_mosaic(eopatch) return output_patch @@ -142,7 +143,7 @@ def __init__( def _compute_single_mosaic(self, eopatch: EOPatch, idate: int) -> np.ndarray: """Compute single mosaic using values of the max NDVI""" - array = self._find_time_indices(eopatch.get_timestamps(), idate) + array = self._find_time_indices(eopatch.timestamps, idate) feature_type, feature_name, _ = self.parsed_feature feat_values = eopatch[(feature_type, feature_name)][array].astype(np.float32) ndvi_values = eopatch[(self.ndvi_feature_type, self.ndvi_feature_name)][array] # type: ignore[index] @@ -190,7 +191,7 @@ def __init__( def _compute_single_mosaic(self, eopatch: EOPatch, idate: int) -> np.ndarray: """Compute single mosaic using the median of values""" - array = self._find_time_indices(eopatch.get_timestamps(), idate) + array = self._find_time_indices(eopatch.timestamps, idate) feature_type, feature_name, _ = self.parsed_feature feat_values = eopatch[(feature_type, feature_name)][array].astype(np.float32) diff --git a/eogrow/tasks/prediction.py b/eogrow/tasks/prediction.py index f943babe..d7f2e2af 100644 --- a/eogrow/tasks/prediction.py +++ b/eogrow/tasks/prediction.py @@ -10,9 +10,10 @@ from fs.base import FS from eolearn.core import EOPatch, EOTask, execute_with_mp_lock -from eolearn.core.types import Feature from eolearn.core.utils.fs import pickle_fs, unpickle_fs +from ..types import Feature + class BasePredictionTask(EOTask, metaclass=abc.ABCMeta): """Base predictions task streamlining data preprocessing before prediction""" diff --git a/eogrow/tasks/spatial.py b/eogrow/tasks/spatial.py index 35d928fa..ef98be4a 100644 --- a/eogrow/tasks/spatial.py +++ b/eogrow/tasks/spatial.py @@ -1,13 +1,15 @@ """Tasks for spatial operations on EOPatches, used in grid-switching.""" from __future__ import annotations +from typing import cast + import numpy as np from geopandas import GeoDataFrame -from eolearn.core import EOPatch, EOTask, deep_eq -from eolearn.core.types import Feature +from eolearn.core import EOPatch, EOTask, FeatureType, deep_eq from sentinelhub import CRS, BBox, bbox_to_resolution +from ..types import Feature, FeatureSpec from ..utils.general import convert_to_int from ..utils.vector import concat_gdf @@ -19,7 +21,7 @@ class SpatialJoinTask(EOTask): def __init__( self, - features: list[Feature], + features: list[FeatureSpec], no_data_map: dict[Feature, float], unique_columns_map: dict[Feature, list[str]], raise_misaligned: bool = True, @@ -110,12 +112,15 @@ def execute(self, *eopatches: EOPatch, bbox: BBox) -> EOPatch: for feature in self.features: feature_type, _ = feature + if feature_type is FeatureType.BBOX: + continue data = [eopatch[feature] for eopatch in eopatches if feature in eopatch] if not data: continue if feature_type.is_spatial(): + feature = cast(Feature, feature) # bbox and timestamp are discarded with above check if feature_type.is_array(): bboxes: list[BBox] = [patch.bbox for patch in eopatches if feature in patch] # type: ignore[misc] joined_data = self._join_spatial_rasters(data, bboxes, bbox, self.no_data_map[feature]) @@ -138,7 +143,7 @@ def execute(self, *eopatches: EOPatch, bbox: BBox) -> EOPatch: class SpatialSliceTask(EOTask): """Spatially slices given EOPatch to create a new one.""" - def __init__(self, features: list[Feature], raise_misaligned: bool = True): + def __init__(self, features: list[FeatureSpec], raise_misaligned: bool = True): self.features = self.parse_features(features) self.raise_misaligned = raise_misaligned @@ -176,6 +181,8 @@ def execute(self, eopatch: EOPatch, *, bbox: BBox, skip: bool = False) -> EOPatc for feature in self.features: feature_type, _ = feature + if feature_type is FeatureType.BBOX: + continue data = eopatch[feature] if feature_type.is_spatial(): diff --git a/eogrow/tasks/testing.py b/eogrow/tasks/testing.py index db7aae1e..f3dda08a 100644 --- a/eogrow/tasks/testing.py +++ b/eogrow/tasks/testing.py @@ -7,10 +7,9 @@ import numpy as np from eolearn.core import EOPatch, EOTask -from eolearn.core.types import Feature from eolearn.core.utils.common import is_discrete_type -from ..types import TimePeriod +from ..types import Feature, TimePeriod @dataclass diff --git a/eogrow/types.py b/eogrow/types.py index b6ed712d..2d33683c 100644 --- a/eogrow/types.py +++ b/eogrow/types.py @@ -10,7 +10,7 @@ else: from typing_extensions import TypeAlias -from eolearn.core import EONode +from eolearn.core import EONode, FeatureType from sentinelhub import BBox PatchList: TypeAlias = List[Tuple[str, BBox]] @@ -19,6 +19,9 @@ ImportPath: TypeAlias = str TimePeriod: TypeAlias = Tuple[datetime.date, datetime.date] +Feature: TypeAlias = Tuple[FeatureType, str] +FeatureSpec: TypeAlias = Union[Tuple[FeatureType, str], FeatureType] + BoolOrAuto: TypeAlias = Union[Literal["auto"], bool] JsonDict: TypeAlias = Dict[str, Any] diff --git a/eogrow/utils/filter.py b/eogrow/utils/filter.py index 4276bb81..4a19cf6b 100644 --- a/eogrow/utils/filter.py +++ b/eogrow/utils/filter.py @@ -10,42 +10,52 @@ from fs.base import FS from tqdm.auto import tqdm -from eolearn.core.eodata_io import get_filesystem_data_info -from eolearn.core.types import Feature +from eolearn.core import FeatureType +from eolearn.core.eodata_io import FilesystemDataInfo, get_filesystem_data_info -from ..types import PatchList +from ..types import FeatureSpec, PatchList def check_if_features_exist( filesystem: FS, eopatch_path: str, - features: Sequence[Feature], - *, - check_bbox: bool = True, - check_timestamps: bool, + features: Sequence[FeatureSpec], ) -> bool: """Checks whether an EOPatch in the given location has all specified features saved""" try: existing_data = get_filesystem_data_info(filesystem, eopatch_path, features) + meta_features = [spec for spec in features if isinstance(spec, FeatureType)] + regular_features = [spec for spec in features if isinstance(spec, tuple)] + + if not all(_check_if_meta_feature_exists(ftype, existing_data) for ftype in meta_features): + return False + + for ftype, fname in regular_features: + if ftype == FeatureType.META_INFO: + raise ValueError("Cannot check for a specific meta-info feature!") + if ftype not in existing_data.features or fname not in existing_data.features[ftype]: + return False + return True + except (IOError, fs.errors.ResourceNotFound): return False - if check_bbox and existing_data.bbox is None: + +def _check_if_meta_feature_exists(ftype: FeatureType, existing_data: FilesystemDataInfo) -> bool: + if ftype == FeatureType.BBOX and existing_data.bbox is None: return False - if check_timestamps and existing_data.timestamps is None: + if ftype == FeatureType.TIMESTAMPS and existing_data.timestamps is None: return False - - return all(fname in existing_data.features.get(ftype, []) for ftype, fname in features) + if ftype == FeatureType.META_INFO and existing_data.meta_info is None: + return False + return True def get_patches_with_missing_features( filesystem: FS, patches_folder: str, patch_list: PatchList, - features: Sequence[Feature], - *, - check_bbox: bool = True, - check_timestamps: bool, + features: Sequence[FeatureSpec], ) -> PatchList: """Filters out names of those EOPatches that are missing some given features. @@ -53,16 +63,12 @@ def get_patches_with_missing_features( :param patches_folder: A path to folder with EOPatches, relative to `filesystem` object. :param patch_list: A list of EOPatch names. :param features: A list of EOPatch features. - :param check_bbox: Whether to make sure that the bbox is present. - :param check_timestamps: Whether to make sure that the timestamps are present. :return: A sublist of `patch_list` with only EOPatch names that are missing some features. """ eopatch_paths = [fs.path.combine(patches_folder, eopatch) for eopatch, _ in patch_list] def check_patch(eopatch_path: str) -> bool: - return check_if_features_exist( - filesystem, eopatch_path, features, check_bbox=check_bbox, check_timestamps=check_timestamps - ) + return check_if_features_exist(filesystem, eopatch_path, features) with ThreadPoolExecutor() as executor: has_features_list = list( diff --git a/eogrow/utils/testing.py b/eogrow/utils/testing.py index 12db68e7..70da9d80 100644 --- a/eogrow/utils/testing.py +++ b/eogrow/utils/testing.py @@ -114,8 +114,7 @@ def _calculate_stats(self, folder: str | None = None) -> JsonDict: if self.filesystem.isdir(content_path): fs_data_info = get_filesystem_data_info(self.filesystem, content_path) if fs_data_info.bbox is not None: - load_timestamps = fs_data_info.timestamps is not None - eopatch = EOPatch.load(content_path, filesystem=self.filesystem, load_timestamps=load_timestamps) + eopatch = EOPatch.load(content_path, filesystem=self.filesystem) stats[content] = self._calculate_eopatch_stats(eopatch) else: # Probably it is not an EOPatch folder stats[content] = self._calculate_stats(folder=content_path) diff --git a/eogrow/utils/validators.py b/eogrow/utils/validators.py index 55232961..3ae9f08b 100644 --- a/eogrow/utils/validators.py +++ b/eogrow/utils/validators.py @@ -11,12 +11,11 @@ from pydantic import BaseModel, Field, validator from eolearn.core import FeatureType -from eolearn.core.types import Feature from eolearn.core.utils.parsing import parse_feature from sentinelhub import DataCollection from sentinelhub.data_collections_bands import Band, Bands, MetaBands, Unit -from ..types import RawSchemaDict, TimePeriod +from ..types import Feature, RawSchemaDict, TimePeriod from .meta import collect_schema, import_object if TYPE_CHECKING: diff --git a/pyproject.toml b/pyproject.toml index 6e8edd15..11515b3d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,12 +43,16 @@ classifiers = [ dependencies = [ "click", "colorlog", - "eo-learn[VISUALIZATION]>=1.5.0", + "eo-learn-core>=1.4.1", + "eo-learn-features>=1.4.1", + "eo-learn-geometry>=1.4.1", + "eo-learn-io>=1.4.1", + "eo-learn-mask>=1.4.1", + "eo-learn-visualization>=1.4.1", "fiona", "fs>=2.2.0", "geopandas>=0.8.1", "numpy", - "opencv-python-headless", "pandas", "pydantic>=1.8.0, <2.0", "python-rapidjson", @@ -60,11 +64,6 @@ dependencies = [ ] [project.optional-dependencies] -ml = [ - "joblib", - "lightgbm>=3.0.0", - "scikit-learn", -] docs = [ "autodoc_pydantic", "nbsphinx", @@ -73,7 +72,6 @@ docs = [ "sphinx", ] dev = [ - "eo-grow[ML]", "boto3", "build", "deepdiff", @@ -94,7 +92,12 @@ dev = [ "types-setuptools", "types-urllib3", ] - +ml = [ + "eo-learn-ml-tools>=1.4.1", + "joblib", + "lightgbm>=3.0.0", + "scikit-learn", +] [project.urls] Homepage = "https://github.com/sentinel-hub/eo-grow" diff --git a/tests/pipelines/test_download.py b/tests/pipelines/test_download.py index f75214fe..b719dcb6 100644 --- a/tests/pipelines/test_download.py +++ b/tests/pipelines/test_download.py @@ -16,9 +16,11 @@ def test_preparation(): @pytest.mark.parametrize( "experiment_name", [ + "download_l1c_q1", "download_l1c_q1_dn", "download_l1c_q1_dn_rescaled", - "download_custom_collection", + "download_l2a", + "download_season", "download_custom", "download_q3", "download_dem", diff --git a/tests/pipelines/test_features.py b/tests/pipelines/test_features.py index 6e2ded21..fe282ba1 100644 --- a/tests/pipelines/test_features.py +++ b/tests/pipelines/test_features.py @@ -9,6 +9,7 @@ @pytest.mark.parametrize( "experiment_name", [ + "features_interpolation", "features_mosaicking_custom", "features_on_rescaled_dn", "features_mosaicking", diff --git a/tests/pipelines/test_merge_samples.py b/tests/pipelines/test_merge_samples.py index 66f69c1c..1e3cec11 100644 --- a/tests/pipelines/test_merge_samples.py +++ b/tests/pipelines/test_merge_samples.py @@ -7,11 +7,11 @@ @pytest.mark.chain() @pytest.mark.order(after="test_features.py::test_features_pipeline") -def test_merge_samples_pipeline(config_and_stats_paths): - config_path, stats_path = config_and_stats_paths("merge_samples", "merge_features_samples") - output_path = run_config(config_path, reset_output_folder=True) - compare_content(output_path, stats_path) - - config_path, stats_path = config_and_stats_paths("merge_samples", "merge_reference_samples") - output_path = run_config(config_path, reset_output_folder=False) +@pytest.mark.parametrize( + ("experiment_name", "reset_folder"), + [("merge_features_samples", True), ("merge_reference_samples", False)], +) +def test_merge_samples_pipeline(config_and_stats_paths, experiment_name, reset_folder): + config_path, stats_path = config_and_stats_paths("merge_samples", experiment_name) + output_path = run_config(config_path, reset_output_folder=reset_folder) compare_content(output_path, stats_path) diff --git a/tests/pipelines/test_testing.py b/tests/pipelines/test_testing.py index 4ce75f18..fe926303 100644 --- a/tests/pipelines/test_testing.py +++ b/tests/pipelines/test_testing.py @@ -7,7 +7,7 @@ @pytest.mark.chain() @pytest.mark.parametrize("experiment_name", ["testing", "timestamps_only"]) -def test_data_generating_pipeline(config_and_stats_paths, experiment_name): +def test_features_pipeline(config_and_stats_paths, experiment_name): config_path, stats_path = config_and_stats_paths("testing", experiment_name) output_path = run_config(config_path) compare_content(output_path, stats_path) diff --git a/tests/pipelines/test_zipmap.py b/tests/pipelines/test_zipmap.py index 9b627013..dc68d43b 100644 --- a/tests/pipelines/test_zipmap.py +++ b/tests/pipelines/test_zipmap.py @@ -5,7 +5,7 @@ pytestmark = pytest.mark.integration -@pytest.mark.order(after=["test_rasterize.py::test_rasterize_feature_with_resolution"]) +@pytest.mark.order(after=["test_rasterize.py::test_rasterize_pipeline"]) @pytest.mark.parametrize("experiment_name", [pytest.param("zipmap", marks=pytest.mark.chain)]) def test_zipmap_pipeline(config_and_stats_paths, experiment_name): config_path, stats_path = config_and_stats_paths("zipmap", experiment_name) diff --git a/tests/tasks/test_common.py b/tests/tasks/test_common.py deleted file mode 100644 index e29c0c97..00000000 --- a/tests/tasks/test_common.py +++ /dev/null @@ -1,55 +0,0 @@ -from __future__ import annotations - -import numpy as np - -from eolearn.core import EOPatch, FeatureType -from sentinelhub import CRS, BBox - -from eogrow.tasks.common import LinearFunctionTask - -DUMMY_BBOX = BBox((0, 0, 1, 1), CRS(3857)) - - -def test_linear_function_task(): - eopatch = EOPatch(bbox=DUMMY_BBOX, timestamps=["1994-02-01"] * 8) - - data_feature = (FeatureType.DATA, "DATA_TEST") - data_result_feature = (FeatureType.DATA, "DATA_TRANSFORMED") - data_shape = (8, 10, 10, 5) - eopatch[data_feature] = np.arange(np.prod(data_shape)).reshape(data_shape).astype(np.float32) - - mask_timeless_feature = (FeatureType.MASK_TIMELESS, "MASK_TIMELESS_TEST") - mask_timeless_result_feature = (FeatureType.MASK_TIMELESS, "MASK_TIMELESS_TRANSFORMED") - mask_shape = (10, 10, 1) - eopatch[mask_timeless_feature] = np.ones(mask_shape, dtype=np.uint32) - - task_default = LinearFunctionTask(data_feature, data_result_feature) - task_default(eopatch) - assert np.array_equal(eopatch[data_feature], eopatch[data_result_feature]) - - task_double_minus_five = LinearFunctionTask( - [data_feature, mask_timeless_feature], - [data_result_feature, mask_timeless_result_feature], - slope=2, - intercept=-5, - ) - task_double_minus_five(eopatch) - expected_result = np.arange(np.prod(data_shape)).reshape(data_shape).astype(float) * 2 - 5 - assert np.array_equal(eopatch[data_result_feature], expected_result) - assert np.array_equal(eopatch[mask_timeless_result_feature], np.ones(mask_shape) * 2 - 5) - assert eopatch[data_result_feature].dtype == np.float32 - # The value of the mask timeless changes here - - task_change_dtype = LinearFunctionTask( - mask_timeless_feature, mask_timeless_result_feature, slope=256, dtype=np.uint8 - ) - task_change_dtype(eopatch) - assert np.array_equal(eopatch[mask_timeless_result_feature], np.zeros(mask_shape)) - assert eopatch[mask_timeless_result_feature].dtype == np.uint8 - - task_override = LinearFunctionTask( - [data_feature, mask_timeless_feature], - slope=5, - ) - task_override(eopatch) - assert np.array_equal(eopatch[mask_timeless_feature], np.ones(mask_shape) * 5) diff --git a/tests/tasks/test_testing.py b/tests/tasks/test_testing.py index 72c3e0d0..ba313d9b 100644 --- a/tests/tasks/test_testing.py +++ b/tests/tasks/test_testing.py @@ -32,7 +32,7 @@ def test_generate_timestamp_feature_task(dummy_eopatch: EOPatch): assert isinstance(eopatch, EOPatch) assert eopatch.timestamps is not None - assert len(eopatch.get_features()) == 0 + assert len(eopatch.get_features()) == 2 assert len(eopatch.timestamps) == timestamp_num assert eopatch.timestamps == sorted(eopatch.timestamps) @@ -55,7 +55,7 @@ def test_generate_timestamp_feature_task(dummy_eopatch: EOPatch): (FeatureType.DATA_TIMELESS, (83, 69, 1), int, UniformDistribution(-2, 5)), (FeatureType.MASK, (10, 4, 16, 7), np.int8, UniformDistribution(-3, 7)), (FeatureType.LABEL, (10, 271), bool, UniformDistribution(0, 1)), - (FeatureType.DATA, (10, 20, 31, 1), np.float32, NormalDistribution(-3, 1.5)), + (FeatureType.DATA, (7, 20, 31, 1), np.float32, NormalDistribution(-3, 1.5)), (FeatureType.DATA_TIMELESS, (30, 61, 2), float, NormalDistribution(-3422.23, 1522)), (FeatureType.DATA_TIMELESS, (83, 69, 10), int, NormalDistribution(-2, 10)), ], @@ -69,7 +69,7 @@ def test_generate_raster_feature_task(dummy_eopatch, feature_type, shape, dtype, eopatch = task.execute(dummy_eopatch.copy(), seed=seed) assert feature in eopatch - assert len(eopatch.get_features()) == 1 + assert len(eopatch.get_features()) == 3 data: np.ndarray = eopatch[feature] assert data.shape == shape diff --git a/tests/test_config_files/download_and_batch/download_l1c_q1.json b/tests/test_config_files/download_and_batch/download_l1c_q1.json new file mode 100644 index 00000000..01f3e3b2 --- /dev/null +++ b/tests/test_config_files/download_and_batch/download_l1c_q1.json @@ -0,0 +1,18 @@ +{ + "pipeline": "eogrow.pipelines.download.DownloadPipeline", + "**global_config": "${config_path}/../global_config.json", + "output_folder_key": "temp", + "data_collection": "SENTINEL2_L1C", + "time_period": ["Q1", 2018], + "bands_feature_name": "BANDS-S2-L1C", + "resolution": 10, + "maxcc": 0.3, + "time_difference": 120, + "additional_data": [ + ["mask", "CLM"], + ["mask", "dataMask"] + ], + "workers": 2, + "threads_per_worker": null, + "resampling_type": "nearest" +} diff --git a/tests/test_config_files/download_and_batch/download_l2a.json b/tests/test_config_files/download_and_batch/download_l2a.json new file mode 100644 index 00000000..a89fbd90 --- /dev/null +++ b/tests/test_config_files/download_and_batch/download_l2a.json @@ -0,0 +1,16 @@ +{ + "pipeline": "eogrow.pipelines.download.DownloadPipeline", + "**global_config": "${config_path}/../global_config.json", + "output_folder_key": "temp", + "data_collection": "SENTINEL2_L2A", + "time_period": ["yearly", 2017], + "bands_feature_name": "BANDS-S2-L2A", + "size": [264, 121], + "maxcc": 0.3, + "time_difference": 120, + "additional_data": [ + ["mask", "CLM"], + ["mask", "dataMask"] + ], + "workers": 2 +} diff --git a/tests/test_config_files/download_and_batch/download_q3.json b/tests/test_config_files/download_and_batch/download_q3.json index b67e3b08..72a0e842 100644 --- a/tests/test_config_files/download_and_batch/download_q3.json +++ b/tests/test_config_files/download_and_batch/download_q3.json @@ -5,7 +5,7 @@ "data_collection": "SENTINEL2_L1C", "time_period": ["Q3", 2018], "bands_feature_name": "BANDS-S2-L1C", - "size": [264, 121], + "resolution": 10, "maxcc": 0.3, "additional_data": [ ["mask", "CLM"], diff --git a/tests/test_config_files/download_and_batch/download_custom_collection.json b/tests/test_config_files/download_and_batch/download_season.json similarity index 100% rename from tests/test_config_files/download_and_batch/download_custom_collection.json rename to tests/test_config_files/download_and_batch/download_season.json diff --git a/tests/test_config_files/features/features_interpolation.json b/tests/test_config_files/features/features_interpolation.json new file mode 100644 index 00000000..4df5e229 --- /dev/null +++ b/tests/test_config_files/features/features_interpolation.json @@ -0,0 +1,22 @@ +{ + "pipeline": "eogrow.pipelines.features.InterpolationFeaturesPipeline", + "**global_config": "${config_path}/../global_config.json", + "input_folder_key": "data", + "output_folder_key": "temp", + "bands_feature_name": "BANDS-S2-L1C", + "data_preparation": { + "validity_threshold": 0.8, + "cloud_mask_feature_name": "CLM", + "valid_data_feature_name": "dataMask" + }, + "ndis": { + "NDVI": [7, 3], + "NDWI": [2, 7], + "NDBI": [11, 7] + }, + "interpolation": { + "time_period": ["yearly", 2018], + "resampling_period": 16 + }, + "output_feature_name": "FEATURES" +} diff --git a/tests/test_stats/download_and_batch/download_l1c_q1.json b/tests/test_stats/download_and_batch/download_l1c_q1.json new file mode 100644 index 00000000..5383ab66 --- /dev/null +++ b/tests/test_stats/download_and_batch/download_l1c_q1.json @@ -0,0 +1,326 @@ +{ + "eopatch-id-0-col-0-row-0": { + "bbox": "BBox(((729480.0, 4390045.0), (732120.0, 4391255.0)), crs=CRS('32638'))", + "data": { + "BANDS-S2-L1C": { + "array_shape": [ + 4, + 121, + 264, + 13 + ], + "basic_stats": { + "max": 1.0095, + "mean": 0.16313, + "median": 0.1458, + "min": 0.0008 + }, + "counts": { + "infinite": 0, + "nan": 0 + }, + "dtype": "float32", + "histogram": { + "counts": [ + 636956, + 800809, + 155404, + 33612, + 21702, + 10629, + 1968, + 8 + ], + "edges": [ + 0.0008, + 0.12689, + 0.25298, + 0.37906, + 0.50515, + 0.63124, + 0.75732, + 0.88341, + 1.0095 + ] + }, + "random_values": [ + { + "position": [ + 1, + 49, + 83, + 6 + ], + "value": 0.5122 + }, + { + "position": [ + 1, + 46, + 65, + 0 + ], + "value": 0.4152 + }, + { + "position": [ + 3, + 87, + 180, + 11 + ], + "value": 0.2291 + }, + { + "position": [ + 2, + 4, + 80, + 8 + ], + "value": 0.2585 + }, + { + "position": [ + 1, + 31, + 12, + 7 + ], + "value": 0.6778 + } + ], + "subsample_basic_stats": { + "max": 0.9554, + "mean": 0.1634, + "median": 0.146, + "min": 0.0008 + } + } + }, + "mask": { + "CLM": { + "array_shape": [ + 4, + 121, + 264, + 1 + ], + "dtype": "uint8", + "random_values": [ + { + "position": [ + 2, + 45, + 112, + 0 + ], + "value": 0 + }, + { + "position": [ + 1, + 26, + 128, + 0 + ], + "value": 1 + } + ], + "values": [ + { + "count": 106155, + "value": 0 + }, + { + "count": 21621, + "value": 1 + } + ] + }, + "dataMask": { + "array_shape": [ + 4, + 121, + 264, + 1 + ], + "dtype": "bool", + "values": [ + { + "count": 127776, + "value": true + } + ] + } + }, + "timestamps": [ + "2018-01-19T07:42:27", + "2018-02-28T07:46:50", + "2018-03-05T07:38:03", + "2018-03-25T07:44:17" + ] + }, + "eopatch-id-1-col-0-row-1": { + "bbox": "BBox(((729480.0, 4391145.0), (732120.0, 4392355.0)), crs=CRS('32638'))", + "data": { + "BANDS-S2-L1C": { + "array_shape": [ + 4, + 121, + 264, + 13 + ], + "basic_stats": { + "max": 0.8772, + "mean": 0.16175, + "median": 0.1402, + "min": 0.0007 + }, + "counts": { + "infinite": 0, + "nan": 0 + }, + "dtype": "float32", + "histogram": { + "counts": [ + 498286, + 800134, + 260588, + 54142, + 26625, + 15157, + 5398, + 758 + ], + "edges": [ + 0.0007, + 0.11026, + 0.21982, + 0.32939, + 0.43895, + 0.54851, + 0.65808, + 0.76764, + 0.8772 + ] + }, + "random_values": [ + { + "position": [ + 1, + 106, + 142, + 5 + ], + "value": 0.4919 + }, + { + "position": [ + 1, + 111, + 138, + 1 + ], + "value": 0.3968 + }, + { + "position": [ + 3, + 84, + 40, + 5 + ], + "value": 0.2144 + }, + { + "position": [ + 2, + 10, + 96, + 8 + ], + "value": 0.2432 + }, + { + "position": [ + 1, + 88, + 44, + 8 + ], + "value": 0.6538 + } + ], + "subsample_basic_stats": { + "max": 0.862, + "mean": 0.1618, + "median": 0.1401, + "min": 0.0007 + } + } + }, + "mask": { + "CLM": { + "array_shape": [ + 4, + 121, + 264, + 1 + ], + "dtype": "uint8", + "random_values": [ + { + "position": [ + 2, + 55, + 126, + 0 + ], + "value": 0 + }, + { + "position": [ + 1, + 27, + 217, + 0 + ], + "value": 1 + } + ], + "values": [ + { + "count": 100726, + "value": 0 + }, + { + "count": 27050, + "value": 1 + } + ] + }, + "dataMask": { + "array_shape": [ + 4, + 121, + 264, + 1 + ], + "dtype": "bool", + "values": [ + { + "count": 127776, + "value": true + } + ] + } + }, + "timestamps": [ + "2018-01-19T07:42:27", + "2018-02-28T07:46:50", + "2018-03-05T07:38:03", + "2018-03-25T07:44:17" + ] + } +} diff --git a/tests/test_stats/download_and_batch/download_l2a.json b/tests/test_stats/download_and_batch/download_l2a.json new file mode 100644 index 00000000..27e3bddf --- /dev/null +++ b/tests/test_stats/download_and_batch/download_l2a.json @@ -0,0 +1,352 @@ +{ + "eopatch-id-0-col-0-row-0": { + "bbox": "BBox(((729480.0, 4390045.0), (732120.0, 4391255.0)), crs=CRS('32638'))", + "data": { + "BANDS-S2-L2A": { + "array_shape": [ + 17, + 121, + 264, + 12 + ], + "basic_stats": { + "max": 1.7957, + "mean": 0.23922, + "median": 0.202, + "min": 0.0001 + }, + "counts": { + "infinite": 0, + "nan": 0 + }, + "dtype": "float32", + "histogram": { + "counts": [ + 3727737, + 2231182, + 352782, + 95148, + 52947, + 33987, + 19520, + 3273 + ], + "edges": [ + 0.0001, + 0.22455, + 0.449, + 0.67345, + 0.8979, + 1.12235, + 1.3468, + 1.57125, + 1.7957 + ] + }, + "random_values": [ + { + "position": [ + 15, + 114, + 50, + 11 + ], + "value": 1.0367 + }, + { + "position": [ + 15, + 102, + 204, + 3 + ], + "value": 0.832 + }, + { + "position": [ + 16, + 117, + 45, + 0 + ], + "value": 0.4392 + }, + { + "position": [ + 16, + 30, + 69, + 5 + ], + "value": 0.5012 + }, + { + "position": [ + 15, + 107, + 109, + 10 + ], + "value": 1.3852 + } + ], + "subsample_basic_stats": { + "max": 1.7833, + "mean": 0.23886, + "median": 0.202, + "min": 0.0001 + } + } + }, + "mask": { + "CLM": { + "array_shape": [ + 17, + 121, + 264, + 1 + ], + "dtype": "uint8", + "random_values": [ + { + "position": [ + 7, + 83, + 57, + 0 + ], + "value": 0 + }, + { + "position": [ + 10, + 2, + 76, + 0 + ], + "value": 1 + } + ], + "values": [ + { + "count": 412988, + "value": 0 + }, + { + "count": 130060, + "value": 1 + } + ] + }, + "dataMask": { + "array_shape": [ + 17, + 121, + 264, + 1 + ], + "dtype": "bool", + "values": [ + { + "count": 543048, + "value": true + } + ] + } + }, + "timestamps": [ + "2017-02-18T07:44:53", + "2017-03-10T07:45:49", + "2017-03-30T07:42:04", + "2017-05-09T07:44:15", + "2017-05-29T07:44:28", + "2017-07-03T07:41:53", + "2017-07-23T07:43:03", + "2017-07-28T07:43:08", + "2017-08-12T07:43:01", + "2017-09-01T07:43:08", + "2017-09-06T07:40:42", + "2017-10-11T07:46:43", + "2017-10-31T07:41:37", + "2017-11-05T07:41:49", + "2017-11-25T07:42:33", + "2017-12-10T07:43:05", + "2017-12-30T07:43:14" + ] + }, + "eopatch-id-1-col-0-row-1": { + "bbox": "BBox(((729480.0, 4391145.0), (732120.0, 4392355.0)), crs=CRS('32638'))", + "data": { + "BANDS-S2-L2A": { + "array_shape": [ + 17, + 121, + 264, + 12 + ], + "basic_stats": { + "max": 0.9183, + "mean": 0.21105, + "median": 0.191, + "min": 0.0001 + }, + "counts": { + "infinite": 0, + "nan": 0 + }, + "dtype": "float32", + "histogram": { + "counts": [ + 1858185, + 2190376, + 1542709, + 546500, + 188549, + 127706, + 55788, + 6763 + ], + "edges": [ + 0.0001, + 0.11487, + 0.22965, + 0.34442, + 0.4592, + 0.57397, + 0.68875, + 0.80352, + 0.9183 + ] + }, + "random_values": [ + { + "position": [ + 16, + 79, + 105, + 2 + ], + "value": 0.5624 + }, + { + "position": [ + 10, + 60, + 11, + 8 + ], + "value": 0.4514 + }, + { + "position": [ + 14, + 19, + 80, + 8 + ], + "value": 0.2383 + }, + { + "position": [ + 7, + 58, + 256, + 10 + ], + "value": 0.2719 + }, + { + "position": [ + 16, + 26, + 190, + 1 + ], + "value": 0.751 + } + ], + "subsample_basic_stats": { + "max": 0.9165, + "mean": 0.21105, + "median": 0.1911, + "min": 0.0001 + } + } + }, + "mask": { + "CLM": { + "array_shape": [ + 17, + 121, + 264, + 1 + ], + "dtype": "uint8", + "random_values": [ + { + "position": [ + 8, + 35, + 86, + 0 + ], + "value": 0 + }, + { + "position": [ + 6, + 52, + 206, + 0 + ], + "value": 1 + } + ], + "values": [ + { + "count": 415838, + "value": 0 + }, + { + "count": 127210, + "value": 1 + } + ] + }, + "dataMask": { + "array_shape": [ + 17, + 121, + 264, + 1 + ], + "dtype": "bool", + "values": [ + { + "count": 543048, + "value": true + } + ] + } + }, + "timestamps": [ + "2017-02-18T07:44:53", + "2017-03-10T07:45:49", + "2017-03-30T07:42:04", + "2017-05-09T07:44:15", + "2017-05-29T07:44:28", + "2017-07-03T07:41:53", + "2017-07-23T07:43:03", + "2017-07-28T07:43:08", + "2017-08-12T07:43:01", + "2017-09-01T07:43:08", + "2017-09-06T07:40:42", + "2017-10-11T07:46:43", + "2017-10-31T07:41:37", + "2017-11-05T07:41:49", + "2017-11-25T07:42:33", + "2017-12-10T07:43:05", + "2017-12-30T07:43:14" + ] + } +} diff --git a/tests/test_stats/download_and_batch/download_custom_collection.json b/tests/test_stats/download_and_batch/download_season.json similarity index 100% rename from tests/test_stats/download_and_batch/download_custom_collection.json rename to tests/test_stats/download_and_batch/download_season.json diff --git a/tests/test_stats/sampling/sampling_fraction_erosion.json b/tests/test_stats/sampling/sampling_fraction_erosion.json index 816b3773..ce72e66e 100644 --- a/tests/test_stats/sampling/sampling_fraction_erosion.json +++ b/tests/test_stats/sampling/sampling_fraction_erosion.json @@ -4,34 +4,12 @@ "mask_timeless": { "LULC_ID": { "array_shape": [ - 9090, + 9076, 1, 1 ], "dtype": "int16", - "random_values": [ - { - "position": [ - 7, - 0, - 0 - ], - "value": 4 - }, - { - "position": [ - 2462, - 0, - 0 - ], - "value": 8 - } - ], "values": [ - { - "count": 14, - "value": 4 - }, { "count": 9076, "value": 8 @@ -49,7 +27,7 @@ { "position": [ 64, - 85, + 49, 0 ], "value": 0 @@ -57,7 +35,7 @@ { "position": [ 26, - 116, + 158, 0 ], "value": 1 @@ -65,24 +43,24 @@ ], "values": [ { - "count": 22854, + "count": 22868, "value": 0 }, { - "count": 9090, + "count": 9076, "value": 1 } ] }, "POLYGON_ID": { "array_shape": [ - 9090, + 9076, 1, 1 ], "basic_stats": { "max": 326, - "mean": 0.46293, + "mean": 0.6212, "median": 0.0, "min": 0 }, @@ -93,14 +71,14 @@ "dtype": "int32", "histogram": { "counts": [ - 9076, + 9058, 0, 0, 0, - 2, + 1, 0, 0, - 12 + 17 ], "edges": [ 0.0, @@ -117,7 +95,7 @@ "random_values": [ { "position": [ - 4461, + 4552, 0, 0 ], @@ -125,7 +103,7 @@ }, { "position": [ - 1853, + 4495, 0, 0 ], @@ -133,7 +111,7 @@ }, { "position": [ - 5779, + 7270, 0, 0 ], @@ -141,7 +119,7 @@ }, { "position": [ - 4149, + 1945, 0, 0 ], @@ -149,7 +127,7 @@ }, { "position": [ - 1836, + 3653, 0, 0 ], @@ -157,8 +135,8 @@ } ], "subsample_basic_stats": { - "max": 316, - "mean": 0.73927, + "max": 320, + "mean": 0.74531, "median": 0.0, "min": 0 } @@ -170,34 +148,12 @@ "mask_timeless": { "LULC_ID": { "array_shape": [ - 5794, + 5790, 1, 1 ], "dtype": "int16", - "random_values": [ - { - "position": [ - 2, - 0, - 0 - ], - "value": 4 - }, - { - "position": [ - 1566, - 0, - 0 - ], - "value": 8 - } - ], "values": [ - { - "count": 4, - "value": 4 - }, { "count": 5790, "value": 8 @@ -215,7 +171,7 @@ { "position": [ 57, - 165, + 170, 0 ], "value": 0 @@ -223,7 +179,7 @@ { "position": [ 50, - 107, + 113, 0 ], "value": 1 @@ -231,18 +187,18 @@ ], "values": [ { - "count": 26150, + "count": 26154, "value": 0 }, { - "count": 5794, + "count": 5790, "value": 1 } ] }, "POLYGON_ID": { "array_shape": [ - 5794, + 5790, 1, 1 ], @@ -250,7 +206,7 @@ "random_values": [ { "position": [ - 4956, + 4909, 0, 0 ], @@ -258,7 +214,7 @@ }, { "position": [ - 3670, + 4813, 0, 0 ], @@ -266,7 +222,7 @@ }, { "position": [ - 3394, + 5564, 0, 0 ], @@ -274,7 +230,7 @@ }, { "position": [ - 3819, + 1025, 0, 0 ], @@ -282,7 +238,7 @@ }, { "position": [ - 4599, + 5006, 0, 0 ], @@ -291,23 +247,23 @@ ], "values": [ { - "count": 5759, + "count": 5756, "value": 0 }, { - "count": 9, + "count": 10, "value": 10 }, { - "count": 6, + "count": 5, "value": 21 }, { - "count": 8, + "count": 6, "value": 22 }, { - "count": 1, + "count": 2, "value": 207 }, { diff --git a/tests/utils/test_filter.py b/tests/utils/test_filter.py index 2b9d5746..810e6a47 100644 --- a/tests/utils/test_filter.py +++ b/tests/utils/test_filter.py @@ -1,5 +1,5 @@ import datetime -from itertools import repeat +from itertools import chain, combinations, repeat import boto3 import numpy as np @@ -15,16 +15,14 @@ BUCKET_NAME = "mocked-test-bucket" PATCH_NAMES = [f"eopatch{i}" for i in range(5)] -MISSING = [(FeatureType.META_INFO, "beep"), (FeatureType.DATA, "no_data"), (FeatureType.MASK_TIMELESS, "mask")] -EXISTING = [(FeatureType.DATA, "data"), (FeatureType.DATA, "data2"), (FeatureType.MASK, "mask")] +REAL_FEATURES = [FeatureType.BBOX, FeatureType.TIMESTAMPS, (FeatureType.DATA, "data"), (FeatureType.MASK, "mask")] +MISSING_FEATURES = [FeatureType.META_INFO, (FeatureType.DATA, "no_data"), (FeatureType.MASK_TIMELESS, "mask")] -def _prepare_fs(filesystem, eopatch: EOPatch): - """Saves the eopatch under the predefined names, where every second one only contains the MASK feature type. - All EOPatches contain the bbox, while only the first two are missing the timestamps. - """ +def _prepare_fs(filesystem, eopatch): + """Saves the eopatch under the predefined names, where every second one only contains the BBOX and MASK""" for i, name in enumerate(PATCH_NAMES): - eopatch.save(name, filesystem=filesystem, features=... if i % 2 else [FeatureType.MASK], save_timestamps=i > 1) + eopatch.save(name, filesystem=filesystem, features=[FeatureType.BBOX, FeatureType.MASK] if i % 2 else ...) @pytest.fixture(name="eopatch", scope="session") @@ -33,7 +31,6 @@ def eopatch_fixture(): eopatch.timestamps = [datetime.datetime(2017, 1, 1, 10, 4, 7), datetime.datetime(2017, 1, 4, 10, 14, 5)] eopatch.mask["mask"] = np.zeros((2, 3, 3, 2), dtype=np.int16) eopatch.data["data"] = np.zeros((2, 3, 3, 2), dtype=np.int16) - eopatch.data["data2"] = np.zeros((2, 3, 3, 2), dtype=bool) eopatch.scalar["my scalar with spaces"] = np.array([[1, 2, 3], [1, 2, 3]]) eopatch.scalar_timeless["my timeless scalar with spaces"] = np.array([1, 2, 3]) return eopatch @@ -57,31 +54,26 @@ def temp_fs_fixture(eopatch): @pytest.mark.parametrize( - ("features", "expected_result"), - [ - ([EXISTING[0]], True), - (EXISTING[1:], True), - (EXISTING, True), - *(([missing], False) for missing in MISSING), - *((EXISTING[:i] + [missing] + EXISTING[i:], False) for i, missing in enumerate(MISSING)), - ], + ("test_features", "expected_result"), + [(list(features), True) for features in chain(*(combinations(REAL_FEATURES, i) for i in range(4)))] + + [([missing], False) for missing in MISSING_FEATURES] + + [(REAL_FEATURES[:i] + [missing] + REAL_FEATURES[i:], False) for i, missing in enumerate(MISSING_FEATURES)], ) -def test_check_if_features_exist(mock_s3fs, temp_fs, features, expected_result): +def test_check_if_features_exist(mock_s3fs, temp_fs, test_features, expected_result): for filesystem in [mock_s3fs, temp_fs]: - # take the 4th patch because the 1st and 3rd have missing features, and the first two have missing timestamps - assert check_if_features_exist(filesystem, PATCH_NAMES[3], features, check_timestamps=True) == expected_result + assert check_if_features_exist(filesystem, PATCH_NAMES[0], test_features) == expected_result @pytest.mark.parametrize( ("features", "expected_num"), [ - ([], 2), # timestamps are missing - ([(FeatureType.DATA, "data")], 4), + ([], 0), + ([FeatureType.BBOX], 0), + ([FeatureType.BBOX, (FeatureType.DATA, "data")], 2), ([(FeatureType.DATA, "no_data"), (FeatureType.DATA, "data")], 5), ], ) def test_get_patches_with_missing_features(mock_s3fs, temp_fs, features, expected_num): patch_list = list(zip(PATCH_NAMES, repeat(BBox((0, 0, 1, 1), CRS.WGS84)))) for filesystem in [mock_s3fs, temp_fs]: - incomplete = get_patches_with_missing_features(filesystem, "/", patch_list, features, check_timestamps=True) - assert len(incomplete) == expected_num + assert len(get_patches_with_missing_features(filesystem, "/", patch_list, features)) == expected_num diff --git a/tests/utils/test_general.py b/tests/utils/test_general.py index 0674ccf1..ba484193 100644 --- a/tests/utils/test_general.py +++ b/tests/utils/test_general.py @@ -16,14 +16,14 @@ class MyEnum(enum.Enum): ORIGINAL_CONFIG = { - "feature_types": (FeatureType.DATA, FeatureType.MASK_TIMELESS), + "feature_types": (FeatureType.DATA, FeatureType.BBOX), "enum": MyEnum.NO_DATA, "timestamp": dt.datetime(year=2021, month=9, day=30), "collection_set": {DataCollection.SENTINEL2_L1C}, "collection_def": DataCollection.SENTINEL2_L1C.value, } SERIALIZED_CONFIG = { - "feature_types": ["data", "mask_timeless"], + "feature_types": ["data", "bbox"], "enum": "no data", "timestamp": "2021-09-30T00:00:00", "collection_set": ["SENTINEL2_L1C"], diff --git a/tests/utils/test_validators.py b/tests/utils/test_validators.py index 72d7dfeb..406e5b31 100644 --- a/tests/utils/test_validators.py +++ b/tests/utils/test_validators.py @@ -7,13 +7,12 @@ from pydantic import ValidationError from eolearn.core import FeatureType -from eolearn.core.types import Feature from sentinelhub import DataCollection from sentinelhub.data_collections_bands import Band, MetaBands, Unit from eogrow.core.pipeline import Pipeline from eogrow.core.schemas import BaseSchema, ManagerSchema -from eogrow.types import RawSchemaDict +from eogrow.types import Feature, RawSchemaDict from eogrow.utils.validators import ( ensure_defined_together, ensure_exactly_one_defined,