Skip to content

Commit

Permalink
Persist execution context in storage target (#359)
Browse files Browse the repository at this point in the history
* add BaseRecipe.get_execution_context

* get pangeo-forge-recipes (not registrar) version

* add test_execution_context

* write pangeo-forge execution context to store in XarrayZarrRecipe.prepare_target

* drop execution context vars from XarrayZarrRecipe equality tests

* update execution context version asserts for actions runners compatibility

* execution context docs first pass

* mark HDFReference recipe execution context as TODO in docs

* add assert identical wrapper to test_XarrayZarrRecipe

* add json handler for funcs, update test bc funcs are now serializable

* use ds.copy() without deep=True
  • Loading branch information
cisaacstern authored Jun 6, 2022
1 parent 3d979a5 commit c0ce841
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 14 deletions.
6 changes: 6 additions & 0 deletions docs/pangeo_forge_recipes/development/release_notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release Notes

## v0.9.1 - Unreleased

- Persist Pangeo Forge execution context metadata in target datasets. This information, which includes
the `pangeo-forge-recipes` version as well as recipe and input hashes, attaches execution provenance
to the dataset itself. {pull}`359`

## v0.9 - 2022-05-11

- **Breaking changes:** Deprecated `XarrayZarrRecipe` manual stage methods. Manual execution can be
Expand Down
23 changes: 23 additions & 0 deletions docs/pangeo_forge_recipes/recipe_user_guide/execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,26 @@ with beam.Pipeline() as p:

By default the pipeline runs using Beam's [DirectRunner](https://beam.apache.org/documentation/runners/direct/).
See [runners](https://beam.apache.org/documentation/#runners) for more.


## Execution context

All Pangeo Forge {doc}`recipes` contain a `.get_execution_context()` method which returns the
following metadata (each key is prefixed with `pangeo-forge:` when persisted to the target):

```{code-block} python
{
    "version": "{pangeo_forge_recipes version installed at time of execution}",
    "recipe_hash": "{recipe hash as returned by `recipe.sha256()`}",
    "inputs_hash": "{file pattern hash as returned by `recipe.file_pattern.sha256()`}",
}
```

Each recipe class defines where to store this metadata:

- `XarrayZarrRecipe`: Added to Zarr group attributes, and therefore also available via the
`xarray.Dataset.attrs` when opening Zarr stores with xarray.
- `HDFReferenceRecipe`: TODO

The execution context metadata which is persisted in the target dataset is used for tracking
dataset provenance.
10 changes: 10 additions & 0 deletions pangeo_forge_recipes/recipes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from dataclasses import dataclass, field, replace
from typing import Callable, ClassVar

import pkg_resources # type: ignore

from ..executors.base import Pipeline
from ..patterns import FilePattern, prune_pattern
from ..serialization import dataclass_sha256
Expand Down Expand Up @@ -50,6 +52,14 @@ def to_beam(self):
def sha256(self):
return dataclass_sha256(self, ignore_keys=self._hash_exclude_)

def get_execution_context(self):
    """Return provenance metadata describing this recipe execution.

    Includes the installed ``pangeo-forge-recipes`` version along with sha256
    hex digests of the recipe itself and of its file pattern (the inputs),
    so that execution provenance can be attached to the target dataset.
    """
    # See https://stackoverflow.com/a/2073599 re: version
    pkg_version = pkg_resources.require("pangeo-forge-recipes")[0].version
    return {
        "version": pkg_version,
        "recipe_hash": self.sha256().hex(),
        "inputs_hash": self.file_pattern.sha256().hex(),
    }


RecipeCompiler = Callable[[BaseRecipe], Pipeline]

Expand Down
4 changes: 4 additions & 0 deletions pangeo_forge_recipes/recipes/xarray_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,10 @@ def filter_init_chunks(chunk_key):
recipe_meta = {"input_sequence_lens": input_sequence_lens}
config.storage_config.metadata[_GLOBAL_METADATA_KEY] = recipe_meta

zgroup = zarr.open_group(config.target_mapper)
for k, v in config.get_execution_context().items():
zgroup.attrs[f"pangeo-forge:{k}"] = v


def store_chunk(chunk_key: ChunkKey, *, config: XarrayZarrRecipe) -> None:
if config.storage_config.target is None:
Expand Down
3 changes: 3 additions & 0 deletions pangeo_forge_recipes/serialization.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import inspect
from collections.abc import Collection
from dataclasses import asdict
from enum import Enum
Expand All @@ -17,6 +18,8 @@ def either_encode_or_hash(obj: Any):
return obj.value
elif hasattr(obj, "sha256"):
return obj.sha256().hex()
elif inspect.isfunction(obj):
return inspect.getsource(obj)
raise TypeError(f"object of type {type(obj).__name__} not serializable")


Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ max-line-length = 100

[isort]
known_first_party=pangeo_forge_recipes
known_third_party=aiohttp,apache_beam,click,dask,fsspec,kerchunk,mypy_extensions,numpy,pandas,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr
known_third_party=aiohttp,apache_beam,click,dask,fsspec,kerchunk,mypy_extensions,numpy,packaging,pandas,pkg_resources,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr
multi_line_output=3
include_trailing_comma=True
force_grid_wrap=0
Expand Down
36 changes: 28 additions & 8 deletions tests/recipe_tests/test_XarrayZarrRecipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,26 @@
from pangeo_forge_recipes.storage import MetadataTarget, StorageConfig


def drop_execution_context_attrs(ds: xr.Dataset) -> xr.Dataset:
    """Return a copy of *ds* with all pangeo-forge execution context attrs removed.

    Execution context attrs are those whose key carries the ``pangeo-forge:``
    namespace prefix. The input dataset is left untouched.
    """
    result = ds.copy()
    # Iterate over a snapshot of the keys so deletion during the loop is safe.
    for key in list(result.attrs):
        if key.startswith("pangeo-forge:"):
            del result.attrs[key]
    return result


def assert_identical(ds1: xr.Dataset, ds2: xr.Dataset):
    """Assert two datasets are identical, ignoring execution context attrs.

    Strips the ``pangeo-forge:``-prefixed attrs from both datasets before
    delegating to :func:`xarray.testing.assert_identical`.
    """
    left = drop_execution_context_attrs(ds1)
    right = drop_execution_context_attrs(ds2)
    xr.testing.assert_identical(left, right)


def make_netCDFtoZarr_recipe(
file_pattern, xarray_dataset, target, cache, metadata, extra_kwargs=None
):
Expand Down Expand Up @@ -120,7 +140,7 @@ def test_recipe(recipe_fixture, execute_recipe):
rec = RecipeClass(file_pattern, **kwargs)
execute_recipe(rec)
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)


@pytest.mark.parametrize("get_mapper_from", ["storage_config", "target", "target_mapper"])
Expand All @@ -139,7 +159,7 @@ def test_recipe_default_storage(recipe_fixture, execute_recipe, get_mapper_from)
elif get_mapper_from == "target_mapper":
mapper = rec.target_mapper
ds_actual = xr.open_zarr(mapper).load()
xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)


