From c0ce841d661aa5d987bc80327fddb78712054bf3 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Mon, 6 Jun 2022 10:31:54 -0700 Subject: [PATCH] Persist execution context in storage target (#359) * add BaseRecipe.get_execution_context * get pangeo-forge-recipes (not registrar) version * add test_execution_context * write pangeo-forge execution context to store in XarrayZarrRecipe.prepare_target * drop execution context vars from XarrayZarrRecipe equality tests * update execution context version asserts for actions runners compatibility * execution context docs first pass * mark HDFReference recipe execution context as TODO in docs * add assert identical wrapper to test_XarrayZarrRecipe * add json handler for funcs, update test bc funcs are now serializable * use ds.copy() without deep=True --- .../development/release_notes.md | 6 ++++ .../recipe_user_guide/execution.md | 23 ++++++++++++ pangeo_forge_recipes/recipes/base.py | 10 ++++++ pangeo_forge_recipes/recipes/xarray_zarr.py | 4 +++ pangeo_forge_recipes/serialization.py | 3 ++ setup.cfg | 2 +- tests/recipe_tests/test_XarrayZarrRecipe.py | 36 ++++++++++++++----- tests/recipe_tests/test_execution_context.py | 29 +++++++++++++++ tests/test_serialization.py | 10 +++--- 9 files changed, 109 insertions(+), 14 deletions(-) create mode 100644 tests/recipe_tests/test_execution_context.py diff --git a/docs/pangeo_forge_recipes/development/release_notes.md b/docs/pangeo_forge_recipes/development/release_notes.md index 4dcee1f5..e6687025 100644 --- a/docs/pangeo_forge_recipes/development/release_notes.md +++ b/docs/pangeo_forge_recipes/development/release_notes.md @@ -1,5 +1,11 @@ # Release Notes +## v0.9.1 - Unreleased + +- Persist Pangeo Forge execution context metadata in target datasets. This information, which includes +the `pangeo-forge-recipes` version as well as recipe and input hashes, attaches execution provenance +to the dataset itself. 
{pull}`359`
+
 ## v0.9 - 2022-05-11
 
 - **Breaking changes:** Deprecated `XarrayZarrRecipe` manual stage methods. Manual execution can be
diff --git a/docs/pangeo_forge_recipes/recipe_user_guide/execution.md b/docs/pangeo_forge_recipes/recipe_user_guide/execution.md
index 8d4ffa65..2652114c 100644
--- a/docs/pangeo_forge_recipes/recipe_user_guide/execution.md
+++ b/docs/pangeo_forge_recipes/recipe_user_guide/execution.md
@@ -91,3 +91,26 @@ with beam.Pipeline() as p:
 
 By default the pipeline runs using Beam's [DirectRunner](https://beam.apache.org/documentation/runners/direct/).
 See [runners](https://beam.apache.org/documentation/#runners) for more.
+
+
+## Execution context
+
+All Pangeo Forge {doc}`recipes` contain a `.get_execution_context()` method which returns the
+following metadata:
+
+```{code-block} python
+{
+    "pangeo-forge:version": "{pangeo_forge_recipes version installed at time of execution}",
+    "pangeo-forge:recipe_hash": "{recipe hash as returned by `recipe.sha256()`}",
+    "pangeo-forge:inputs_hash": "{file pattern hash as returned by `recipe.file_pattern.sha256()`}",
+}
+```
+
+Each recipe class defines where to store this metadata:
+
+- `XarrayZarrRecipe`: Added to Zarr group attributes, and therefore also available via the
+`xarray.Dataset.attrs` when opening Zarr stores with xarray.
+- `HDFReferenceRecipe`: TODO
+
+The execution context metadata which is persisted in the target dataset is used for tracking
+dataset provenance. 
diff --git a/pangeo_forge_recipes/recipes/base.py b/pangeo_forge_recipes/recipes/base.py index cea1e92f..d0e5c35d 100644 --- a/pangeo_forge_recipes/recipes/base.py +++ b/pangeo_forge_recipes/recipes/base.py @@ -4,6 +4,8 @@ from dataclasses import dataclass, field, replace from typing import Callable, ClassVar +import pkg_resources # type: ignore + from ..executors.base import Pipeline from ..patterns import FilePattern, prune_pattern from ..serialization import dataclass_sha256 @@ -50,6 +52,14 @@ def to_beam(self): def sha256(self): return dataclass_sha256(self, ignore_keys=self._hash_exclude_) + def get_execution_context(self): + return dict( + # See https://stackoverflow.com/a/2073599 re: version + version=pkg_resources.require("pangeo-forge-recipes")[0].version, + recipe_hash=self.sha256().hex(), + inputs_hash=self.file_pattern.sha256().hex(), + ) + RecipeCompiler = Callable[[BaseRecipe], Pipeline] diff --git a/pangeo_forge_recipes/recipes/xarray_zarr.py b/pangeo_forge_recipes/recipes/xarray_zarr.py index bb00bf9c..e77a5aa1 100644 --- a/pangeo_forge_recipes/recipes/xarray_zarr.py +++ b/pangeo_forge_recipes/recipes/xarray_zarr.py @@ -577,6 +577,10 @@ def filter_init_chunks(chunk_key): recipe_meta = {"input_sequence_lens": input_sequence_lens} config.storage_config.metadata[_GLOBAL_METADATA_KEY] = recipe_meta + zgroup = zarr.open_group(config.target_mapper) + for k, v in config.get_execution_context().items(): + zgroup.attrs[f"pangeo-forge:{k}"] = v + def store_chunk(chunk_key: ChunkKey, *, config: XarrayZarrRecipe) -> None: if config.storage_config.target is None: diff --git a/pangeo_forge_recipes/serialization.py b/pangeo_forge_recipes/serialization.py index edeb56ff..e5eb30c6 100644 --- a/pangeo_forge_recipes/serialization.py +++ b/pangeo_forge_recipes/serialization.py @@ -1,3 +1,4 @@ +import inspect from collections.abc import Collection from dataclasses import asdict from enum import Enum @@ -17,6 +18,8 @@ def either_encode_or_hash(obj: Any): return obj.value 
elif hasattr(obj, "sha256"): return obj.sha256().hex() + elif inspect.isfunction(obj): + return inspect.getsource(obj) raise TypeError(f"object of type {type(obj).__name__} not serializable") diff --git a/setup.cfg b/setup.cfg index ac6165d9..7c435917 100644 --- a/setup.cfg +++ b/setup.cfg @@ -55,7 +55,7 @@ max-line-length = 100 [isort] known_first_party=pangeo_forge_recipes -known_third_party=aiohttp,apache_beam,click,dask,fsspec,kerchunk,mypy_extensions,numpy,pandas,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr +known_third_party=aiohttp,apache_beam,click,dask,fsspec,kerchunk,mypy_extensions,numpy,packaging,pandas,pkg_resources,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr multi_line_output=3 include_trailing_comma=True force_grid_wrap=0 diff --git a/tests/recipe_tests/test_XarrayZarrRecipe.py b/tests/recipe_tests/test_XarrayZarrRecipe.py index 54c41cbf..3c3f7ff5 100644 --- a/tests/recipe_tests/test_XarrayZarrRecipe.py +++ b/tests/recipe_tests/test_XarrayZarrRecipe.py @@ -21,6 +21,26 @@ from pangeo_forge_recipes.storage import MetadataTarget, StorageConfig +def drop_execution_context_attrs(ds: xr.Dataset) -> xr.Dataset: + """Drop pangeo-forge execution context attrs from a dataset.""" + + ds_copy = ds.copy() + to_drop = [k for k in ds_copy.attrs if k.startswith("pangeo-forge:")] + for k in to_drop: + del ds_copy.attrs[k] + + return ds_copy + + +def assert_identical(ds1: xr.Dataset, ds2: xr.Dataset): + """Assert that two datasets are identical, excluding execution context attrs.""" + + xr.testing.assert_identical( + drop_execution_context_attrs(ds1), + drop_execution_context_attrs(ds2), + ) + + def make_netCDFtoZarr_recipe( file_pattern, xarray_dataset, target, cache, metadata, extra_kwargs=None ): @@ -120,7 +140,7 @@ def test_recipe(recipe_fixture, execute_recipe): rec = RecipeClass(file_pattern, **kwargs) execute_recipe(rec) ds_actual = xr.open_zarr(target.get_mapper()).load() - xr.testing.assert_identical(ds_actual, ds_expected) 
+ assert_identical(ds_actual, ds_expected) @pytest.mark.parametrize("get_mapper_from", ["storage_config", "target", "target_mapper"]) @@ -139,7 +159,7 @@ def test_recipe_default_storage(recipe_fixture, execute_recipe, get_mapper_from) elif get_mapper_from == "target_mapper": mapper = rec.target_mapper ds_actual = xr.open_zarr(mapper).load() - xr.testing.assert_identical(ds_actual, ds_expected) + assert_identical(ds_actual, ds_expected) @pytest.mark.parametrize("recipe_fixture", all_recipes) @@ -150,7 +170,7 @@ def test_recipe_with_references(recipe_fixture, execute_recipe): rec = RecipeClass(file_pattern, open_input_with_kerchunk=True, **kwargs) execute_recipe(rec) ds_actual = xr.open_zarr(target.get_mapper()).load() - xr.testing.assert_identical(ds_actual, ds_expected) + assert_identical(ds_actual, ds_expected) @pytest.mark.parametrize("recipe_fixture", all_recipes) @@ -195,7 +215,7 @@ def test_recipe_caching_copying(recipe, execute_recipe, cache_inputs, copy_input ) execute_recipe(rec) ds_actual = xr.open_zarr(target.get_mapper()).load() - xr.testing.assert_identical(ds_actual, ds_expected) + assert_identical(ds_actual, ds_expected) # function passed to preprocessing @@ -228,7 +248,7 @@ def test_process(recipe_fixture, execute_recipe, process_input, process_chunk): assert not ds_actual.identical(ds_expected) ds_expected = incr_date(ds_expected) - xr.testing.assert_identical(ds_actual, ds_expected) + assert_identical(ds_actual, ds_expected) def do_actual_chunks_test( @@ -303,7 +323,7 @@ def do_actual_chunks_test( for dim in ds_actual.dims: assert store[dim].chunks == ds_actual[dim].shape - xr.testing.assert_identical(ds_actual, ds_expected) + assert_identical(ds_actual, ds_expected) @pytest.mark.parametrize("inputs_per_chunk,subset_inputs", [(1, {}), (1, {"time": 2}), (2, {})]) @@ -376,7 +396,7 @@ def test_no_consolidate_dimension_coordinates(netCDFtoZarr_recipe): rec.consolidate_dimension_coordinates = False rec.to_function()() ds_actual = 
xr.open_zarr(target.get_mapper()).load()
-    xr.testing.assert_identical(ds_actual, ds_expected)
+    assert_identical(ds_actual, ds_expected)
 
     store = zarr.open_consolidated(target.get_mapper())
     assert store["time"].chunks == (file_pattern.nitems_per_input["time"],)
@@ -399,7 +419,7 @@ def test_consolidate_dimension_coordinates_with_coordinateless_dimension(
     rec = RecipeClass(file_pattern, **kwargs)
     rec.to_function()()
     ds_actual = xr.open_zarr(target.get_mapper()).load()
-    xr.testing.assert_identical(ds_actual, ds_expected)
+    assert_identical(ds_actual, ds_expected)
 
 
 def test_lock_timeout(netCDFtoZarr_recipe_sequential_only, execute_recipe_no_dask):
diff --git a/tests/recipe_tests/test_execution_context.py b/tests/recipe_tests/test_execution_context.py
new file mode 100644
index 00000000..020a005d
--- /dev/null
+++ b/tests/recipe_tests/test_execution_context.py
@@ -0,0 +1,29 @@
+import pytest
+import xarray as xr
+import zarr
+from packaging import version
+
+from pangeo_forge_recipes.recipes import XarrayZarrRecipe
+
+
+@pytest.mark.parametrize("recipe_cls", [XarrayZarrRecipe])  # HDFReferenceRecipe])
+def test_execution_context(recipe_cls, netcdf_local_file_pattern_sequential):
+
+    recipe = recipe_cls(netcdf_local_file_pattern_sequential)
+    ec = recipe.get_execution_context()
+
+    ec_version = version.parse(ec["version"])
+    assert ec_version.is_devrelease  # should be True for editable installs used in tests
+    assert isinstance(ec_version.major, int) and 0 <= ec_version.major <= 1
+    assert isinstance(ec_version.minor, int) and 0 <= ec_version.minor <= 99
+
+    assert isinstance(ec["recipe_hash"], str) and len(ec["recipe_hash"]) == 64
+    assert isinstance(ec["inputs_hash"], str) and len(ec["inputs_hash"]) == 64
+
+    recipe.to_function()()
+    zgroup = zarr.open_group(recipe.target_mapper)
+    ds = xr.open_zarr(recipe.target_mapper, consolidated=True)
+
+    for k, v in ec.items():
+        assert zgroup.attrs[f"pangeo-forge:{k}"] == v
+        assert ds.attrs[f"pangeo-forge:{k}"] == v
diff --git 
a/tests/test_serialization.py b/tests/test_serialization.py index 689911db..45fa52c1 100644 --- a/tests/test_serialization.py +++ b/tests/test_serialization.py @@ -1,6 +1,6 @@ from dataclasses import asdict, dataclass, field from datetime import datetime, timedelta -from typing import Callable, Optional +from typing import Optional import pandas as pd import pytest @@ -171,17 +171,17 @@ class NewRelease(cls): def test_either_encode_or_hash_raises(): - def f(): + class A: pass @dataclass class HasUnserializableField: - unserializable_field: Callable = f + unserializable_field: type = A - expected_msg = f"object of type {type(f).__name__} not serializable" + expected_msg = f"object of type {type(A).__name__} not serializable" with pytest.raises(TypeError, match=expected_msg): - either_encode_or_hash(f) + either_encode_or_hash(A) with pytest.raises(TypeError, match=expected_msg): # in practice, we never actually call ``either_encode_or_hash`` directly.