Skip to content

Commit

Permalink
Revert "Patch for eolearn 1.5.0 (#264)"
Browse files: browse the repository at this point in the history
This reverts commit 4ed5dc6.
  • Loading branch information
zigaLuksic authored Sep 7, 2023
1 parent 4ed5dc6 commit 51fed2a
Show file tree
Hide file tree
Showing 43 changed files with 1,066 additions and 363 deletions.
17 changes: 10 additions & 7 deletions eogrow/pipelines/batch_to_eopatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
RenameFeatureTask,
SaveTask,
)
from eolearn.core.types import Feature
from eolearn.features import LinearFunctionTask
from eolearn.io import ImportFromTiffTask

from ..core.pipeline import Pipeline
from ..core.schemas import BaseSchema
from ..tasks.batch_to_eopatch import DeleteFilesTask, FixImportedTimeDependentFeatureTask, LoadUserDataTask
from ..tasks.common import LinearFunctionTask
from ..types import ExecKwargs, PatchList, RawSchemaDict
from ..types import ExecKwargs, Feature, FeatureSpec, PatchList, RawSchemaDict
from ..utils.filter import get_patches_with_missing_features
from ..utils.validators import ensure_storage_key_presence, optional_field_validator, parse_dtype

Expand Down Expand Up @@ -97,15 +96,18 @@ def filter_patch_list(self, patch_list: PatchList) -> PatchList:
self.storage.get_folder(self.config.output_folder_key),
patch_list,
self._get_output_features(),
check_timestamps=self.config.userdata_timestamp_reader is not None,
)

def _get_output_features(self) -> list[Feature]:
def _get_output_features(self) -> list[FeatureSpec]:
"""Lists all features that the pipeline outputs."""
features = [feature_mapping.feature for feature_mapping in self.config.mapping]
features: list[FeatureSpec] = [FeatureType.BBOX]
features.extend(feature_mapping.feature for feature_mapping in self.config.mapping)

if self.config.userdata_feature_name:
features.append((FeatureType.META_INFO, self.config.userdata_feature_name))
features.append(FeatureType.META_INFO)

if self.config.userdata_timestamp_reader:
features.append(FeatureType.TIMESTAMPS)

return features

Expand Down Expand Up @@ -139,6 +141,7 @@ def build_workflow(self) -> EOWorkflow:
path=self.storage.get_folder(self.config.output_folder_key),
filesystem=self.storage.filesystem,
features=self._get_output_features(),
compress_level=1,
overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
)
save_node = EONode(save_task, inputs=([processing_node] if processing_node else []))
Expand Down
39 changes: 24 additions & 15 deletions eogrow/pipelines/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pydantic import Field

