Better chain configs (#304)
* change config collection API

* separate pipeline chain execution

* add remote execution capabilities

* move file

* make mypy happy

* Update eogrow/utils/pipeline_chain.py

Co-authored-by: Matic Lubej <[email protected]>

* rename parameters

* streamline names

* add simple tests for pipeline chain

* reenable test suite (hopefully)

* update docs

* add default description to docs

---------

Co-authored-by: Matic Lubej <[email protected]>
zigaLuksic and mlubej authored Nov 21, 2023
1 parent 9ff076a commit 9ebcbb3
Showing 10 changed files with 199 additions and 66 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/ci_action.yml
@@ -90,8 +90,10 @@ jobs:
pip install -e .[DEV,ML]
pip install gdal==$(gdal-config --version)
- name: Set up local cluster
run: ray start --head
- name: Set up local cluster # we need to install async-timeout until ray 2.9.0 fixes the issue
run: |
pip install async-timeout
ray start --head
- name: Run fast tests
if: ${{ !matrix.full_test_suite }}
23 changes: 16 additions & 7 deletions docs/source/config-language.md
@@ -26,19 +26,28 @@ Additional notes:

### Pipeline chains

A typical configuration is a dictionary with pipeline parameters. However, it can also be a list of dictionaries. In this case each dictionary must contain parameters of a single pipeline. The order of dictionaries defines the consecutive order in which pipelines will be run. Example:
A typical configuration is a dictionary with pipeline parameters. However, it can also be a list of pipeline-execution dictionaries that specify:
- `pipeline_config`: a configuration for a single pipeline,
- `pipeline_resources` (optional): a dictionary that is passed to `ray.remote` to configure which resources the main pipeline process will request from the cluster (see [here](https://docs.ray.io/en/latest/ray-core/api/doc/ray.remote_function.RemoteFunction.options.html) for options). The pipeline requests 1 CPU by default (and nothing else).

The order of dictionaries defines the consecutive order in which pipelines will be run. Example:

```
[
{
"pipeline": "FirstPipeline",
"param1": "value1",
...
"pipeline_config": {
"pipeline": "FirstPipeline",
"param1": "value1",
...
},
},
{
"pipeline": "SecondPipeline",
"param2": "value2",
...
"pipeline_config": {
"pipeline": "SecondPipeline",
"param2": "value2",
...
},
"pipeline_resources": {"num_cpus": 2}
},
...
]
```
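
In `ray` terms, each chain element becomes one remote call whose options are taken from `pipeline_resources`. A minimal sketch of that behaviour (the `runner` function is illustrative, not the actual eo-grow internals):

```
import ray

@ray.remote
def runner(pipeline_config: dict) -> None:
    ...  # stand-in: build the pipeline from its config and run it

# `pipeline_resources` becomes the options of the remote call (1 CPU by default).
task = runner.options(num_cpus=2).remote({"pipeline": "SecondPipeline", "param2": "value2"})
ray.get(task)  # blocking, so the next pipeline starts only after this one finishes
```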
56 changes: 33 additions & 23 deletions eogrow/cli.py
@@ -1,20 +1,23 @@
"""Implements the command line interface for `eo-grow`."""

from __future__ import annotations

import json
import os
import re
import subprocess
from tempfile import NamedTemporaryFile
from typing import Optional, Tuple
from typing import Iterable

import click

from .core.config import collect_configs_from_path, interpret_config_from_dict
from .core.config import CrudeConfig, RawConfig, collect_configs_from_path, interpret_config_from_dict
from .core.logging import CLUSTER_FILE_LOCATION_ON_HEAD
from .core.schemas import build_schema_template
from .pipelines.testing import TestPipeline
from .utils.general import jsonify
from .utils.meta import collect_schema, import_object, load_pipeline_class
from .utils.pipeline_chain import run_pipeline_chain, validate_pipeline_chain
from .utils.ray import generate_cluster_config_path, start_cluster_if_needed

variables_option = click.option(
@@ -42,28 +45,27 @@
@click.argument("config_path", type=click.Path())
@variables_option
@test_patches_option
def run_pipeline(config_path: str, cli_variables: Tuple[str, ...], test_patches: Tuple[int, ...]) -> None:
def run_pipeline(config_path: str, cli_variables: tuple[str, ...], test_patches: tuple[int, ...]) -> None:
"""Execute eo-grow pipeline using CLI.
\b
Example:
eogrow config_files/config.json
"""

raw_configs = collect_configs_from_path(config_path)
crude_config = collect_configs_from_path(config_path)
cli_variable_mapping = dict(_parse_cli_variable(cli_var) for cli_var in cli_variables)

pipelines = []
for raw_config in raw_configs:
config = interpret_config_from_dict(raw_config, cli_variable_mapping)
if test_patches:
config["test_subset"] = list(test_patches)

pipelines.append(load_pipeline_class(config).from_raw_config(config))

for pipeline in pipelines:
if isinstance(crude_config, dict):
config = _prepare_config(crude_config, cli_variable_mapping, test_patches)
pipeline = load_pipeline_class(config).from_raw_config(config)
pipeline.run()

else:
pipeline_chain = [_prepare_config(config, cli_variable_mapping, test_patches) for config in crude_config]
validate_pipeline_chain(pipeline_chain)
run_pipeline_chain(pipeline_chain)


@click.command()
@click.argument("cluster_yaml", type=click.Path())
@@ -85,8 +87,8 @@ def run_pipeline_on_cluster(
cluster_yaml: str,
start_cluster: bool,
use_tmux: bool,
cli_variables: Tuple[str, ...],
test_patches: Tuple[int, ...],
cli_variables: tuple[str, ...],
test_patches: tuple[int, ...],
) -> None:
"""Command for running an eo-grow pipeline on a remote Ray cluster of AWS EC2 instances. The provided config is
fully constructed and uploaded to the cluster head in the `~/.synced_configs/` directory, where it is then
@@ -99,11 +101,9 @@ def run_pipeline_on_cluster(
if start_cluster:
start_cluster_if_needed(cluster_yaml)

raw_configs = [interpret_config_from_dict(config) for config in collect_configs_from_path(config_path)]
remote_path = generate_cluster_config_path(config_path)

with NamedTemporaryFile(mode="w", delete=True, suffix=".json") as local_path:
json.dump(raw_configs, local_path)
json.dump(collect_configs_from_path(config_path), local_path)
local_path.flush() # without this the sync can happen before the file content is written

subprocess.run(f"ray rsync_up {cluster_yaml} {local_path.name!r} {remote_path!r}", shell=True)
@@ -156,7 +156,7 @@ def run_pipeline_on_cluster(
)
def make_template(
import_path: str,
template_path: Optional[str],
template_path: str | None,
force_override: bool,
template_format: str,
required_only: bool,
Expand Down Expand Up @@ -203,9 +203,12 @@ def validate_config(config_path: str) -> None:
Example:
eogrow-validate config_files/config.json
"""
for config in collect_configs_from_path(config_path):
raw_config = interpret_config_from_dict(config)
load_pipeline_class(config).Schema.parse_obj(raw_config)
config = collect_configs_from_path(config_path)
if isinstance(config, dict):
pipeline_config = _prepare_config(config, {}, ())
collect_schema(load_pipeline_class(pipeline_config)).parse_obj(pipeline_config)
else:
validate_pipeline_chain([_prepare_config(run_config, {}, ()) for run_config in config])

click.echo("Config validation succeeded!")

@@ -226,7 +229,14 @@ def run_test_pipeline(config_path: str) -> None:
pipeline.run()


def _parse_cli_variable(mapping_str: str) -> Tuple[str, str]:
def _prepare_config(config: CrudeConfig, variables: dict[str, str], test_patches: Iterable[int]) -> RawConfig:
raw_config = interpret_config_from_dict(config, variables)
if test_patches:
raw_config["test_subset"] = list(test_patches)
return raw_config


def _parse_cli_variable(mapping_str: str) -> tuple[str, str]:
"""Checks that the input is of shape `name:value` and then splits it into a tuple"""
match = re.match(r"(?P<name>.+?):(?P<value>.+)", mapping_str)
if match is None:
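
For illustration, the non-greedy `.+?` in `_parse_cli_variable` splits at the first colon only, so values may themselves contain colons. A few made-up inputs (the error branch is truncated in this view, so its exact message is not reproduced here):

```
_parse_cli_variable("area:eu")             # ("area", "eu")
_parse_cli_variable("path:s3://bucket/x")  # ("path", "s3://bucket/x")
dict(map(_parse_cli_variable, ["area:eu", "year:2023"]))  # {"area": "eu", "year": "2023"}
```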
26 changes: 12 additions & 14 deletions eogrow/core/config.py
@@ -5,7 +5,7 @@
import copy
import re
from functools import reduce
from typing import Any, Callable, NewType, cast
from typing import Any, Callable, List, NewType, Union, cast

import fs.path
import rapidjson
@@ -16,7 +16,7 @@
RawConfig = NewType("RawConfig", dict)


def collect_configs_from_path(path: str, used_config_paths: set[str] | None = None) -> list[CrudeConfig]:
def collect_configs_from_path(path: str, used_config_paths: set[str] | None = None) -> CrudeConfig | list[CrudeConfig]:
"""Loads and builds a list of config dictionaries defined by the parameters stored in files
This function performs the 1st stage of language interpretation as described in
@@ -38,11 +38,9 @@ def collect_configs_from_path(path: str, used_config_paths: set[str] | None = None) -> CrudeConfig | list[CrudeConfig]:

config = _recursive_config_build(config, used_config_paths)

if isinstance(config, dict):
config = [config]
if isinstance(config, list):
return config
raise ValueError(f"When interpreting config from {path} a dictionary or list was expected, got {type(config)}.")
if not isinstance(config, (dict, list)):
raise ValueError(f"When interpreting config from {path} a dictionary or list was expected, got {type(config)}.")
return cast(Union[CrudeConfig, List[CrudeConfig]], config)


def _recursive_config_build(config: object, used_config_paths: set[str]) -> object:
@@ -65,13 +63,13 @@ def _recursive_config_build(config: object, used_config_paths: set[str]) -> object:
if value in used_config_paths:
raise ValueError("Detected a cyclic import of configs")

imported_config_list = collect_configs_from_path(value, used_config_paths=used_config_paths)
if len(imported_config_list) != 1:
imported_config = collect_configs_from_path(value, used_config_paths=used_config_paths)
if not isinstance(imported_config, dict):
raise ValueError(
"Config lists cannot be imported inside configs. Found a config list when resolving key"
f" {key} for path {value}"
)
imported_configs.append(imported_config_list[0])
imported_configs.append(imported_config)
else:
joint_config[key] = _recursive_config_build(value, used_config_paths)

@@ -113,10 +111,10 @@ def interpret_config_from_dict(config: CrudeConfig, external_variables: dict[str

def interpret_config_from_path(path: str) -> RawConfig:
"""Loads from path in applies both steps of the config language."""
configs = collect_configs_from_path(path)
if len(configs) != 1:
raise ValueError(f"The JSON file {path} was expected to contain a single dictionary, got {len(configs)}")
return interpret_config_from_dict(configs[0])
config = collect_configs_from_path(path)
if isinstance(config, dict):
return interpret_config_from_dict(config)
raise ValueError(f"The JSON file {path} was expected to contain a single dictionary, got {len(config)}")


def _resolve_config_paths(config_str: str, config_path: str) -> str:
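
With the changed return type, callers normalize the result before interpretation. A minimal sketch with a hypothetical config path (it mirrors the normalization in `eogrow/utils/testing.py` below):

```
from eogrow.core.config import collect_configs_from_path, interpret_config_from_dict

collected = collect_configs_from_path("configs/my_run.json")  # hypothetical path
crude_configs = collected if isinstance(collected, list) else [collected]
raw_configs = [interpret_config_from_dict(config) for config in crude_configs]
```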
2 changes: 1 addition & 1 deletion eogrow/core/pipeline.py
@@ -169,7 +169,7 @@ def run_execution(
else:
ray.init(address="auto", ignore_reinit_error=True)
executor_class = RayExecutor
executor_kwargs = {"ray_remote_kwargs": self.config.ray_remote_kwargs}
executor_kwargs = {"ray_remote_kwargs": self.config.worker_resources}

LOGGER.info("Starting processing for %d EOPatches", len(execution_kwargs))

2 changes: 1 addition & 1 deletion eogrow/core/schemas.py
@@ -45,7 +45,7 @@ class PipelineSchema(BaseSchema):
logging: ManagerSchema = Field(description="A schema of an implementation of LoggingManager class")
validate_logging = field_validator("logging", validate_manager, pre=True)

ray_remote_kwargs: Dict[str, Any] = Field(
worker_resources: Dict[str, Any] = Field(
default_factory=dict,
description=(
"Keyword arguments passed to ray tasks when executing via `RayExecutor`. The options are specified [here]"
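
After the rename, per-worker resources are requested under the new key. A sketch of the relevant config fragment (the pipeline import path and values are made up, and required manager fields are omitted):

```
config = {
    "pipeline": "my_project.pipelines.MyPipeline",  # hypothetical import path
    # Forwarded to each ray worker task as `ray_remote_kwargs` by RayExecutor.
    "worker_resources": {"num_cpus": 2},
}
```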
49 changes: 49 additions & 0 deletions eogrow/utils/pipeline_chain.py
@@ -0,0 +1,49 @@
"""Module implementing utilities for chained configs."""

from __future__ import annotations

from typing import Any, Dict

import ray
from pydantic import Field, ValidationError

from ..core.config import RawConfig
from ..core.schemas import BaseSchema
from .meta import collect_schema, load_pipeline_class


class PipelineRunSchema(BaseSchema):
pipeline_config: dict
pipeline_resources: Dict[str, Any] = Field(
default_factory=dict,
description=(
"Keyword arguments passed to ray when executing the main pipeline process. The options are specified [here]"
"(https://docs.ray.io/en/latest/ray-core/api/doc/ray.remote_function.RemoteFunction.options.html)."
),
)


def validate_pipeline_chain(pipeline_chain: list[RawConfig]) -> None:
for i, run_config in enumerate(pipeline_chain):
try:
run_schema = PipelineRunSchema.parse_obj(run_config)
except ValidationError as e:
raise TypeError(
f"Pipeline-chain element {i} should be a dictionary with the fields `pipeline_config` and the optional"
" `pipeline_resources`."
) from e

pipeline_schema = collect_schema(load_pipeline_class(run_schema.pipeline_config))
pipeline_schema.parse_obj(run_schema.pipeline_config)


def run_pipeline_chain(pipeline_chain: list[RawConfig]) -> None:
for run_config in pipeline_chain:
run_schema = PipelineRunSchema.parse_obj(run_config)
runner = _pipeline_runner.options(**run_schema.pipeline_resources) # type: ignore[attr-defined]
ray.get(runner.remote(run_schema.pipeline_config))


@ray.remote
def _pipeline_runner(config: RawConfig) -> None:
return load_pipeline_class(config).from_raw_config(config).run()
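
A possible end-to-end use of the two new helpers, assuming a running ray cluster and a hypothetical config file that contains a list:

```
import ray

from eogrow.core.config import collect_configs_from_path, interpret_config_from_dict
from eogrow.utils.pipeline_chain import run_pipeline_chain, validate_pipeline_chain

ray.init(address="auto")  # assumes a cluster was started, e.g. with `ray start --head`

collected = collect_configs_from_path("configs/chain.json")  # hypothetical chain file
assert isinstance(collected, list), "a pipeline chain must be a list of run configs"
chain = [interpret_config_from_dict(config) for config in collected]

validate_pipeline_chain(chain)  # fail fast on schema errors before anything runs
run_pipeline_chain(chain)  # runs the pipelines sequentially, each as a ray task
```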
3 changes: 2 additions & 1 deletion eogrow/utils/testing.py
@@ -269,7 +269,8 @@ def run_config(
:param check_logs: If pipeline logs should be checked after the run completes. If EOWorkflows were used, the
function fails if there were unsuccessful executions.
"""
crude_configs = collect_configs_from_path(config_path)
collected_configs = collect_configs_from_path(config_path)
crude_configs = collected_configs if isinstance(collected_configs, list) else [collected_configs]
raw_configs = [interpret_config_from_dict(config) for config in crude_configs]

for config in raw_configs:
35 changes: 18 additions & 17 deletions tests/core/test_config.py
@@ -14,25 +14,26 @@
CONFIG_LIST = [CONFIG_DICT, CONFIG_DICT]


@pytest.mark.parametrize("config_object", [CONFIG_DICT, CONFIG_LIST])
def test_config_from_file(config_object, temp_folder):
def test_config_from_file_single(temp_folder):
path = os.path.join(temp_folder, "config.json")
with open(path, "w") as fp:
json.dump(config_object, fp)

config_list = list(map(interpret_config_from_dict, collect_configs_from_path(path)))
if isinstance(config_object, dict):
directly_loaded_config = interpret_config_from_path(path)
assert len(config_list) == 1
assert isinstance(directly_loaded_config, dict)
assert isinstance(config_list[0], dict)
assert directly_loaded_config == config_object
assert config_list[0] == config_object

else:
assert isinstance(config_list, list)
assert all(isinstance(config, dict) for config in config_list)
assert config_list == config_object
json.dump(CONFIG_DICT, fp)

directly_loaded_config = interpret_config_from_path(path)
assert isinstance(directly_loaded_config, dict)
assert directly_loaded_config == CONFIG_DICT
assert directly_loaded_config == interpret_config_from_dict(collect_configs_from_path(path))


def test_config_from_file_chain(temp_folder):
path = os.path.join(temp_folder, "config.json")
with open(path, "w") as fp:
json.dump(CONFIG_LIST, fp)

config_list = collect_configs_from_path(path)
assert isinstance(config_list, list)
assert all(isinstance(config, dict) for config in config_list)
assert config_list == CONFIG_LIST


def test_missing_config_loading():