@pytest.mark.parametrize("recipe_fixture", all_recipes)
Expand All @@ -150,7 +170,7 @@ def test_recipe_with_references(recipe_fixture, execute_recipe):
rec = RecipeClass(file_pattern, open_input_with_kerchunk=True, **kwargs)
execute_recipe(rec)
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)


@pytest.mark.parametrize("recipe_fixture", all_recipes)
Expand Down Expand Up @@ -195,7 +215,7 @@ def test_recipe_caching_copying(recipe, execute_recipe, cache_inputs, copy_input
)
execute_recipe(rec)
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)


# function passed to preprocessing
Expand Down Expand Up @@ -228,7 +248,7 @@ def test_process(recipe_fixture, execute_recipe, process_input, process_chunk):
assert not ds_actual.identical(ds_expected)
ds_expected = incr_date(ds_expected)

xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)


def do_actual_chunks_test(
Expand Down Expand Up @@ -303,7 +323,7 @@ def do_actual_chunks_test(
for dim in ds_actual.dims:
assert store[dim].chunks == ds_actual[dim].shape

xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)


@pytest.mark.parametrize("inputs_per_chunk,subset_inputs", [(1, {}), (1, {"time": 2}), (2, {})])
Expand Down Expand Up @@ -376,7 +396,7 @@ def test_no_consolidate_dimension_coordinates(netCDFtoZarr_recipe):
rec.consolidate_dimension_coordinates = False
rec.to_function()()
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)

