From 4df35702dbe63cba127e96aad49c50a611f6d3c3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=BDiga=20Luk=C5=A1i=C4=8D?= <31988337+zigaLuksic@users.noreply.github.com>
Date: Tue, 21 Nov 2023 14:49:02 +0100
Subject: [PATCH] Remove TestPipeline and remove outdated example (#306)

* adjust docstrings a bit

* replace value errors with type errors where appropriate

* remove TestPipeline

* switch import format in a file

* clean up tests

* remove outdated example
---
 eogrow/cli.py                                 |  17 -
 eogrow/core/config.py                         |  18 +-
 eogrow/core/pipeline.py                       |   9 +-
 eogrow/pipelines/export_maps.py               |   2 +-
 eogrow/pipelines/sampling.py                  |   2 +-
 eogrow/pipelines/testing.py                   |  51 +-
 examples/workshop/bohinj-aoi.geojson          |  29 -
 examples/workshop/cluster.yaml                |  97 ---
 examples/workshop/configs/download.json       |  12 -
 examples/workshop/configs/global_config.json  |  32 -
 .../workshop/configs/water_detection.json     |   4 -
 examples/workshop/workshop.ipynb              | 762 ------------------
 tests/test_cli.py                             |   1 -
 13 files changed, 15 insertions(+), 1021 deletions(-)
 delete mode 100644 examples/workshop/bohinj-aoi.geojson
 delete mode 100644 examples/workshop/cluster.yaml
 delete mode 100644 examples/workshop/configs/download.json
 delete mode 100644 examples/workshop/configs/global_config.json
 delete mode 100644 examples/workshop/configs/water_detection.json
 delete mode 100644 examples/workshop/workshop.ipynb

diff --git a/eogrow/cli.py b/eogrow/cli.py
index 4d73a46d..5ff9e7a6 100644
--- a/eogrow/cli.py
+++ b/eogrow/cli.py
@@ -14,7 +14,6 @@
 from .core.config import CrudeConfig, RawConfig, collect_configs_from_path, interpret_config_from_dict
 from .core.logging import CLUSTER_FILE_LOCATION_ON_HEAD
 from .core.schemas import build_schema_template
-from .pipelines.testing import TestPipeline
 from .utils.general import jsonify
 from .utils.meta import collect_schema, import_object, load_pipeline_class
 from .utils.pipeline_chain import run_pipeline_chain, validate_pipeline_chain
@@ -213,22 +212,6 @@ def validate_config(config_path: str) -> None:
     click.echo("Config validation succeeded!")
 
 
-@click.command()
-@click.argument("config_path", type=click.Path())
-def run_test_pipeline(config_path: str) -> None:
-    """Runs a test pipeline that only makes sure the managers work correctly. This can be used to select best
-    area manager parameters.
-
-    \b
-    Example:
-        eogrow-test any_pipeline_config.json
-    """
-    for crude_config in collect_configs_from_path(config_path):
-        raw_config = interpret_config_from_dict(crude_config)
-        pipeline = TestPipeline.with_defaults(raw_config)
-        pipeline.run()
-
-
 def _prepare_config(config: CrudeConfig, variables: dict[str, str], test_patches: Iterable[int]) -> RawConfig:
     raw_config = interpret_config_from_dict(config, variables)
     if test_patches:
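
With `run_test_pipeline` gone, the same manager smoke test can still be assembled from pieces that remain in `eogrow/cli.py`. A minimal sketch, assuming `load_pipeline_class` resolves the `pipeline` key of an interpreted config (its exact signature is an assumption; the config path is a placeholder):

```python
# Rough stand-in for the removed `eogrow-test` command, built only from
# functions still imported in eogrow/cli.py. Mirrors what the removed
# TestPipeline.run_procedure used to log about the area manager grid.
from eogrow.core.config import collect_configs_from_path, interpret_config_from_dict
from eogrow.utils.meta import load_pipeline_class  # assumed to accept a raw config

for crude_config in collect_configs_from_path("any_pipeline_config.json"):
    raw_config = interpret_config_from_dict(crude_config)
    pipeline = load_pipeline_class(raw_config).from_raw_config(raw_config)

    # Exercise the area manager the way TestPipeline.run_procedure did:
    grid = pipeline.area_manager.get_grid()
    num_patches = sum(map(len, grid.values()))
    print(f"Grid has {num_patches} EOPatches across {len(grid)} CRS zones")
```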
diff --git a/eogrow/core/config.py b/eogrow/core/config.py
index e688a222..715bd409 100644
--- a/eogrow/core/config.py
+++ b/eogrow/core/config.py
@@ -39,7 +39,7 @@ def collect_configs_from_path(path: str, used_config_paths: set[str] | None = No
     config = _recursive_config_build(config, used_config_paths)
 
     if not isinstance(config, (dict, list)):
-        raise ValueError(f"When interpreting config from {path} a dictionary or list was expected, got {type(config)}.")
+        raise TypeError(f"When interpreting config from {path} a dictionary or list was expected, got {type(config)}.")
 
     return cast(Union[CrudeConfig, List[CrudeConfig]], config)
@@ -57,7 +57,7 @@ def _recursive_config_build(config: object, used_config_paths: set[str]) -> obje
     for key, value in config.items():
         if not isinstance(key, str):
-            raise ValueError(f"Dictionary keys should always be strings, but found: {key}")
+            raise TypeError(f"Dictionary keys should always be strings, but found: {key}")
 
         if key.startswith("**"):
             if value in used_config_paths:
@@ -89,6 +89,9 @@ def interpret_config_from_dict(config: CrudeConfig, external_variables: dict[str
     """
     _recursive_check_config(config)
 
+    if not isinstance(config, dict):
+        raise TypeError(f"Can only interpret dictionary objects, got {type(config)}.")
+
     config = cast(CrudeConfig, config.copy())
     variable_mapping = config.pop("variables", {})
     if external_variables:
@@ -101,10 +104,6 @@ def interpret_config_from_dict(config: CrudeConfig, external_variables: dict[str
     config_with_variables = _recursive_apply_to_strings(
         config, lambda config_str: _resolve_variables(config_str, variable_mapping)
     )
-    if not isinstance(config_with_variables, dict):
-        raise ValueError(
-            f"Interpretation resulted in object of type {type(config_with_variables)} but a dictionary was expected."
-        )
 
     return cast(RawConfig, config_with_variables)
@@ -153,14 +152,11 @@ def _recursive_apply_to_strings(config: object, function: Callable) -> object:
 
 
 def _recursive_check_config(config: object) -> None:
-    """Recursively checks if the config satisfies basic conditions for being JSON serializable.
-
-    :raises: ValueError
-    """
+    """Recursively checks if the config satisfies basic conditions for being JSON serializable."""
     if isinstance(config, dict):
         for key, value in config.items():
             if not isinstance(key, str):
-                raise ValueError(f"Config keys should be strings but {key} found")
+                raise TypeError(f"Config keys should be strings, but found: {key}")
             _recursive_check_config(value)
 
     elif isinstance(config, list):
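
The net effect of the `config.py` changes above: malformed input now fails fast with a `TypeError` during interpretation, instead of a late `ValueError` after variable resolution. A small sketch of the new behavior:

```python
# Demonstrates the new early isinstance check in interpret_config_from_dict.
from eogrow.core.config import interpret_config_from_dict

try:
    # A list passes _recursive_check_config but is not a valid config object.
    interpret_config_from_dict(["not", "a", "dict"], {})  # second arg: external variables
except TypeError as error:
    print(error)  # -> Can only interpret dictionary objects, got <class 'list'>.
```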
diff --git a/eogrow/core/pipeline.py b/eogrow/core/pipeline.py
index 2749c44c..93bf92aa 100644
--- a/eogrow/core/pipeline.py
+++ b/eogrow/core/pipeline.py
@@ -81,7 +81,7 @@ def _new_pipeline_id() -> str:
     def _load_manager(manager_config: ManagerSchema, **manager_params: Any) -> Any:
         """Loads a manager class and back-propagates parsed config
 
-        :param manager_key: A config key name of a sub-config with manager parameters
+        :param manager_config: A sub-config with manager parameters
         :param manager_params: Other parameters to initialize a manager class
         """
         if manager_config.manager is None:
@@ -94,7 +94,7 @@ def get_pipeline_execution_name(self, pipeline_timestamp: str) -> str:
         return f"{pipeline_timestamp}-{self._pipeline_name}-{self.pipeline_id}"
 
     def get_patch_list(self) -> PatchList:
-        """Method which at the initialization prepares the list of EOPatches which will be used"""
+        """Method that prepares the list of EOPatches for which to run the pipeline execution."""
         patch_list = self.area_manager.get_patch_list()
 
         if self.config.test_subset is not None:
@@ -130,7 +130,7 @@ def get_execution_arguments(self, workflow: EOWorkflow, patch_list: PatchList) -
         """Prepares execution arguments for each eopatch from a list of patches.
 
         The output should be a dictionary of form `{execution_name: {node: node_kwargs}}`. Execution names are usually
-        names of EOPatches, but can be anything. 
+        names of EOPatches, but can be anything.
 
         :param workflow: A workflow for which arguments will be prepared
         """
@@ -262,8 +262,7 @@ def run_procedure(self) -> tuple[list[str], list[str]]:
         """
         if not hasattr(self, "build_workflow"):
             raise NotImplementedError(
-                "Default implementation of the `run_procedure` method requires implementation of the `build_workflow`"
-                " method."
+                "Implementation of the `run_procedure` method requires implementation of the `build_workflow` method."
             )
         workflow = self.build_workflow()
         patch_list = self.get_patch_list()
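
For context, the contract spelled out in the docstrings above: the default `run_procedure` only works when a subclass supplies `build_workflow`, and `get_execution_arguments` returns a `{execution_name: {node: node_kwargs}}` mapping. A minimal, hypothetical subclass that satisfies the requirement (the class name and the no-op workflow are illustrative only):

```python
# Hypothetical no-op pipeline; it exists only to show the required hook.
from eolearn.core import CreateEOPatchTask, EONode, EOWorkflow

from eogrow.core.pipeline import Pipeline


class MinimalPipeline(Pipeline):
    """Creates an empty EOPatch per execution; nothing else."""

    def build_workflow(self) -> EOWorkflow:
        # A single-node workflow is enough for the default run_procedure.
        return EOWorkflow([EONode(CreateEOPatchTask())])
```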
diff --git a/eogrow/pipelines/export_maps.py b/eogrow/pipelines/export_maps.py
index c627edc5..e07f65d5 100644
--- a/eogrow/pipelines/export_maps.py
+++ b/eogrow/pipelines/export_maps.py
@@ -131,7 +131,7 @@ def run_procedure(self) -> tuple[list[str], list[str]]:
         successful, failed, _ = self.run_execution(workflow, exec_args)
 
         if not successful:
-            raise ValueError("Failed to extract tiff files from any of EOPatches.")
+            raise RuntimeError("Failed to extract tiff files from any of the EOPatches.")
 
         feature_type, _ = self.config.feature
         output_folder = self.storage.get_folder(self.config.output_folder_key)
diff --git a/eogrow/pipelines/sampling.py b/eogrow/pipelines/sampling.py
index 57e4ff8e..2d744784 100644
--- a/eogrow/pipelines/sampling.py
+++ b/eogrow/pipelines/sampling.py
@@ -92,7 +92,7 @@ def _get_loading_node(self) -> EONode:
         feature_type = FeatureType(feature_type_str)
 
         if not feature_type.is_spatial():
-            raise ValueError(f"Only spatial features can be sampled, but found {feature_type}: {feature_names}")
+            raise TypeError(f"Only spatial features can be sampled, but found {feature_type}: {feature_names}")
 
         for feature_name in feature_names:
             load_features.append((feature_type, feature_name))  # noqa: PERF401
diff --git a/eogrow/pipelines/testing.py b/eogrow/pipelines/testing.py
index 76b19248..dd4c82c8 100644
--- a/eogrow/pipelines/testing.py
+++ b/eogrow/pipelines/testing.py
@@ -2,8 +2,7 @@
 
 from __future__ import annotations
 
-import logging
-from typing import List, Literal, Optional, Tuple, TypeVar, Union
+from typing import List, Literal, Optional, Tuple, Union
 
 import numpy as np
 from pydantic import Field
@@ -11,58 +10,12 @@
 from eolearn.core import CreateEOPatchTask, EONode, EOWorkflow, OverwritePermission, SaveTask
 from eolearn.core.types import Feature
 
-from ..core.config import RawConfig, recursive_config_join
 from ..core.pipeline import Pipeline
 from ..core.schemas import BaseSchema
-from ..tasks.testing import (
-    GenerateRasterFeatureTask,
-    GenerateTimestampsTask,
-    NormalDistribution,
-    UniformDistribution,
-)
+from ..tasks.testing import GenerateRasterFeatureTask, GenerateTimestampsTask, NormalDistribution, UniformDistribution
 from ..types import ExecKwargs, PatchList, TimePeriod
 from ..utils.validators import ensure_storage_key_presence, field_validator, parse_dtype, parse_time_period
 
-Self = TypeVar("Self", bound="TestPipeline")
-LOGGER = logging.getLogger(__name__)
-
-
-class TestPipeline(Pipeline):
-    """Pipeline that just tests if all managers works correctly. It can be used to check if area manager creates a
-    correct grid.
-    """
-
-    class Schema(Pipeline.Schema):
-        class Config:
-            extra = "allow"
-
-    _DEFAULT_CONFIG_PARAMS = {  # noqa: RUF012
-        "pipeline": "eogrow.pipelines.testing.TestPipeline",
-        "logging": {"manager": "eogrow.logging.LoggingManager", "show_logs": True},
-    }
-
-    @classmethod
-    def with_defaults(cls: type[Self], config: RawConfig) -> Self:
-        config = recursive_config_join(config, cls._DEFAULT_CONFIG_PARAMS)  # type: ignore[assignment]
-        return cls.from_raw_config(config)
-
-    def run_procedure(self) -> tuple[list, list]:
-        """Performs basic tests of managers"""
-        if self.storage.filesystem.exists("/"):
-            LOGGER.info("Project folder %s exists", self.storage.config.project_folder)
-        else:
-            LOGGER.info("Project folder %s does not exist", self.storage.config.project_folder)
-
-        self.area_manager.get_area_geometry()
-        grid = self.area_manager.get_grid()
-        num_patches = sum(map(len, grid.values()))
-        LOGGER.info("Grid has %d EOPatches and is split over %d CRS zones", num_patches, len(grid))
-
-        patch_list = self.area_manager.get_patch_list()
-        LOGGER.info("The first EOPatch has a name %s", patch_list[0][0])
-
-        return [], []
-
 
 class UniformDistributionSchema(BaseSchema):
     kind: Literal["uniform"]
- """ - - class Schema(Pipeline.Schema): - class Config: - extra = "allow" - - _DEFAULT_CONFIG_PARAMS = { # noqa: RUF012 - "pipeline": "eogrow.pipelines.testing.TestPipeline", - "logging": {"manager": "eogrow.logging.LoggingManager", "show_logs": True}, - } - - @classmethod - def with_defaults(cls: type[Self], config: RawConfig) -> Self: - config = recursive_config_join(config, cls._DEFAULT_CONFIG_PARAMS) # type: ignore[assignment] - return cls.from_raw_config(config) - - def run_procedure(self) -> tuple[list, list]: - """Performs basic tests of managers""" - if self.storage.filesystem.exists("/"): - LOGGER.info("Project folder %s exists", self.storage.config.project_folder) - else: - LOGGER.info("Project folder %s does not exist", self.storage.config.project_folder) - - self.area_manager.get_area_geometry() - grid = self.area_manager.get_grid() - num_patches = sum(map(len, grid.values())) - LOGGER.info("Grid has %d EOPatches and is split over %d CRS zones", num_patches, len(grid)) - - patch_list = self.area_manager.get_patch_list() - LOGGER.info("The first EOPatch has a name %s", patch_list[0][0]) - - return [], [] - class UniformDistributionSchema(BaseSchema): kind: Literal["uniform"] diff --git a/examples/workshop/bohinj-aoi.geojson b/examples/workshop/bohinj-aoi.geojson deleted file mode 100644 index fe169fc7..00000000 --- a/examples/workshop/bohinj-aoi.geojson +++ /dev/null @@ -1,29 +0,0 @@ -{ - "type": "FeatureCollection", - "name": "aoi", - "crs": { - "type": "name", - "properties": { "name": "urn:ogc:def:crs:OGC:1.3:CRS84" } - }, - "features": [ - { - "type": "Feature", - "properties": { "fid": 0 }, - "geometry": { - "type": "MultiPolygon", - "coordinates": [ - [ - [ - [13.834332897646069, 46.302927445541421], - [13.806601314318312, 46.282448248973182], - [13.846812110143556, 46.26567595392445], - [13.903835178361259, 46.269390261729221], - [13.887196228364607, 46.300173379452758], - [13.834332897646069, 46.302927445541421] - ] - ] - ] - } - } - ] -} diff --git a/examples/workshop/cluster.yaml b/examples/workshop/cluster.yaml deleted file mode 100644 index 6f54116c..00000000 --- a/examples/workshop/cluster.yaml +++ /dev/null @@ -1,97 +0,0 @@ -# A configuration of ray cluster for GEM project -# For info about parameters check https://docs.ray.io/en/latest/cluster/config.html#full-configuration - -cluster_name: workshop-cluster - -max_workers: 4 # Max number of worker instances -upscaling_speed: 1.0 -idle_timeout_minutes: 5 - -docker: - image: ".dkr.ecr.eu-central-1.amazonaws.com/" # Edit this! - container_name: "gem_container" - pull_before_run: True - run_options: - - --privileged # Because of s3fs-fuse - -provider: - type: aws - region: eu-central-1 - availability_zone: eu-central-1a,eu-central-1b,eu-central-1c - cache_stopped_nodes: False # Change for terminating instances - -auth: - ssh_user: ubuntu - -available_node_types: - ray.head: - min_workers: 0 - max_workers: 0 - node_config: - InstanceType: m5.xlarge - ImageId: ami- # Edit this! - BlockDeviceMappings: - - DeviceName: /dev/sda1 - Ebs: - VolumeSize: 20 - resources: {"CPU": 1} - ray.worker: - min_workers: 0 - max_workers: 4 # Max number of workers of this type - node_config: - InstanceType: m5.xlarge - ImageId: ami- # Edit this! 
-      InstanceMarketOptions:
-        MarketType: spot # always try using spot because it is cheaper
-      BlockDeviceMappings:
-        - DeviceName: /dev/sda1
-          Ebs:
-            VolumeSize: 20
-#    resources: {"CPU": 1}
-
-head_node_type: ray.head
-
-file_mounts: {}
-cluster_synced_files: []
-file_mounts_sync_continuously: False
-rsync_exclude:
-  - "**/.git"
-  - "**/.git/**"
-rsync_filter:
-  - ".gitignore"
-
-initialization_commands:
-  - aws ecr get-login-password | docker login --username AWS --password-stdin .dkr.ecr.eu-central-1.amazonaws.com
-
-setup_commands:
-  # Set credentials here:
-  - aws --profile workshop configure set aws_access_key_id
-  - aws --profile workshop configure set aws_secret_access_key
-  - aws --profile workshop configure set region eu-central-1
-  # Mounting an S3 bucket (useful just for prototyping):
-  - cat .aws/credentials | grep -m 2 access | awk '{print $3}' | xargs | sed 's/ /:/g' > ~/.passwd-s3fs
-  - chmod 600 ~/.passwd-s3fs
-  - s3fs eogrow-workshop ~/data -o umask=0000 | true
-
-  - git -C packages/sentinelhub-py pull
-  - git -C packages/eo-learn pull
-  - git -C packages/eo-grow pull
-  # This is temporal:
-  - git -C packages/eo-grow checkout -b | true
-  - git -C packages/eo-grow pull origin
-
-head_setup_commands:
-  - pip install jupyter
-
-worker_setup_commands: []
-
-head_start_ray_commands:
-  - ray stop
-  - ulimit -n 65536; ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
-
-worker_start_ray_commands:
-  - ray stop
-  - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
-
-head_node: {}
-worker_nodes: {}
diff --git a/examples/workshop/configs/download.json b/examples/workshop/configs/download.json
deleted file mode 100644
index b2547dc3..00000000
--- a/examples/workshop/configs/download.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
-  "pipeline": "eogrow.pipelines.download.DownloadPipeline",
-  "**global_config": "${config_path}/global_config.json",
-  "output_folder_key": "data",
-  "time_period": ["${var:month}-01", "${var:next_month}-01"],
-  "data_collection": "SENTINEL2_L1C",
-  "resolution": 10,
-  "bands_feature_name": "BANDS",
-  "maxcc": 1.0,
-  "time_difference": 120,
-  "threads_per_worker": 5
-}
diff --git a/examples/workshop/configs/global_config.json b/examples/workshop/configs/global_config.json
deleted file mode 100644
index f0e10adc..00000000
--- a/examples/workshop/configs/global_config.json
+++ /dev/null
@@ -1,32 +0,0 @@
-{
-  "variables": {
-    "month": "2021-06",
-    "next_month": "2021-07"
-  },
-  "storage": {
-    "manager": "eogrow.core.storage.StorageManager",
-    "project_folder": "s3://eogrow-workshop/project/",
-    "aws_profile": "workshop",
-    "structure": {
-      "data": "data/${var:month}",
-      "results": "results/${var:month}",
-      "vector_results": "vector-results/${var:month}"
-    }
-  },
-  "area": {
-    "manager": "eogrow.core.area.UtmZoneAreaManager",
-    "area_filename": "bohinj-aoi.geojson",
-    "area_buffer": 0.01,
-    "patch_size_x": 2500,
-    "patch_size_y": 2500
-  },
-  "eopatch": {
-    "manager": "eogrow.core.eopatch.EOPatchManager"
-  },
-  "logging": {
-    "manager": "eogrow.core.logging.LoggingManager",
-    "save_logs": true,
-    "show_logs": true
-  },
-  "use_ray": "auto"
-}
diff --git a/examples/workshop/configs/water_detection.json b/examples/workshop/configs/water_detection.json
deleted file mode 100644
index d74898a2..00000000
--- a/examples/workshop/configs/water_detection.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{
-  "**global_config": "${config_path}/global_config.json",
-  "threshold": 0.1
-}
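
The deleted configs above lean on the `${var:...}` references of the config language. A hedged sketch of how such variables resolve through `interpret_config_from_dict`, inferred from the `variables` handling in the `config.py` hunk earlier in this patch:

```python
# Sketch of variable resolution; values mirror the deleted global_config.json.
from eogrow.core.config import interpret_config_from_dict

crude_config = {
    "variables": {"month": "2021-06", "next_month": "2021-07"},
    "time_period": ["${var:month}-01", "${var:next_month}-01"],
}
raw_config = interpret_config_from_dict(crude_config, {})  # no external variables
print(raw_config["time_period"])  # expected: ['2021-06-01', '2021-07-01']
```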
diff --git a/examples/workshop/workshop.ipynb b/examples/workshop/workshop.ipynb
deleted file mode 100644
index 95b42f39..00000000
--- a/examples/workshop/workshop.ipynb
+++ /dev/null
@@ -1,762 +0,0 @@
-{
- "cells": [
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "id": "mobile-manor",
-   "metadata": {},
-   "source": [
-    "# `eo-grow` Workshop\n",
-    "\n",
-    "`eo-grow` is a framework for large-scale processing of EO data. In this workshop we'll learn:\n",
-    "\n",
-    "- how to run an `eo-grow` pipeline,\n",
-    "- how to scale up a pipeline,\n",
-    "- how to write a new pipeline.\n",
-    "\n",
-    "The framework can run:\n",
-    "\n",
-    "- completely locally on a laptop,\n",
-    "- local processing with data storage on S3\n",
-    "  * use only for small data transfers!\n",
-    "- processing on EC2 instances with data storage on S3.\n",
-    "\n",
-    "For this workshop we'll use 2nd and 3rd option.\n",
-    "\n",
-    "\n",
-    "## 0. Prerequisites\n",
-    "\n",
-    "The package requires Python version `>=3.8`. You can choose between:\n",
-    "\n",
-    "- installing `eo-grow` from PyPI:\n",
-    "  \n",
-    "  ```\n",
-    "  pip install eo-grow\n",
-    "  ```\n",
-    "\n",
-    "- or installing `eo-grow` from the current branch with:\n",
-    "\n",
-    "  ```\n",
-    "  pip install -e .\n",
-    "  ```\n",
-    "  \n",
-    "This workshop also requires an access to an AWS S3 bucket with data:\n",
-    "\n",
-    "```\n",
-    "aws configure --profile workshop\n",
-    "```\n",
-    "\n",
-    "Additionally you have to set `sentinelhub-py` OAuth credentials.\n",
-    "\n",
-    " \n",
-    "## 1. How to use `eo-grow`?\n",
-    "\n",
-    "The core `eo-grow` structure looks like this:\n",
-    "\n",
-    "![](../eo-grow.png)\n",
-    "\n",
-    "- A `Pipeline` obtains configuration parameters and uses managers as helpers.\n",
-    "- Configuration parameters can be read from JSON files or Python dictionaries. They are parsed with a special [config language](../config-language.md) and wrapped with an object specific `Schema` class.\n",
-    "- Storage structure and credentials are handled by a `StorageManager`.\n",
-    "- AOI is buffered and split into a tiling grid with different implementations of `AreaManager`.\n",
-    "- EOPatch naming conventions are defined in an `EOPatchManager`.\n",
-    "- Logging is controlled with a `LoggingManager`.\n",
-    "\n",
-    "Pipeline and manager classes all inherit from a base `EOGrowObject` and are similar in a ways that:\n",
-    "\n",
-    "- they all contain their own `Schema` class that defines which config parameters they use,\n",
-    "- they are all meant to be inherited and customized for any use case.\n",
-    "\n",
-    "\n",
-    "The most basic procedure of using `eo-grow` is:\n",
-    "\n",
-    "1. set up a project folder for storage,\n",
-    "2. implement a new pipeline or use one of the basic pipelines in `eogrow.pipelines`,\n",
-    "3. prepare a config file,\n",
-    "4. run a pipeline.\n",
-    "\n",
-    "### Exercise 1\n",
-    "\n",
-    "- As a storage we will use a project folder in an AWS S3 bucket `s3://eogrow-workshop/project/`.\n",
-    "\n",
-    "- We will run a basic download pipeline (`eogrow.pipelines.download.DownloadPipeline`) for AOI defined in a file `s3://eogrow-workshop/project/input-data/bohinj_aoi.geojson`.\n",
-    "\n",
-    "- We will buffer AOI by `0.01` and split AOI into a UTM grid with a patch size `250x250` pixels on `10m` resolution.\n",
-    "\n",
-    "\n",
-    "For now we will only use CLI commands to run the pipeline. `eo-grow` offers the following commands:\n",
-    "\n",
-    "- `eogrow` - run a pipeline\n",
-    "- `eogrow-template` - create a template config for a\n",
-    "- `eogrow-validate` - validate a pipeline config\n",
-    "- `eogrow-test` - test managers on a dummy pipeline\n",
-    "- `eogrow-ray` - run a pipeline on a cluster\n",
-    "\n",
-    "Note: names of these commands are defined in `setup.py`.\n",
-    "\n",
-    "A command `eogrow-template` can help us write a config file. Let's check what templates we get for different objects:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "wooden-school",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "!eogrow-template eogrow.pipelines.download.DownloadPipeline\n",
-    "# !eogrow-template eogrow.pipelines.download.DownloadPipeline download_template_openapi.json -f\n",
-    "\n",
-    "# !eogrow-template eogrow.core.storage.StorageManager\n",
-    "# !eogrow-template eogrow.core.area.UtmZoneAreaManager\n",
-    "# !eogrow-template eogrow.core.eopatch.EOPatchManager\n",
-    "# !eogrow-template eogrow.core.logging.LoggingManager"
-   ]
-  },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "id": "revised-ethnic",
-   "metadata": {},
-   "source": [
-    "We can use config language to:\n",
-    "\n",
-    "- split config parameters into multiple files,\n",
-    "- avoid parameter duplications,\n",
-    "- reference:\n",
-    "  * relative file paths,\n",
-    "  * package import paths,\n",
-    "  * environmental variables"
-   ]
-  },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "id": "quarterly-collector",
-   "metadata": {},
-   "source": [
-    "If we would like to just check if the config file contains correct parameters without running a pipeline we can do that with:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "anticipated-charleston",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "!eogrow-validate configs/download.json"
-   ]
-  },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "id": "infinite-clinton",
-   "metadata": {},
-   "source": [
-    "Before we run the pipeline let's check if all managers are working correctly:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "posted-meeting",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "!eogrow-test configs/download.json"
-   ]
-  },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "id": "prescribed-freedom",
-   "metadata": {},
-   "source": [
-    "This ran a simple `TestPipeline` that only checked all managers. The pipeline produced\n",
-    "\n",
-    "- logs\n",
-    "- cached area manager buffered shape and grid\n",
-    "\n",
-    "Let's download cached data:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "liable-wedding",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "!aws s3 sync s3://eogrow-workshop/project/cache/ ./cache --profile workshop"
-   ]
-  },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "id": "great-finger",
-   "metadata": {},
-   "source": [
-    "To test if the download pipeline will produce correct results we can first run it for a single patch in the grid:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "antique-theme",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "!eogrow configs/download.json -t 0"
-   ]
-  },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "id": "continuing-investigation",
-   "metadata": {},
-   "source": [
-    "Now we are ready to run it for the entire grid with a command:\n",
-    "\n",
-    "```\n",
-    "eogrow download.json\n",
-    "```\n",
-    "\n",
-    "But before we do this, let's switch to a Ray cluster."
-   ]
-  },
- ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "continued-bearing", - "metadata": {}, - "source": [ - "## 2. How to scale up?\n", - "\n", - "In `eo-grow` parallelization can be achieved with:\n", - "\n", - "- multiprocessing on a single machine (for simple use cases),\n", - "- Ray parallelization on:\n", - " * a single machine\n", - " * a **cluster of AWS EC2 instances**.\n", - "\n", - "Ray cluster can be fully configured with a single YAML file as described in [Ray documentation](https://docs.ray.io/en/latest/cluster/config.html).\n", - "\n", - "Once we prepared the YAML file we can spawn a ray cluster:\n", - "\n", - "```bash\n", - "ray up cluster.yaml -y\n", - "```" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "collaborative-invitation", - "metadata": {}, - "source": [ - "We can attach to it with:\n", - "\n", - "```bash\n", - "ray attach cluster.yaml\n", - "```" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "recent-prairie", - "metadata": {}, - "source": [ - "We can upload any local files to the cluster.\n", - "\n", - "```bash\n", - "ray rsync_up cluster.yaml '/local/path' '/full/absolute/path/on/cluster'\n", - "```" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "desperate-amount", - "metadata": {}, - "source": [ - "Note: Alternativelly, we could commit local files and let the cluster pull them from a git repository." - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "convenient-gibson", - "metadata": {}, - "source": [ - "On a cluster we can then simply run the pipeline with:\n", - " \n", - "```bash\n", - "eogrow download.json\n", - "```" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "rough-intake", - "metadata": {}, - "source": [ - "An even easier option is simply run a pipeline on a cluster using your local config to a cluster with a command:\n", - "\n", - "```bash\n", - "eogrow-ray cluster.yaml configs/download.json\n", - "```\n", - "\n", - "This command also has a few useful optional flags:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "painful-montana", - "metadata": {}, - "outputs": [], - "source": [ - "!eogrow-ray --help" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "formal-laundry", - "metadata": {}, - "source": [ - "Cluster CPU and memory usage can be monitored from a Ray dashboard. We can connect to it with:\n", - "\n", - "```bash\n", - "ray dashboard cluster.yaml\n", - "```\n", - "\n", - "The dashboard will become available at `localhost:8265`." - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "published-florence", - "metadata": {}, - "source": [ - "When we are done processing, let's make sure that we shut down the cluster:\n", - "\n", - "```bash\n", - "ray down cluster.yaml -y\n", - "```" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "excessive-programming", - "metadata": {}, - "source": [ - "## 3. How to implement a new pipeline?\n", - "\n", - "Let's start from a typical workflow, which can be created in a prototype phase. 
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "id": "excessive-programming",
-   "metadata": {},
-   "source": [
-    "## 3. How to implement a new pipeline?\n",
-    "\n",
-    "Let's start from a typical workflow, which can be created in a prototype phase. The following workflow performs a simple water detection algorithm on a stack of data that we downloaded:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "strong-emerald",
-   "metadata": {
-    "scrolled": true
-   },
-   "outputs": [],
-   "source": [
-    "%matplotlib inline\n",
-    "\n",
-    "import matplotlib.pyplot as plt\n",
-    "import numpy as np\n",
-    "\n",
-    "from eolearn.core import (\n",
-    "    EOWorkflow,\n",
-    "    FeatureType,\n",
-    "    LoadTask,\n",
-    "    MapFeatureTask,\n",
-    "    OutputTask,\n",
-    "    OverwritePermission,\n",
-    "    SaveTask,\n",
-    "    linearly_connect_tasks,\n",
-    ")\n",
-    "from eolearn.core.utils.fs import get_aws_credentials\n",
-    "from eolearn.features import NormalizedDifferenceIndexTask\n",
-    "\n",
-    "config = get_aws_credentials(aws_profile=\"workshop\")\n",
-    "\n",
-    "bands_feature = FeatureType.DATA, \"BANDS\"\n",
-    "ndwi_feature = FeatureType.DATA, \"NDWI\"\n",
-    "water_feature = FeatureType.MASK_TIMELESS, \"WATER\"\n",
-    "\n",
-    "load_task = LoadTask(\"s3://eogrow-workshop/project/data/2021-06/\", config=config)\n",
-    "\n",
-    "ndwi_task = NormalizedDifferenceIndexTask(bands_feature, ndwi_feature, bands=[2, 7])\n",
-    "\n",
-    "\n",
-    "class ThresholdWater(MapFeatureTask):\n",
-    "    def map_method(self, ndwi, threshold):\n",
-    "        max_ndwi = np.max(ndwi, axis=0)\n",
-    "        return max_ndwi > threshold\n",
-    "\n",
-    "\n",
-    "threshold_task = ThresholdWater(ndwi_feature, water_feature, threshold=0.1)\n",
-    "\n",
-    "output_task = OutputTask(name=\"result_eop\")\n",
-    "\n",
-    "nodes = linearly_connect_tasks(load_task, ndwi_task, threshold_task, output_task)\n",
-    "workflow = EOWorkflow(nodes)\n",
-    "\n",
-    "workflow_results = workflow.execute({nodes[0]: {\"eopatch_folder\": \"eopatch-id-08-col-3-row-1\"}})\n",
-    "\n",
-    "eop = workflow_results.outputs[\"result_eop\"]\n",
-    "\n",
-    "eop"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "blond-friendly",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "ndwi = eop[ndwi_feature]\n",
-    "\n",
-    "fig, axes = plt.subplots(nrows=3, ncols=4, figsize=(20, 15))\n",
-    "for index in range(12):\n",
-    "    ax = axes[index // 4][index % 4]\n",
-    "    ax.imshow(ndwi[index, ...], vmin=0.1, vmax=0.5)\n",
-    "    ax.set_xticks([])\n",
-    "    ax.set_yticks([])\n",
-    "\n",
-    "fig.subplots_adjust(wspace=0, hspace=0);"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "compressed-peeing",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(10, 10))\n",
-    "\n",
-    "water = eop[water_feature]\n",
-    "\n",
-    "ax.imshow(water)\n",
-    "ax.set_xticks([])\n",
-    "ax.set_yticks([]);"
-   ]
-  },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "id": "appreciated-minutes",
-   "metadata": {},
-   "source": [
-    "Now let's put this process into a pipeline. The minimum that we have to do is:\n",
-    "\n",
-    "- Create a class that inherits from `Pipeline` class.\n",
-    "- In case you want to have custom config parameters, add `Schema` subclass that inherits from `Pipeline.Schema`.\n",
-    "- Implement `build_workflow` method."
-   ]
-  },
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "impossible-overall", - "metadata": {}, - "outputs": [], - "source": [ - "from eogrow.core.pipeline import Pipeline\n", - "\n", - "\n", - "class WaterDetectionPipeline(Pipeline):\n", - " class Schema(Pipeline.Schema):\n", - " threshold: float\n", - "\n", - " def build_workflow(self):\n", - " bands_feature = FeatureType.DATA, \"BANDS\"\n", - " ndwi_feature = FeatureType.DATA, \"NDWI\"\n", - " water_feature = FeatureType.MASK_TIMELESS, \"WATER\"\n", - "\n", - " load_task = LoadTask(self.storage.get_folder(\"data\", full_path=True), config=self.sh_config)\n", - "\n", - " ndwi_task = NormalizedDifferenceIndexTask(bands_feature, ndwi_feature, bands=[2, 7])\n", - "\n", - " threshold_task = ThresholdWater(ndwi_feature, water_feature, threshold=self.config.threshold)\n", - "\n", - " save_task = SaveTask(\n", - " self.storage.get_folder(\"results\", full_path=True),\n", - " features=[water_feature, FeatureType.BBOX],\n", - " compress_level=1,\n", - " overwrite_permission=OverwritePermission.OVERWRITE_FEATURES,\n", - " config=self.sh_config,\n", - " )\n", - "\n", - " nodes = linearly_connect_tasks(load_task, ndwi_task, threshold_task, save_task)\n", - " return EOWorkflow(nodes)" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "played-command", - "metadata": {}, - "source": [ - "This time we cannot run `WaterPipeline` with CLI because the pipeline is implemented in a notebook and we cannot reference its import path. But we can run it from Python. Let's create a config for it. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "crazy-rehabilitation", - "metadata": {}, - "outputs": [], - "source": [ - "from eogrow.core.config import interpret_config_from_path\n", - "\n", - "config = interpret_config_from_path(\"./configs/water_detection.json\")\n", - "config" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "historical-client", - "metadata": {}, - "source": [ - "Let's initialize the pipeline and check some of its basic functionalities:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "suitable-advance", - "metadata": {}, - "outputs": [], - "source": [ - "pipeline = WaterDetectionPipeline.from_raw_config(config)\n", - "\n", - "pipeline\n", - "\n", - "# pipeline.config\n", - "# pipeline.sh_config\n", - "\n", - "# pipeline.storage\n", - "# pipeline.storage.filesystem\n", - "# pipeline.storage.get_folder('data')\n", - "\n", - "# pipeline.area_manager\n", - "# pipeline.area_manager.get_grid()[0]\n", - "\n", - "# pipeline.eopatch_manager\n", - "# pipeline.eopatch_manager.get_eopatch_filenames()\n", - "# pipeline.patch_list\n", - "\n", - "# pipeline.logging_manager\n", - "# pipeline.logging_manager.get_pipeline_logs_folder('pipeline-name')\n", - "# pipeline.get_pipeline_execution_name('2021-10-19')" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "absolute-richardson", - "metadata": {}, - "source": [ - "During `Pipeline` class initialization only config is validated and parsed according to schema and managers are initialized. No computation is done yet. 
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "orange-camel",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "config = interpret_config_from_path(\"./configs/water_detection.json\")\n",
-    "\n",
-    "config[\"patch_list\"] = [8]  # References EOPatch 'eopatch-id-08-col-3-row-1'\n",
-    "\n",
-    "pipeline = WaterDetectionPipeline.from_raw_config(config)\n",
-    "\n",
-    "pipeline.run()"
-   ]
-  },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "id": "proper-stick",
-   "metadata": {},
-   "source": [
-    "Before we run the pipeline for all EOPatches let's write another pipeline. This one will not be limited by `EOWorkflow` execution. After all, a pipeline can implement any process!\n",
-    "\n",
-    "In this example we will create a pipeline that vectorizes water masks, joins vectors from all EOPatches and saves them into a single file."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "abstract-collection",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "from eolearn.geometry import RasterToVectorTask\n",
-    "\n",
-    "r2v_task = RasterToVectorTask(water_feature, values=[1], raster_dtype=np.uint8)\n",
-    "\n",
-    "eop = r2v_task.execute(eop)\n",
-    "\n",
-    "eop.vector_timeless[\"WATER\"].plot();"
-   ]
-  },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "id": "adverse-armor",
-   "metadata": {},
-   "source": [
-    "This time we also have to implement `run_procedure` method. This is the main method that is triggered by `Pipeline.run` and its default implementation only runs an EOWorkflow."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "baking-introduction",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "import logging\n",
-    "\n",
-    "import fs\n",
-    "\n",
-    "from eogrow.core.pipeline import Pipeline\n",
-    "from eogrow.utils.fs import LocalFile\n",
-    "from eogrow.utils.vector import concat_gdf\n",
-    "\n",
-    "LOGGER = logging.getLogger(__name__)\n",
-    "\n",
-    "\n",
-    "class WaterExportPipeline(Pipeline):\n",
-    "    water_feature = FeatureType.MASK_TIMELESS, \"WATER\"\n",
-    "    vector_water_feature = FeatureType.VECTOR_TIMELESS, \"WATER\"\n",
-    "\n",
-    "    def run_procedure(self):\n",
-    "        workflow = self.build_workflow()\n",
-    "        exec_args = self.get_execution_arguments(workflow)\n",
-    "\n",
-    "        successful, failed, execution_results = self.run_execution(workflow, exec_args)\n",
-    "\n",
-    "        gdf_list = []\n",
-    "        for result in execution_results:\n",
-    "            eopatch = result.outputs.get(\"water-vectors\")\n",
-    "            if not eopatch:\n",
-    "                continue\n",
-    "\n",
-    "            gdf_list.append(eopatch[self.vector_water_feature])\n",
-    "\n",
-    "        if not gdf_list:\n",
-    "            return successful, failed\n",
-    "\n",
-    "        LOGGER.info(\"Preparing joined vector dataset\")\n",
-    "        joined_gdf = concat_gdf(gdf_list)  # This assumes all dataframes are in the same CRS!\n",
-    "\n",
-    "        path = fs.path.combine(self.storage.get_folder(\"vector_results\"), \"water-vectors.gpkg\")\n",
-    "        with LocalFile(path, mode=\"w\", filesystem=self.storage.filesystem) as local_file:\n",
-    "            joined_gdf.to_file(local_file.path, driver=\"GPKG\", encoding=\"utf-8\")\n",
-    "        LOGGER.info(\"Saved stats to %s\", path)\n",
-    "\n",
-    "        return successful, failed\n",
-    "\n",
-    "    def build_workflow(self):\n",
-    "        load_task = LoadTask(\n",
-    "            self.storage.get_folder(\"results\", full_path=True), lazy_loading=True, config=self.sh_config\n",
-    "        )\n",
-    "\n",
-    "        r2v_task = RasterToVectorTask(self.water_feature, values=[1], raster_dtype=np.uint8)\n",
-    "\n",
-    "        output_task = OutputTask(name=\"water-vectors\", features=[self.vector_water_feature])\n",
-    "\n",
-    "        nodes = linearly_connect_tasks(load_task, r2v_task, output_task)\n",
-    "        return EOWorkflow(nodes)"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "executed-stream",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "# In our implementation the pipeline doesn't need any additional parameters\n",
-    "config = interpret_config_from_path(\"./configs/global_config.json\")\n",
-    "\n",
-    "config.patch_list = [8]  # References EOPatch 'eopatch-id-08-col-3-row-1'\n",
-    "\n",
-    "pipeline = WaterExportPipeline.from_raw_config(config)\n",
-    "\n",
-    "pipeline.run()"
-   ]
-  },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "id": "acceptable-investigator",
-   "metadata": {},
-   "source": [
-    "Finally, let's run these new pipelines on a cluster. We can do this by uploading files to the Ray head node, starting Jupyter and run the notebook. We also create configs folder on the head done becuase it doesn't exist yet.\n",
-    "\n",
-    "```bash\n",
-    "ray rsync_up cluster.yaml eogrow-workshop.ipynb /home/ray/eogrow-workshop.ipynb\n",
-    "\n",
-    "ray exec cluster.yaml 'mkdir configs'\n",
-    "ray rsync_up cluster.yaml configs/global_config.json /home/ray/configs/global_config.json\n",
-    "ray rsync_up cluster.yaml configs/water_detection.json /home/ray/configs/water_detection.json\n",
-    "```"
-   ]
-  },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "id": "individual-fisher",
-   "metadata": {},
-   "source": [
-    "Jupyter can be started with the following command:\n",
-    "\n",
-    "```bash\n",
-    "ray exec cluster.yaml --port-forward=8889 'docker exec -it gem_container /bin/bash -c \"jupyter notebook --port=8889\"'\n",
-    "```\n",
-    "\n",
-    "Then go to `localhost:8889` and run the relevant cells in the notebook copy."
-   ]
-  }
- ],
- "metadata": {
-  "kernelspec": {
-   "display_name": "Python 3 (ipykernel)",
-   "language": "python",
-   "name": "python3"
-  },
-  "language_info": {
-   "codemirror_mode": {
-    "name": "ipython",
-    "version": 3
-   },
-   "file_extension": ".py",
-   "mimetype": "text/x-python",
-   "name": "python",
-   "nbconvert_exporter": "python",
-   "pygments_lexer": "ipython3",
-   "version": "3.8.10"
-  }
- },
- "nbformat": 4,
- "nbformat_minor": 5
-}
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 221c81d9..a95ad636 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -14,7 +14,6 @@
         "eogrow-ray",
         "eogrow-template",
         "eogrow-validate",
-        "eogrow-test",
     ],
 )
 def test_help(command):
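
With `eogrow-test` dropped from the CLI (see the tests/test_cli.py hunk above), checking a config without running anything goes through `eogrow-validate`, whose `validate_config` handler remains in `eogrow/cli.py`. A rough Python equivalent, assuming `collect_schema` and `load_pipeline_class` behave as their names and the cli.py imports suggest:

```python
# Hedged sketch of what `eogrow-validate any_pipeline_config.json` does.
from eogrow.core.config import collect_configs_from_path, interpret_config_from_dict
from eogrow.utils.meta import collect_schema, load_pipeline_class  # assumed signatures

for crude_config in collect_configs_from_path("any_pipeline_config.json"):
    raw_config = interpret_config_from_dict(crude_config)
    schema = collect_schema(load_pipeline_class(raw_config))
    schema.parse_obj(raw_config)  # pydantic validation; raises if the config is invalid

print("Config validation succeeded!")
```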