Persist execution context in storage target #359

Merged
merged 14 commits into from
Jun 6, 2022
Changes from 9 commits
6 changes: 6 additions & 0 deletions docs/pangeo_forge_recipes/development/release_notes.md
@@ -1,5 +1,11 @@
# Release Notes

## v0.9.1 - Unreleased

- Persist Pangeo Forge execution context metadata in target datasets. This information, which includes
the `pangeo-forge-recipes` version as well as recipe and input hashes, attaches execution provenance
to the dataset itself. {pull}`359`

## v0.9 - 2022-05-11

- **Breaking changes:** Deprecated `XarrayZarrRecipe` manual stage methods. Manual execution can be
23 changes: 23 additions & 0 deletions docs/pangeo_forge_recipes/recipe_user_guide/execution.md
@@ -91,3 +91,26 @@ with beam.Pipeline() as p:

By default the pipeline runs using Beam's [DirectRunner](https://beam.apache.org/documentation/runners/direct/).
See [runners](https://beam.apache.org/documentation/#runners) for more.


## Execution context

All Pangeo Forge {doc}`recipes` provide a `.get_execution_context()` method that returns the
following metadata:

```{code-block} python
{
    "version": "{pangeo_forge_recipes version installed at time of execution}",
    "recipe_hash": "{recipe hash as returned by `recipe.sha256()`}",
    "inputs_hash": "{file pattern hash as returned by `recipe.file_pattern.sha256()`}",
}
```

When this metadata is persisted in the target dataset, each key is prefixed with
`pangeo-forge:` (e.g. `pangeo-forge:version`).

Each recipe class defines where to store this metadata:

- `XarrayZarrRecipe`: Added to Zarr group attributes, and therefore also available via the
`xarray.Dataset.attrs` when opening Zarr stores with xarray.
- `HDFReferenceRecipe`:

The execution context metadata persisted in the target dataset provides a record of the
dataset's execution provenance.
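As a rough illustration of the namespacing scheme described above, here is a minimal sketch (plain Python, with placeholder values rather than real hashes) of how execution context keys end up prefixed in the target dataset's attributes:

```python
# Hypothetical sketch: execution context values (placeholders, not real
# hashes) are namespaced under the "pangeo-forge:" prefix when stored.
execution_context = {
    "version": "0.9.1",
    "recipe_hash": "ab" * 32,   # a SHA-256 hex digest is 64 characters
    "inputs_hash": "cd" * 32,
}
attrs = {f"pangeo-forge:{k}": v for k, v in execution_context.items()}

# Downstream consumers can recover the provenance record by filtering
# attribute keys on the prefix:
provenance = {k: v for k, v in attrs.items() if k.startswith("pangeo-forge:")}
print(sorted(provenance))
```

The prefix keeps provenance keys from colliding with scientific metadata already present in the dataset's attributes.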
10 changes: 10 additions & 0 deletions pangeo_forge_recipes/recipes/base.py
@@ -4,6 +4,8 @@
from dataclasses import dataclass, field, replace
from typing import Callable, ClassVar

import pkg_resources # type: ignore

from ..executors.base import Pipeline
from ..patterns import FilePattern, prune_pattern
from ..serialization import dataclass_sha256
@@ -50,6 +52,14 @@ def to_beam(self):
def sha256(self):
return dataclass_sha256(self, ignore_keys=self._hash_exclude_)

def get_execution_context(self):
return dict(
# See https://stackoverflow.com/a/2073599 re: version
version=pkg_resources.require("pangeo-forge-recipes")[0].version,
recipe_hash=self.sha256().hex(),
inputs_hash=self.file_pattern.sha256().hex(),
)
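The hex digests returned here are standard SHA-256 digests, so `recipe_hash` and `inputs_hash` are always 64 hexadecimal characters. A minimal sketch of the hashing pattern, using `json` serialization as a stand-in for the real `dataclass_sha256` (whose canonicalization differs):

```python
import hashlib
import json

def dict_sha256(d: dict) -> bytes:
    # Deterministic serialization followed by SHA-256; the real
    # dataclass_sha256 canonicalizes dataclass fields instead.
    blob = json.dumps(d, sort_keys=True).encode()
    return hashlib.sha256(blob).digest()

digest = dict_sha256({"urls": ["a.nc", "b.nc"]}).hex()
print(len(digest))  # a SHA-256 hex digest is 64 characters
```

Because the serialization is deterministic, the same recipe inputs always produce the same digest, which is what makes the hash usable as a provenance identifier.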


RecipeCompiler = Callable[[BaseRecipe], Pipeline]

4 changes: 4 additions & 0 deletions pangeo_forge_recipes/recipes/xarray_zarr.py
@@ -577,6 +577,10 @@ def filter_init_chunks(chunk_key):
recipe_meta = {"input_sequence_lens": input_sequence_lens}
config.storage_config.metadata[_GLOBAL_METADATA_KEY] = recipe_meta

zgroup = zarr.open_group(config.target_mapper)
for k, v in config.get_execution_context().items():
zgroup.attrs[f"pangeo-forge:{k}"] = v


def store_chunk(chunk_key: ChunkKey, *, config: XarrayZarrRecipe) -> None:
if config.storage_config.target is None:
2 changes: 1 addition & 1 deletion setup.cfg
@@ -55,7 +55,7 @@ max-line-length = 100

[isort]
known_first_party=pangeo_forge_recipes
known_third_party=aiohttp,apache_beam,click,dask,fsspec,kerchunk,mypy_extensions,numpy,pandas,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr
known_third_party=aiohttp,apache_beam,click,dask,fsspec,kerchunk,mypy_extensions,numpy,packaging,pandas,pkg_resources,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr
multi_line_output=3
include_trailing_comma=True
force_grid_wrap=0
27 changes: 19 additions & 8 deletions tests/recipe_tests/test_XarrayZarrRecipe.py
@@ -21,6 +21,17 @@
from pangeo_forge_recipes.storage import MetadataTarget, StorageConfig


def drop_execution_context_attrs(ds: xr.Dataset) -> xr.Dataset:
"""Drop pangeo-forge execution context attrs from a dataset."""

ds_copy = ds.copy(deep=True)
Contributor: Do we really need a deep copy here? That will use a lot more memory.

Member Author: I'm not sure! It's "just" the tests so happy to use regular copy and see how it goes.

TBH, I've now run into enough weird behavior of copied dictionaries that are not deep copies (due to shared references with the original, I guess), that I just reflexively make all dictionary copies deep now. 😅
to_drop = [k for k in ds_copy.attrs if k.startswith("pangeo-forge:")]
for k in to_drop:
del ds_copy.attrs[k]

return ds_copy
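The shallow- vs. deep-copy question raised in the review above comes down to whether the copy shares mutable values with the original. A minimal standalone illustration with plain dicts:

```python
import copy

orig = {"attrs": {"pangeo-forge:version": "0.9.1"}}
shallow = dict(orig)        # copies only the outer dict
deep = copy.deepcopy(orig)  # copies nested values too

shallow["attrs"]["extra"] = True
# The shallow copy shares the nested dict, so the original is mutated;
# the deep copy is unaffected:
print("extra" in orig["attrs"], "extra" in deep["attrs"])  # True False
```

This is the "shared references" behavior the author mentions: a shallow copy is only safe when the copied values are never mutated.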


def make_netCDFtoZarr_recipe(
file_pattern, xarray_dataset, target, cache, metadata, extra_kwargs=None
):
@@ -120,7 +131,7 @@ def test_recipe(recipe_fixture, execute_recipe):
rec = RecipeClass(file_pattern, **kwargs)
execute_recipe(rec)
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
xr.testing.assert_identical(drop_execution_context_attrs(ds_actual), ds_expected)


@pytest.mark.parametrize("get_mapper_from", ["storage_config", "target", "target_mapper"])
@@ -139,7 +150,7 @@ def test_recipe_default_storage(recipe_fixture, execute_recipe, get_mapper_from)
elif get_mapper_from == "target_mapper":
mapper = rec.target_mapper
ds_actual = xr.open_zarr(mapper).load()
xr.testing.assert_identical(ds_actual, ds_expected)
xr.testing.assert_identical(drop_execution_context_attrs(ds_actual), ds_expected)


@pytest.mark.parametrize("recipe_fixture", all_recipes)
@@ -150,7 +161,7 @@ def test_recipe_with_references(recipe_fixture, execute_recipe):
rec = RecipeClass(file_pattern, open_input_with_kerchunk=True, **kwargs)
execute_recipe(rec)
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
xr.testing.assert_identical(drop_execution_context_attrs(ds_actual), ds_expected)


@pytest.mark.parametrize("recipe_fixture", all_recipes)
@@ -195,7 +206,7 @@ def test_recipe_caching_copying(recipe, execute_recipe, cache_inputs, copy_input
)
execute_recipe(rec)
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
xr.testing.assert_identical(drop_execution_context_attrs(ds_actual), ds_expected)


# function passed to preprocessing
@@ -228,7 +239,7 @@ def test_process(recipe_fixture, execute_recipe, process_input, process_chunk):
assert not ds_actual.identical(ds_expected)
ds_expected = incr_date(ds_expected)

xr.testing.assert_identical(ds_actual, ds_expected)
xr.testing.assert_identical(drop_execution_context_attrs(ds_actual), ds_expected)


def do_actual_chunks_test(
@@ -303,7 +314,7 @@ def do_actual_chunks_test(
for dim in ds_actual.dims:
assert store[dim].chunks == ds_actual[dim].shape

xr.testing.assert_identical(ds_actual, ds_expected)
xr.testing.assert_identical(drop_execution_context_attrs(ds_actual), ds_expected)


@pytest.mark.parametrize("inputs_per_chunk,subset_inputs", [(1, {}), (1, {"time": 2}), (2, {})])
@@ -376,7 +387,7 @@ def test_no_consolidate_dimension_coordinates(netCDFtoZarr_recipe):
rec.consolidate_dimension_coordinates = False
rec.to_function()()
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
xr.testing.assert_identical(drop_execution_context_attrs(ds_actual), ds_expected)

store = zarr.open_consolidated(target.get_mapper())
assert store["time"].chunks == (file_pattern.nitems_per_input["time"],)
@@ -399,7 +410,7 @@ def test_consolidate_dimension_coordinates_with_coordinateless_dimension(
rec = RecipeClass(file_pattern, **kwargs)
rec.to_function()()
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
xr.testing.assert_identical(drop_execution_context_attrs(ds_actual), ds_expected)


def test_lock_timeout(netCDFtoZarr_recipe_sequential_only, execute_recipe_no_dask):
29 changes: 29 additions & 0 deletions tests/recipe_tests/test_execution_context.py
@@ -0,0 +1,29 @@
import pytest
import xarray as xr
import zarr
from packaging import version

from pangeo_forge_recipes.recipes import XarrayZarrRecipe


@pytest.mark.parametrize("recipe_cls", [XarrayZarrRecipe]) # HDFReferenceRecipe])
def test_execution_context(recipe_cls, netcdf_local_file_pattern_sequential):

recipe = recipe_cls(netcdf_local_file_pattern_sequential)
ec = recipe.get_execution_context()

ec_version = version.parse(ec["version"])
assert ec_version.is_devrelease # should be True for editable installs used in tests
assert isinstance(ec_version.major, int) and 0 <= ec_version.major <= 1
    assert isinstance(ec_version.minor, int) and 0 <= ec_version.minor <= 99

assert isinstance(ec["recipe_hash"], str) and len(ec["recipe_hash"]) == 64
assert isinstance(ec["inputs_hash"], str) and len(ec["inputs_hash"]) == 64

recipe.to_function()()
zgroup = zarr.open_group(recipe.target_mapper)
ds = xr.open_zarr(recipe.target_mapper, consolidated=True)

for k, v in ec.items():
assert zgroup.attrs[f"pangeo-forge:{k}"] == v
assert ds.attrs[f"pangeo-forge:{k}"] == v
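
The `is_devrelease` check above relies on `packaging.version` semantics. For reference, a setuptools-scm-style dev version (the version string here is a hypothetical example, not an actual release) parses as:

```python
from packaging import version

# Hypothetical dev-release version string of the kind setuptools-scm
# produces for editable installs:
v = version.parse("0.9.2.dev14+g1a2b3c4")
print(v.is_devrelease, v.major, v.minor)  # True 0 9
```

The `.devN` segment marks the release as a dev release, while the `+g…` local segment (the git hash) is ignored by the `major`/`minor` properties.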