store = zarr.open_consolidated(target.get_mapper())
assert store["time"].chunks == (file_pattern.nitems_per_input["time"],)
Expand All @@ -399,7 +419,7 @@ def test_consolidate_dimension_coordinates_with_coordinateless_dimension(
rec = RecipeClass(file_pattern, **kwargs)
rec.to_function()()
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)
assert_identical(ds_actual, ds_expected)


def test_lock_timeout(netCDFtoZarr_recipe_sequential_only, execute_recipe_no_dask):
Expand Down
29 changes: 29 additions & 0 deletions tests/recipe_tests/test_execution_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import pytest
import xarray as xr
import zarr
from packaging import version

from pangeo_forge_recipes.recipes import XarrayZarrRecipe


@pytest.mark.parametrize("recipe_cls", [XarrayZarrRecipe])  # HDFReferenceRecipe])
def test_execution_context(recipe_cls, netcdf_local_file_pattern_sequential):
    """Verify execution context metadata is generated and persisted to the target.

    Checks that ``get_execution_context`` returns a plausible version string and
    64-character (sha256 hex digest) recipe/input hashes, and that after
    execution the same values appear in both the Zarr group attrs and the
    xarray dataset attrs under the ``pangeo-forge:`` namespace prefix.
    """
    recipe = recipe_cls(netcdf_local_file_pattern_sequential)
    ec = recipe.get_execution_context()

    ec_version = version.parse(ec["version"])
    # Editable installs (as used on CI actions runners) report a dev release.
    assert ec_version.is_devrelease
    assert isinstance(ec_version.major, int) and 0 <= ec_version.major <= 1
    # Bug fix: the original asserted on ``ec_version.major`` twice; the second
    # assert is meant to bound the *minor* version component.
    assert isinstance(ec_version.minor, int) and 0 <= ec_version.minor <= 99

    # sha256 hex digests are always 64 characters long.
    assert isinstance(ec["recipe_hash"], str) and len(ec["recipe_hash"]) == 64
    assert isinstance(ec["inputs_hash"], str) and len(ec["inputs_hash"]) == 64

    recipe.to_function()()
    zgroup = zarr.open_group(recipe.target_mapper)
    ds = xr.open_zarr(recipe.target_mapper, consolidated=True)

    # Every context key must round-trip into the target, namespaced.
    for k, v in ec.items():
        assert zgroup.attrs[f"pangeo-forge:{k}"] == v
        assert ds.attrs[f"pangeo-forge:{k}"] == v
10 changes: 5 additions & 5 deletions tests/test_serialization.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Optional
from typing import Optional

import pandas as pd
import pytest
Expand Down Expand Up @@ -171,17 +171,17 @@ class NewRelease(cls):


def test_either_encode_or_hash_raises():
def f():
class A:
pass

@dataclass
class HasUnserializableField:
unserializable_field: Callable = f
unserializable_field: type = A

expected_msg = f"object of type {type(f).__name__} not serializable"
expected_msg = f"object of type {type(A).__name__} not serializable"

with pytest.raises(TypeError, match=expected_msg):
either_encode_or_hash(f)
either_encode_or_hash(A)

with pytest.raises(TypeError, match=expected_msg):
# in practice, we never actually call ``either_encode_or_hash`` directly.
Expand Down

0 comments on commit c0ce841

Please sign in to comment.