from eolearn.core import EONode, EOWorkflow, FeatureType, OverwritePermission, SaveTask
from eolearn.core.types import Feature
from eolearn.features import LinearFunctionTask
from eolearn.io import SentinelHubDemTask, SentinelHubEvalscriptTask, SentinelHubInputTask
from sentinelhub import (
Band,
Expand All @@ -28,8 +28,7 @@

from ..core.pipeline import Pipeline
from ..core.schemas import BaseSchema
from ..tasks.common import LinearFunctionTask
from ..types import ExecKwargs, PatchList, ProcessingType, TimePeriod
from ..types import ExecKwargs, Feature, FeatureSpec, PatchList, ProcessingType, TimePeriod
from ..utils.filter import get_patches_with_missing_features
from ..utils.validators import (
ensure_exactly_one_defined,
Expand Down Expand Up @@ -82,6 +81,7 @@ class Schema(Pipeline.Schema):
)
_ensure_output_folder_key = ensure_storage_key_presence("output_folder_key")

compress_level: int = Field(1, description="Level of compression used in saving EOPatches")
threads_per_worker: Optional[int] = Field(
description=(
"Maximum number of parallel threads used during download by each worker. If set to None it will use "
Expand All @@ -95,21 +95,19 @@ class Schema(Pipeline.Schema):

def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self.download_node_uid = "<NODE ID NOT SET>"
self.download_node_uid: str | None = None

def filter_patch_list(self, patch_list: PatchList) -> PatchList:
"""EOPatches are filtered according to existence of specified output features"""
output_features = self._get_output_features()
return get_patches_with_missing_features(
self.storage.filesystem,
self.storage.get_folder(self.config.output_folder_key),
patch_list,
output_features,
check_timestamps=any(ftype.is_temporal() for ftype, _ in output_features),
self._get_output_features(),
)

@abc.abstractmethod
def _get_output_features(self) -> list[Feature]:
def _get_output_features(self) -> list[FeatureSpec]:
"""Lists all features that are to be saved upon the pipeline completion"""

@abc.abstractmethod
Expand Down Expand Up @@ -151,6 +149,7 @@ def build_workflow(self, session_loader: SessionLoaderType) -> EOWorkflow:
self.storage.get_folder(self.config.output_folder_key),
filesystem=self.storage.filesystem,
features=self._get_output_features(),
compress_level=self.config.compress_level,
overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
),
inputs=[postprocessing_node or download_node],
Expand All @@ -165,7 +164,9 @@ def get_execution_arguments(self, workflow: EOWorkflow, patch_list: PatchList) -
"""
exec_args = super().get_execution_arguments(workflow, patch_list)

download_node = workflow.get_node_with_uid(self.download_node_uid, fail_if_missing=True)
download_node = workflow.get_node_with_uid(self.download_node_uid)
if download_node is None:
return exec_args

for patch_name, bbox in patch_list:
exec_args[patch_name][download_node] = {"bbox": bbox}
Expand Down Expand Up @@ -246,8 +247,14 @@ class Schema(BaseDownloadPipeline.Schema, CommonDownloadFields, TimeDependantFie

config: Schema

def _get_output_features(self) -> list[Feature]:
return [(FeatureType.DATA, self.config.bands_feature_name), *self.config.additional_data]
def _get_output_features(self) -> list[FeatureSpec]:
features: list[FeatureSpec] = [
(FeatureType.DATA, self.config.bands_feature_name),
FeatureType.BBOX,
FeatureType.TIMESTAMPS,
]
features.extend(self.config.additional_data)
return features

def _get_download_node(self, session_loader: SessionLoaderType) -> EONode:
time_diff = None if self.config.time_difference is None else dt.timedelta(minutes=self.config.time_difference)
Expand Down Expand Up @@ -293,8 +300,10 @@ class Schema(BaseDownloadPipeline.Schema, CommonDownloadFields, TimeDependantFie

config: Schema

def _get_output_features(self) -> list[Feature]:
return self.config.features
def _get_output_features(self) -> list[FeatureSpec]:
features: list[FeatureSpec] = [FeatureType.BBOX, FeatureType.TIMESTAMPS]
features.extend(self.config.features)
return features

def _get_download_node(self, session_loader: SessionLoaderType) -> EONode:
evalscript = read_data(self.config.evalscript_path, data_format=MimeType.TXT)
Expand Down Expand Up @@ -326,8 +335,8 @@ class Schema(BaseDownloadPipeline.Schema, CommonDownloadFields):

config: Schema

def _get_output_features(self) -> list[Feature]:
return [(FeatureType.DATA_TIMELESS, self.config.feature_name)]
def _get_output_features(self) -> list[FeatureSpec]:
return [(FeatureType.DATA_TIMELESS, self.config.feature_name), FeatureType.BBOX]

def _get_download_node(self, session_loader: SessionLoaderType) -> EONode:
download_task = SentinelHubDemTask(
Expand Down
15 changes: 7 additions & 8 deletions eogrow/pipelines/export_maps.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@
from pydantic import Field
from tqdm.auto import tqdm

from eolearn.core import EOPatch, EOTask, EOWorkflow, LoadTask, linearly_connect_tasks
from eolearn.core.types import Feature
from eolearn.core import EOPatch, EOTask, EOWorkflow, FeatureType, LoadTask, linearly_connect_tasks
from eolearn.core.utils.fs import get_full_path, pickle_fs, unpickle_fs
from eolearn.core.utils.parallelize import parallelize
from eolearn.features import LinearFunctionTask
from eolearn.io import ExportToTiffTask
from sentinelhub import CRS, MimeType

from eogrow.core.config import RawConfig

from ..core.pipeline import Pipeline
from ..tasks.common import LinearFunctionTask
from ..types import ExecKwargs, PatchList
from ..types import ExecKwargs, Feature, PatchList
from ..utils.eopatch_list import group_by_crs
from ..utils.map import CogifyResamplingOptions, WarpResamplingOptions, cogify_inplace, extract_bands, merge_tiffs
from ..utils.validators import ensure_storage_key_presence
Expand Down Expand Up @@ -176,7 +175,7 @@ def build_workflow(self) -> EOWorkflow:
load_task = LoadTask(
self.storage.get_folder(self.config.input_folder_key),
filesystem=self.storage.filesystem,
features=[self.config.feature],
features=[self.config.feature, FeatureType.BBOX],
)
task_list: list[EOTask] = [load_task]

Expand Down Expand Up @@ -273,10 +272,10 @@ def get_extraction_path(input_path: str, time: dt.datetime) -> str:
def _extract_num_bands_and_timestamps(self, eopatch_name: str) -> tuple[int, list[dt.datetime]]:
"""Loads an eopatch to get information about number of bands and the timestamps."""
path = fs.path.join(self.storage.get_folder(self.config.input_folder_key), eopatch_name)
patch = EOPatch.load(path, [self.config.feature], filesystem=self.storage.filesystem)
patch = EOPatch.load(path, (FeatureType.TIMESTAMPS, self.config.feature), filesystem=self.storage.filesystem)
if self.config.band_indices is not None:
return len(self.config.band_indices), patch.get_timestamps()
return patch[self.config.feature].shape[-1], patch.get_timestamps()
return len(self.config.band_indices), patch.timestamps
return patch[self.config.feature].shape[-1], patch.timestamps

@staticmethod
def _execute_split_jobs(jobs: list[SplitTiffsJob]) -> None:
Expand Down
48 changes: 41 additions & 7 deletions eogrow/pipelines/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
OverwritePermission,
SaveTask,
)
from eolearn.core.types import Feature
from eolearn.features import NormalizedDifferenceIndexTask, SimpleFilterTask
from eolearn.features import LinearInterpolationTask, NormalizedDifferenceIndexTask, SimpleFilterTask
from eolearn.mask import JoinMasksTask

from ..core.pipeline import Pipeline
Expand All @@ -30,7 +29,7 @@
ValidDataFractionPredicate,
join_valid_and_cloud_masks,
)
from ..types import PatchList, TimePeriod
from ..types import Feature, FeatureSpec, PatchList, TimePeriod
from ..utils.filter import get_patches_with_missing_features
from ..utils.validators import (
ensure_storage_key_presence,
Expand Down Expand Up @@ -82,6 +81,7 @@ class Schema(Pipeline.Schema):
dtype: Optional[np.dtype] = Field(description="The dtype under which the concatenated features should be saved")
_parse_dtype = optional_field_validator("dtype", parse_dtype, pre=True)
output_feature_name: str = Field(description="Name of output data feature encompassing bands and NDIs")
compress_level: int = Field(1, description="Level of compression used in saving eopatches")

config: Schema

Expand All @@ -92,12 +92,11 @@ def filter_patch_list(self, patch_list: PatchList) -> PatchList:
self.storage.get_folder(self.config.output_folder_key),
patch_list,
self._get_output_features(),
check_timestamps=True,
)

def _get_output_features(self) -> list[Feature]:
def _get_output_features(self) -> list[FeatureSpec]:
"""Lists all features that are to be saved upon the pipeline completion"""
return [(FeatureType.DATA, self.config.output_feature_name)]
return [(FeatureType.DATA, self.config.output_feature_name), FeatureType.BBOX, FeatureType.TIMESTAMPS]

def _get_bands_feature(self) -> Feature:
return FeatureType.DATA, self.config.bands_feature_name
Expand All @@ -123,6 +122,7 @@ def build_workflow(self) -> EOWorkflow:
self.storage.get_folder(self.config.output_folder_key),
filesystem=self.storage.filesystem,
features=self._get_output_features(),
compress_level=self.config.compress_level,
overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
)
save_node = EONode(save_task, inputs=[postprocessing_node])
Expand Down Expand Up @@ -185,6 +185,40 @@ def get_postprocessing_node(self, previous_node: EONode) -> EONode:
return EONode(merge_task, inputs=[previous_node])


class InterpolationSpecifications(BaseSchema):
    # Settings for resampling during interpolation; consumed through the optional `interpolation`
    # field of `InterpolationFeaturesPipeline.Schema`.

    # Time interval that is unpacked into a (start, end) pair when the resample range is built.
    time_period: TimePeriod
    # Raw input for `time_period` is passed through `parse_time_period` before validation (pre=True).
    _parse_time_period = field_validator("time_period", parse_time_period, pre=True)
    # Third element of the `resample_range` tuple handed to `LinearInterpolationTask`.
    # NOTE(review): presumably the step in days between resampled timestamps - confirm against eo-learn docs.
    resampling_period: int


class InterpolationFeaturesPipeline(FeaturesPipeline):
    """Features pipeline whose temporal regularization step is a linear interpolation of the bands feature."""

    class Schema(FeaturesPipeline.Schema):
        # Optional resampling settings; when absent no resample range is passed to the interpolation task.
        interpolation: Optional[InterpolationSpecifications] = Field(
            description=(
                "Fine-tuning of interpolation parameters. If not set, the interpolation will work on current timestamps"
            )
        )

    config: Schema

    def get_temporal_regularization_node(self, previous_node: EONode) -> EONode:
        """Build the workflow node that linearly interpolates the bands feature.

        :param previous_node: Node whose output is fed into the interpolation task.
        :return: A node wrapping a `LinearInterpolationTask`, connected to `previous_node`.
        """
        date_format = "%Y-%m-%d"
        specs = self.config.interpolation

        if specs:
            # The resample range is given as (start date string, end date string, resampling period).
            first_day, last_day = specs.time_period
            resample_range = (
                first_day.strftime(date_format),
                last_day.strftime(date_format),
                specs.resampling_period,
            )
        else:
            resample_range = None

        return EONode(
            LinearInterpolationTask(
                feature=self._get_bands_feature(),
                mask_feature=self._get_valid_data_feature(),
                resample_range=resample_range,
                bounds_error=False,
            ),
            inputs=[previous_node],
        )


class MosaickingSpecifications(BaseSchema):
time_period: TimePeriod
_parse_time_period = field_validator("time_period", parse_time_period, pre=True)
Expand Down Expand Up @@ -243,7 +277,7 @@ def get_temporal_regularization_node(self, previous_node: EONode) -> EONode:
)
mosaicking_node = EONode(mosaicking_task, inputs=[previous_node])
return EONode(
CopyTask(features=[self._get_bands_feature()]),
CopyTask(features=[self._get_bands_feature(), FeatureType.BBOX, FeatureType.TIMESTAMPS]),
inputs=[mosaicking_node],
name="Remove non-mosaicked features",
)
11 changes: 5 additions & 6 deletions eogrow/pipelines/import_tiff.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@
import numpy as np
from pydantic import Field

from eolearn.core import CreateEOPatchTask, EONode, EOWorkflow, OverwritePermission, SaveTask
from eolearn.core.types import Feature
from eolearn.core import CreateEOPatchTask, EONode, EOWorkflow, FeatureType, OverwritePermission, SaveTask
from eolearn.features.feature_manipulation import SpatialResizeTask
from eolearn.features.utils import ResizeLib, ResizeMethod, ResizeParam
from eolearn.io import ImportFromTiffTask

from ..core.pipeline import Pipeline
from ..core.schemas import BaseSchema
from ..types import PatchList
from ..types import Feature, PatchList
from ..utils.filter import get_patches_with_missing_features
from ..utils.validators import ensure_storage_key_presence, optional_field_validator, parse_dtype

Expand Down Expand Up @@ -70,8 +69,7 @@ def filter_patch_list(self, patch_list: PatchList) -> PatchList:
self.storage.filesystem,
self.storage.get_folder(self.config.output_folder_key),
patch_list,
[self.config.output_feature],
check_timestamps=False,
[self.config.output_feature, FeatureType.BBOX],
)

def build_workflow(self) -> EOWorkflow:
Expand Down Expand Up @@ -107,9 +105,10 @@ def build_workflow(self) -> EOWorkflow:
save_task = SaveTask(
self.storage.get_folder(self.config.output_folder_key),
filesystem=self.storage.filesystem,
compress_level=1,
overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
config=self.sh_config,
features=[self.config.output_feature],
features=[self.config.output_feature, FeatureType.BBOX],
)
save_node = EONode(save_task, inputs=[resize_node or import_node])

Expand Down
6 changes: 3 additions & 3 deletions eogrow/pipelines/import_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
from pydantic import Field

from eolearn.core import EONode, EOWorkflow, FeatureType, OverwritePermission, SaveTask
from eolearn.core.types import Feature
from eolearn.io import VectorImportTask

from ..core.pipeline import Pipeline
from ..types import ExecKwargs, PatchList
from ..types import ExecKwargs, Feature, PatchList
from ..utils.validators import ensure_storage_key_presence, field_validator, restrict_types


Expand Down Expand Up @@ -56,7 +55,8 @@ def build_workflow(self) -> EOWorkflow:
save_task = SaveTask(
path=self.storage.get_folder(self.config.output_folder_key),
filesystem=self.storage.filesystem,
features=[self.config.output_feature],
features=[FeatureType.BBOX, self.config.output_feature],
compress_level=1,
overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,
)
save_node = EONode(save_task, inputs=[import_node])
Expand Down
Loading

0 comments on commit 51fed2a

Please sign in to comment.