From b74905badfe82644cc2de6923e6bdcd6b7f64012 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Fri, 29 Mar 2024 10:57:17 -0700 Subject: [PATCH 01/23] mode kw for schema_to_zarr --- pangeo_forge_recipes/aggregation.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pangeo_forge_recipes/aggregation.py b/pangeo_forge_recipes/aggregation.py index 28b015ec..5a1968aa 100644 --- a/pangeo_forge_recipes/aggregation.py +++ b/pangeo_forge_recipes/aggregation.py @@ -2,7 +2,7 @@ from copy import deepcopy from dataclasses import dataclass, field -from typing import Dict, Optional, TypedDict +from typing import Dict, Literal, Optional, TypedDict import cftime import dask.array as dsa @@ -287,13 +287,14 @@ def schema_to_zarr( attrs: Optional[Dict[str, str]] = None, consolidated_metadata: Optional[bool] = True, encoding: Optional[Dict] = None, + mode: Literal["w", "a"] = "w", ) -> zarr.storage.FSStore: """Initialize a zarr group based on a schema.""" ds = schema_to_template_ds(schema, specified_chunks=target_chunks, attrs=attrs) # using mode="w" makes this function idempotent ds.to_zarr( target_store, - mode="w", + mode=mode, compute=False, consolidated=consolidated_metadata, encoding=encoding, From 8e3d55e49cdeb8f9dc0068d1bc6402c2f30603e8 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Fri, 29 Mar 2024 10:59:37 -0700 Subject: [PATCH 02/23] pass mode kw through transforms --- pangeo_forge_recipes/transforms.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pangeo_forge_recipes/transforms.py b/pangeo_forge_recipes/transforms.py index 08a90794..0c6b7e6e 100644 --- a/pangeo_forge_recipes/transforms.py +++ b/pangeo_forge_recipes/transforms.py @@ -5,7 +5,7 @@ import random import sys from dataclasses import dataclass, field -from typing import Callable, Dict, List, Optional, Tuple, TypeVar, Union +from typing import Callable, Dict, List, Literal, Optional, Tuple, TypeVar, Union # PEP612 Concatenate & ParamSpec are useful for annotating decorators, but their import # differs between Python versions 3.9 & 3.10. See: https://stackoverflow.com/a/71990006 @@ -341,6 +341,7 @@ class PrepareZarrTarget(beam.PTransform): then falling out of sync with coordinates if ConsolidateDimensionCoordinates() is applied to the output of StoreToZarr(). + :param mode: One of "w" for writing a new store, or "a" for appending to an existing store. """ target: str | FSSpecTarget @@ -348,6 +349,7 @@ class PrepareZarrTarget(beam.PTransform): attrs: Dict[str, str] = field(default_factory=dict) consolidated_metadata: Optional[bool] = True encoding: Optional[dict] = field(default_factory=dict) + mode: Literal["w", "a"] = "w" def expand(self, pcoll: beam.PCollection) -> beam.PCollection: if isinstance(self.target, str): @@ -362,6 +364,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: attrs=self.attrs, encoding=self.encoding, consolidated_metadata=False, + mode=self.mode, ) return initialized_target @@ -641,8 +644,8 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin): out https://github.com/jbusecke/dynamic_chunks :param dynamic_chunking_fn_kwargs: Optional keyword arguments for ``dynamic_chunking_fn``. :param attrs: Extra group-level attributes to inject into the dataset. - :param encoding: Dictionary encoding for xarray.to_zarr(). + :param mode: One of "w" for writing a new store, or "a" for appending to an existing store. 
""" # TODO: make it so we don't have to explicitly specify combine_dims @@ -657,6 +660,7 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin): dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict) attrs: Dict[str, str] = field(default_factory=dict) encoding: Optional[dict] = field(default_factory=dict) + mode: Literal["w", "a"] = "w" def __post_init__(self): if self.target_chunks and self.dynamic_chunking_fn: @@ -684,6 +688,7 @@ def expand( target_chunks=target_chunks, attrs=self.attrs, encoding=self.encoding, + mode=self.mode, ) n_target_stores = rechunked_datasets | StoreDatasetFragments(target_store=target_store) singleton_target_store = ( From 9d18ba80ca20f470475531b73f01580375fd42a0 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Fri, 29 Mar 2024 11:00:29 -0700 Subject: [PATCH 03/23] appending end to end test WIP --- tests/conftest.py | 27 ++++++++++++++++++++++ tests/data_generation.py | 4 ++-- tests/test_end_to_end.py | 49 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 78 insertions(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 2d02739d..73445571 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -276,6 +276,11 @@ def daily_xarray_dataset(): return make_ds(nt=10) +@pytest.fixture(scope="session") +def daily_xarray_datasets_to_append(): + return make_ds(nt=10, start="2010-01-01"), make_ds(nt=10, start="2010-01-11") + + @pytest.fixture(scope="session") def daily_xarray_dataset_with_coordinateless_dimension(daily_xarray_dataset): """ @@ -295,6 +300,23 @@ def netcdf_local_paths_sequential_1d(daily_xarray_dataset, tmpdir_factory): ) +@pytest.fixture(scope="session") +def netcdf_local_paths_sequential_1d_to_append( + daily_xarray_datasets_to_append, + tmpdir_factory, +): + return [ + make_local_paths( + ds, + tmpdir_factory, + "D", + split_up_files_by_day, + file_type="netcdf4", + ) + for ds in daily_xarray_datasets_to_append + ] + + @pytest.fixture(scope="session") def netcdf3_local_paths_sequential_1d(daily_xarray_dataset, tmpdir_factory): return make_local_paths( @@ -448,6 +470,11 @@ def netcdf_local_paths_sequential_with_coordinateless_dimension( # FilePattern fixtures ---------------------------------------------------------------------------- +@pytest.fixture(scope="session") +def netcdf_local_file_patterns_to_append(netcdf_local_paths_sequential_1d_to_append): + return [make_file_pattern(paths) for paths in netcdf_local_paths_sequential_1d_to_append] + + @pytest.fixture(scope="session") def netcdf_local_file_pattern_sequential(netcdf_local_paths_sequential): return make_file_pattern(netcdf_local_paths_sequential) diff --git a/tests/data_generation.py b/tests/data_generation.py index 729d4267..8d2c02b7 100644 --- a/tests/data_generation.py +++ b/tests/data_generation.py @@ -3,13 +3,13 @@ import xarray as xr -def make_ds(nt=10, non_dim_coords=False): +def make_ds(nt=10, non_dim_coords=False, start="2010-01-01"): """Return a synthetic random xarray dataset.""" np.random.seed(2) # TODO: change nt to 11 in order to catch the edge case where # items_per_input does not evenly divide the length of the sequence dimension ny, nx = 18, 36 - time = pd.date_range(start="2010-01-01", periods=nt, freq="D") + time = pd.date_range(start=start, periods=nt, freq="D") lon = (np.arange(nx) + 0.5) * 360 / nx lon_attrs = {"units": "degrees_east", "long_name": "longitude"} lat = (np.arange(ny) + 0.5) * 180 / ny diff --git a/tests/test_end_to_end.py b/tests/test_end_to_end.py index 
0503eb46..a9bce5f2 100644 --- a/tests/test_end_to_end.py +++ b/tests/test_end_to_end.py @@ -83,6 +83,55 @@ def test_xarray_zarr_subpath( xr.testing.assert_equal(ds.load(), daily_xarray_dataset) +def test_xarray_zarr_append( + daily_xarray_datasets_to_append, + netcdf_local_file_patterns_to_append, + tmp_target, + pipeline, +): + ds0_fixture, ds1_fixture = daily_xarray_datasets_to_append + pattern0, pattern1 = netcdf_local_file_patterns_to_append + assert pattern0.combine_dim_keys == pattern1.combine_dim_keys + + ds_fixture_concat = xr.concat([ds0_fixture, ds1_fixture], dim="time") + assert len(ds_fixture_concat.time) == 20 + + # these kws are reused across both initial and append pipelines + common_kws = dict( + target_root=tmp_target, + store_name="store", + combine_dims=pattern0.combine_dim_keys, + ) + # build an initial zarr store, to which we will append + with pipeline as p: + ( + p + | "CreateInitial" >> beam.Create(pattern0.items()) + | "OpenInitial" >> OpenWithXarray() + | "StoreInitial" >> StoreToZarr(**common_kws) + ) + + # make sure the initial zarr store looks good + ds0 = xr.open_dataset(os.path.join(tmp_target.root_path, "store"), engine="zarr") + assert len(ds0.time) == 10 + xr.testing.assert_equal(ds0.load(), ds0_fixture) + + # now append to it. the two differences here are + # using `pattern1` in Create and `mode="a"` in `StoreToZarr` + with pipeline as p: + ( + p + | "CreateAppend" >> beam.Create(pattern1.items()) + | "OpenAppend" >> OpenWithXarray() + | "StoreAppend" >> StoreToZarr(mode="a", **common_kws) + ) + + # now see if we have appended to time dimension as intended + ds_concat = xr.open_dataset(os.path.join(tmp_target.root_path, "store"), engine="zarr") + assert len(ds_concat.time) == 20 + # xr.testing.assert_equal(ds1.load(), daily_xarray_dataset) + + @pytest.mark.parametrize("output_file_name", ["reference.json", "reference.parquet"]) def test_reference_netcdf( daily_xarray_dataset, From 342dd26207320bedd5115d94a4e4dffd9d4face8 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Fri, 29 Mar 2024 11:24:15 -0700 Subject: [PATCH 04/23] test_schema_to_zarr --- tests/test_aggregation.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/test_aggregation.py b/tests/test_aggregation.py index eceaffb0..0379a628 100644 --- a/tests/test_aggregation.py +++ b/tests/test_aggregation.py @@ -8,7 +8,9 @@ dataset_to_schema, determine_target_chunks, schema_to_template_ds, + schema_to_zarr, ) +from pangeo_forge_recipes.storage import FSSpecTarget from .data_generation import make_ds @@ -190,3 +192,26 @@ def test_concat_accumulator(): assert ( merge_accumulator.schema["data_vars"]["bar"] == merge_accumulator.schema["data_vars"]["BAR"] ) + + +def test_schema_to_zarr(daily_xarray_dataset: xr.Dataset, tmp_target: FSSpecTarget): + target_store = tmp_target.get_mapper() + schema = dataset_to_schema(daily_xarray_dataset) + schema_to_zarr( + schema=schema, + target_store=target_store, + target_chunks={}, + attrs={}, + consolidated_metadata=False, + encoding=None, + mode="w", + ) + ds = xr.open_dataset(target_store, engine="zarr") + assert len(ds.time) == len(daily_xarray_dataset.time) + assert len(ds.lon) == len(daily_xarray_dataset.lon) + assert len(ds.lat) == len(daily_xarray_dataset.lat) + + +def test_schema_to_zarr_append_mode( + daily_xarray_datasets_to_append: tuple[xr.Dataset, xr.Dataset], +): ... 
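
For context on the keyword being threaded through in the patches above: with xarray, `mode="w"` rewrites the store from the template dataset on every run (which is what made `schema_to_zarr` idempotent), while appending requires `mode="a"` together with an `append_dim`, so that the named dimension is resized and new data lands after the existing data rather than overwriting it (this is also why later patches in this series replace the `mode` keyword with `append_dim`). A minimal standalone sketch of that distinction; the dataset, store path, and sizes below are illustrative only and not taken from the recipe code:

```python
import numpy as np
import pandas as pd
import xarray as xr

# toy dataset standing in for the schema/template dataset (illustrative only)
ds = xr.Dataset(
    {"foo": ("time", np.arange(5))},
    coords={"time": pd.date_range("2010-01-01", periods=5)},
)

ds.to_zarr("store.zarr", mode="w")                     # (re)creates the store; safe to rerun
ds.to_zarr("store.zarr", mode="a", append_dim="time")  # resizes "time" and writes after the existing data

combined = xr.open_dataset("store.zarr", engine="zarr")
assert len(combined.time) == 10  # 5 initial + 5 appended
```
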
From 401ee4682245be806a0b74749b886e12d09f056d Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Fri, 29 Mar 2024 11:26:08 -0700 Subject: [PATCH 05/23] mypy --- tests/test_transforms.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_transforms.py b/tests/test_transforms.py index 8fdaf2e1..ba91d428 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -313,6 +313,7 @@ def dynamic_chunking_fn(template_ds: xr.Dataset, divisor: int = 1): store_name="test.zarr", combine_dims=pattern.combine_dim_keys, attrs={}, + mode="w", dynamic_chunking_fn=dynamic_chunking_fn, **kws, ) From b9f6b8b8e6d4c83bcc72b051de980f3e0abaf1ee Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Fri, 29 Mar 2024 13:07:12 -0700 Subject: [PATCH 06/23] note writer offset + test --- pangeo_forge_recipes/writers.py | 13 ++++++++++++- tests/test_writers.py | 5 +++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pangeo_forge_recipes/writers.py b/pangeo_forge_recipes/writers.py index fecd4d0f..f9977719 100644 --- a/pangeo_forge_recipes/writers.py +++ b/pangeo_forge_recipes/writers.py @@ -29,7 +29,13 @@ def _region_for(var: xr.Variable, index: Index) -> Tuple[slice, ...]: return tuple(region_slice) -def _store_data(vname: str, var: xr.Variable, index: Index, zgroup: zarr.Group) -> None: +def _store_data( + vname: str, + var: xr.Variable, + index: Index, + zgroup: zarr.Group, + offset: Optional[int] = None, +) -> None: zarr_array = zgroup[vname] # get encoding for variable from zarr attributes var_coded = var.copy() # copy needed for test suit to avoid modifying inputs in-place @@ -37,6 +43,11 @@ def _store_data(vname: str, var: xr.Variable, index: Index, zgroup: zarr.Group) var_coded.attrs = {} var = xr.backends.zarr.encode_zarr_variable(var_coded) data = np.asarray(var.data) + # FIXME: index here will be with respect to the currently running recipe, but NOT + # with respect to the aggregate (pre-existing) dataset, in the case of appending. + # So to make appending work, we will need to to pass an offset through here (from + # the outer transform layer) to make sure we're indexed to the correct offset for + # appending. region = _region_for(var, index) # check that the region evenly overlaps the zarr chunks for dimsize, chunksize, region_slice in zip(zarr_array.shape, zarr_array.chunks, region): diff --git a/tests/test_writers.py b/tests/test_writers.py index 20dd1047..72692e5e 100644 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -154,6 +154,11 @@ def test_store_dataset_fragment(temp_store): assert ds.time.encoding.get("units") == ds_target.time.encoding.get("units") +def test_store_dataset_fragment_appending(temp_store): + ... 
+ # test that we can write in append mode with correct index offsets + + def test_zarr_consolidate_metadata( netcdf_local_file_pattern, pipeline, From 9defde9a85a552769f36780108eb81bf50031fb9 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Fri, 29 Mar 2024 13:10:04 -0700 Subject: [PATCH 07/23] append_dim, not mode --- pangeo_forge_recipes/aggregation.py | 6 +++--- pangeo_forge_recipes/transforms.py | 14 +++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pangeo_forge_recipes/aggregation.py b/pangeo_forge_recipes/aggregation.py index 5a1968aa..b66e7415 100644 --- a/pangeo_forge_recipes/aggregation.py +++ b/pangeo_forge_recipes/aggregation.py @@ -2,7 +2,7 @@ from copy import deepcopy from dataclasses import dataclass, field -from typing import Dict, Literal, Optional, TypedDict +from typing import Dict, Optional, TypedDict import cftime import dask.array as dsa @@ -287,14 +287,14 @@ def schema_to_zarr( attrs: Optional[Dict[str, str]] = None, consolidated_metadata: Optional[bool] = True, encoding: Optional[Dict] = None, - mode: Literal["w", "a"] = "w", + append_dim: Optional[str] = None, ) -> zarr.storage.FSStore: """Initialize a zarr group based on a schema.""" ds = schema_to_template_ds(schema, specified_chunks=target_chunks, attrs=attrs) # using mode="w" makes this function idempotent ds.to_zarr( target_store, - mode=mode, + append_dim=append_dim, compute=False, consolidated=consolidated_metadata, encoding=encoding, diff --git a/pangeo_forge_recipes/transforms.py b/pangeo_forge_recipes/transforms.py index 0c6b7e6e..0df7c0cc 100644 --- a/pangeo_forge_recipes/transforms.py +++ b/pangeo_forge_recipes/transforms.py @@ -5,7 +5,7 @@ import random import sys from dataclasses import dataclass, field -from typing import Callable, Dict, List, Literal, Optional, Tuple, TypeVar, Union +from typing import Callable, Dict, List, Optional, Tuple, TypeVar, Union # PEP612 Concatenate & ParamSpec are useful for annotating decorators, but their import # differs between Python versions 3.9 & 3.10. See: https://stackoverflow.com/a/71990006 @@ -341,7 +341,7 @@ class PrepareZarrTarget(beam.PTransform): then falling out of sync with coordinates if ConsolidateDimensionCoordinates() is applied to the output of StoreToZarr(). - :param mode: One of "w" for writing a new store, or "a" for appending to an existing store. + :param append_dim: Optional name of the dimension to append to. """ target: str | FSSpecTarget @@ -349,7 +349,7 @@ class PrepareZarrTarget(beam.PTransform): attrs: Dict[str, str] = field(default_factory=dict) consolidated_metadata: Optional[bool] = True encoding: Optional[dict] = field(default_factory=dict) - mode: Literal["w", "a"] = "w" + append_dim: Optional[str] = None def expand(self, pcoll: beam.PCollection) -> beam.PCollection: if isinstance(self.target, str): @@ -364,7 +364,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: attrs=self.attrs, encoding=self.encoding, consolidated_metadata=False, - mode=self.mode, + append_dim=self.append_dim, ) return initialized_target @@ -645,7 +645,7 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin): :param dynamic_chunking_fn_kwargs: Optional keyword arguments for ``dynamic_chunking_fn``. :param attrs: Extra group-level attributes to inject into the dataset. :param encoding: Dictionary encoding for xarray.to_zarr(). - :param mode: One of "w" for writing a new store, or "a" for appending to an existing store. 
+ :param append_dim: Optional name of the dimension to append to. """ # TODO: make it so we don't have to explicitly specify combine_dims @@ -660,7 +660,7 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin): dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict) attrs: Dict[str, str] = field(default_factory=dict) encoding: Optional[dict] = field(default_factory=dict) - mode: Literal["w", "a"] = "w" + append_dim: Optional[str] = None def __post_init__(self): if self.target_chunks and self.dynamic_chunking_fn: @@ -688,7 +688,7 @@ def expand( target_chunks=target_chunks, attrs=self.attrs, encoding=self.encoding, - mode=self.mode, + append_dim=self.append_dim, ) n_target_stores = rechunked_datasets | StoreDatasetFragments(target_store=target_store) singleton_target_store = ( From 9eff6107c7fbfd6d70788b354c7142f6fb993279 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Fri, 29 Mar 2024 13:15:32 -0700 Subject: [PATCH 08/23] schema_to_zarr append test --- tests/test_aggregation.py | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/tests/test_aggregation.py b/tests/test_aggregation.py index 0379a628..8751a7c5 100644 --- a/tests/test_aggregation.py +++ b/tests/test_aggregation.py @@ -204,7 +204,7 @@ def test_schema_to_zarr(daily_xarray_dataset: xr.Dataset, tmp_target: FSSpecTarg attrs={}, consolidated_metadata=False, encoding=None, - mode="w", + append_dim=None, ) ds = xr.open_dataset(target_store, engine="zarr") assert len(ds.time) == len(daily_xarray_dataset.time) @@ -214,4 +214,25 @@ def test_schema_to_zarr(daily_xarray_dataset: xr.Dataset, tmp_target: FSSpecTarg def test_schema_to_zarr_append_mode( daily_xarray_datasets_to_append: tuple[xr.Dataset, xr.Dataset], -): ... 
+ tmp_target: FSSpecTarget, +): + """Tests dimension resizing for append.""" + + ds0, ds1 = daily_xarray_datasets_to_append + target_store = tmp_target.get_mapper() + common_kws = dict( + target_store=target_store, + target_chunks={}, + attrs={}, + consolidated_metadata=False, + encoding=None, + ) + schema_ds0 = dataset_to_schema(ds0) + schema_to_zarr(schema=schema_ds0, append_dim=None, **common_kws) + ds0_zarr = xr.open_dataset(target_store, engine="zarr") + assert len(ds0_zarr.time) == len(ds0.time) + + schema_ds1 = dataset_to_schema(ds1) + schema_to_zarr(schema=schema_ds1, append_dim="time", **common_kws) + appended_zarr = xr.open_dataset(target_store, engine="zarr") + assert len(appended_zarr.time) == len(ds0.time) + len(ds1.time) From 9a3cb4be0836e81953ed26123dde77b413df6bda Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Fri, 29 Mar 2024 15:09:47 -0700 Subject: [PATCH 09/23] fix broken test --- tests/test_transforms.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/test_transforms.py b/tests/test_transforms.py index 91e2e4d4..1dad82cf 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -304,7 +304,7 @@ def dynamic_chunking_fn(template_ds: xr.Dataset, divisor: int = 1): assert isinstance(template_ds, xr.Dataset) return {"time": int(time_len / divisor)} - kws = {} if not with_kws else {"dynamic_chunking_fn_kwargs": {"divisor": 2}} + dynamic_chunking_fn_kwargs = {} if not with_kws else {"divisor": 2} with pipeline as p: datasets = p | beam.Create(pattern.items()) | OpenWithXarray() @@ -312,10 +312,8 @@ def dynamic_chunking_fn(template_ds: xr.Dataset, divisor: int = 1): target_root=tmp_target, store_name="test.zarr", combine_dims=pattern.combine_dim_keys, - attrs={}, - mode="w", dynamic_chunking_fn=dynamic_chunking_fn, - **kws, + dynamic_chunking_fn_kwargs=dynamic_chunking_fn_kwargs, ) open_store = target_store | OpenZarrStore() assert_that(open_store, has_dynamically_set_chunks()) From cd13679e5045ea6d2c2c09e1d1d63bc522a84211 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Fri, 29 Mar 2024 16:04:39 -0700 Subject: [PATCH 10/23] simplify schema to zarr append test --- tests/test_aggregation.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/tests/test_aggregation.py b/tests/test_aggregation.py index 8751a7c5..a69e6bab 100644 --- a/tests/test_aggregation.py +++ b/tests/test_aggregation.py @@ -220,19 +220,13 @@ def test_schema_to_zarr_append_mode( ds0, ds1 = daily_xarray_datasets_to_append target_store = tmp_target.get_mapper() - common_kws = dict( - target_store=target_store, - target_chunks={}, - attrs={}, - consolidated_metadata=False, - encoding=None, - ) + schema_ds0 = dataset_to_schema(ds0) - schema_to_zarr(schema=schema_ds0, append_dim=None, **common_kws) + schema_to_zarr(schema=schema_ds0, append_dim=None, target_store=target_store) ds0_zarr = xr.open_dataset(target_store, engine="zarr") assert len(ds0_zarr.time) == len(ds0.time) schema_ds1 = dataset_to_schema(ds1) - schema_to_zarr(schema=schema_ds1, append_dim="time", **common_kws) + schema_to_zarr(schema=schema_ds1, append_dim="time", target_store=target_store) appended_zarr = xr.open_dataset(target_store, engine="zarr") assert len(appended_zarr.time) == len(ds0.time) + len(ds1.time) From d0cbb3fddbd68a178b0120bd3dab9e831176c08e Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Fri, 29 Mar 2024 
16:05:24 -0700 Subject: [PATCH 11/23] get dimension resizing to work in end to end test --- tests/test_end_to_end.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/tests/test_end_to_end.py b/tests/test_end_to_end.py index a9bce5f2..3866ea69 100644 --- a/tests/test_end_to_end.py +++ b/tests/test_end_to_end.py @@ -87,7 +87,6 @@ def test_xarray_zarr_append( daily_xarray_datasets_to_append, netcdf_local_file_patterns_to_append, tmp_target, - pipeline, ): ds0_fixture, ds1_fixture = daily_xarray_datasets_to_append pattern0, pattern1 = netcdf_local_file_patterns_to_append @@ -102,34 +101,42 @@ def test_xarray_zarr_append( store_name="store", combine_dims=pattern0.combine_dim_keys, ) + store_path = os.path.join(tmp_target.root_path, "store") # build an initial zarr store, to which we will append - with pipeline as p: + options = PipelineOptions(runtime_type_check=False) + # we run two pipelines in this test, so instantiate them separately to + # avoid any potential of strange co-mingling between the same pipeline + with TestPipeline(options=options) as p0: ( - p + p0 | "CreateInitial" >> beam.Create(pattern0.items()) | "OpenInitial" >> OpenWithXarray() | "StoreInitial" >> StoreToZarr(**common_kws) ) # make sure the initial zarr store looks good - ds0 = xr.open_dataset(os.path.join(tmp_target.root_path, "store"), engine="zarr") + ds0 = xr.open_dataset(store_path, engine="zarr") assert len(ds0.time) == 10 xr.testing.assert_equal(ds0.load(), ds0_fixture) # now append to it. the two differences here are # using `pattern1` in Create and `mode="a"` in `StoreToZarr` - with pipeline as p: + with TestPipeline(options=options) as p1: ( - p + p1 | "CreateAppend" >> beam.Create(pattern1.items()) | "OpenAppend" >> OpenWithXarray() - | "StoreAppend" >> StoreToZarr(mode="a", **common_kws) + | "StoreAppend" >> StoreToZarr(append_dim="time", **common_kws) ) # now see if we have appended to time dimension as intended - ds_concat = xr.open_dataset(os.path.join(tmp_target.root_path, "store"), engine="zarr") + ds_concat = xr.open_dataset(store_path, engine="zarr") assert len(ds_concat.time) == 20 - # xr.testing.assert_equal(ds1.load(), daily_xarray_dataset) + # FIXME: now check that the data is actually written where we want it to be + # we don't expect this to be the case yet, since we haven't added offests to + # the _store_data writer. in this test, presumably the append is just + # overwriting the existing data. (but we are getting dimension resizing, which + # is a good start!) @pytest.mark.parametrize("output_file_name", ["reference.json", "reference.parquet"]) From 3b4c31e2635f6d5b4aebb3034f3a3f871e6b4654 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Sun, 31 Mar 2024 14:18:09 -0700 Subject: [PATCH 12/23] append_offset for aughment_index... 
--- pangeo_forge_recipes/patterns.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pangeo_forge_recipes/patterns.py b/pangeo_forge_recipes/patterns.py index dbe7d224..9c74c9d8 100644 --- a/pangeo_forge_recipes/patterns.py +++ b/pangeo_forge_recipes/patterns.py @@ -63,18 +63,23 @@ class MergeDim(CombineDim): operation: ClassVar[CombineOp] = CombineOp.MERGE -def augment_index_with_start_stop(position: Position, item_lens: List[int]) -> IndexedPosition: +def augment_index_with_start_stop( + position: Position, + item_lens: List[int], + append_offset: int = 0, +) -> IndexedPosition: """Take an index _without_ start / stop and add them based on the lens defined in sequence_lens. :param index: The ``DimIndex`` instance to augment. :param item_lens: A list of integer lengths for all items in the sequence. + :param append_offset: If appending, the length of the existing ``append_dim``. """ if position.indexed: raise ValueError("This position is already indexed") start = sum(item_lens[: position.value]) dimsize = sum(item_lens) - return IndexedPosition(start, dimsize=dimsize) + return IndexedPosition(start + append_offset, dimsize=dimsize + append_offset) class AutoName(Enum): From 5b43daecd9e33e67d3dc58a9b8bd4d5f2e83963f Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Sun, 31 Mar 2024 14:20:03 -0700 Subject: [PATCH 13/23] pass append_offset through from transforms layer --- pangeo_forge_recipes/transforms.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/pangeo_forge_recipes/transforms.py b/pangeo_forge_recipes/transforms.py index 0df7c0cc..5acd064a 100644 --- a/pangeo_forge_recipes/transforms.py +++ b/pangeo_forge_recipes/transforms.py @@ -301,21 +301,26 @@ class IndexItems(beam.PTransform): """Augment dataset indexes with information about start and stop position.""" schema: beam.PCollection + append_offset: int = 0 @staticmethod - def index_item(item: Indexed[T], schema: XarraySchema) -> Indexed[T]: + def index_item(item: Indexed[T], schema: XarraySchema, append_offset: int) -> Indexed[T]: index, ds = item new_index = Index() for dimkey, dimval in index.items(): if dimkey.operation == CombineOp.CONCAT: item_len_dict = schema["chunks"][dimkey.name] item_lens = [item_len_dict[n] for n in range(len(item_len_dict))] - dimval = augment_index_with_start_stop(dimval, item_lens) + dimval = augment_index_with_start_stop(dimval, item_lens, append_offset) new_index[dimkey] = dimval return new_index, ds def expand(self, pcoll: beam.PCollection): - return pcoll | beam.Map(self.index_item, schema=beam.pvalue.AsSingleton(self.schema)) + return pcoll | beam.Map( + self.index_item, + schema=beam.pvalue.AsSingleton(self.schema), + append_offset=self.append_offset, + ) @dataclass @@ -666,12 +671,21 @@ def __post_init__(self): if self.target_chunks and self.dynamic_chunking_fn: raise ValueError("Passing both `target_chunks` and `dynamic_chunking_fn` not allowed.") + self._append_offset = 0 + if self.append_dim: + dim = [d for d in self.combine_dims if d.name == self.append_dim] + assert dim, f"Append dim not in {self.combine_dims=}." + assert dim[0].operation == CombineOp.CONCAT, "Append dim operation must be CONCAT." + existing_ds = xr.open_dataset(self.get_full_target().get_mapper(), engine="zarr") + assert self.append_dim in existing_ds, "Append dim must be in existing dataset." 
+ self._append_offset = len(existing_ds[self.append_dim]) + def expand( self, datasets: beam.PCollection[Tuple[Index, xr.Dataset]], ) -> beam.PCollection[zarr.storage.FSStore]: schema = datasets | DetermineSchema(combine_dims=self.combine_dims) - indexed_datasets = datasets | IndexItems(schema=schema) + indexed_datasets = datasets | IndexItems(schema=schema, append_offset=self._append_offset) target_chunks = ( self.target_chunks if not self.dynamic_chunking_fn From 99e41edf768a344ff7c0ac2b6a143eee672d855c Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Sun, 31 Mar 2024 14:20:46 -0700 Subject: [PATCH 14/23] only append schema for append dim coord --- pangeo_forge_recipes/aggregation.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pangeo_forge_recipes/aggregation.py b/pangeo_forge_recipes/aggregation.py index b66e7415..1ac20f99 100644 --- a/pangeo_forge_recipes/aggregation.py +++ b/pangeo_forge_recipes/aggregation.py @@ -290,11 +290,16 @@ def schema_to_zarr( append_dim: Optional[str] = None, ) -> zarr.storage.FSStore: """Initialize a zarr group based on a schema.""" + if append_dim: + # if appending, only keep schema for coordinate to append. if we don't drop other + # coords, we may end up overwriting existing data on the `ds.to_zarr` call below. + schema["coords"] = {k: v for k, v in schema["coords"].items() if k == append_dim} ds = schema_to_template_ds(schema, specified_chunks=target_chunks, attrs=attrs) - # using mode="w" makes this function idempotent + # using mode="w" makes this function idempotent when not appending ds.to_zarr( target_store, append_dim=append_dim, + mode=("a" if append_dim else "w"), compute=False, consolidated=consolidated_metadata, encoding=encoding, From 4bf2eae11078962c651aa5d52967e6e94605e598 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Sun, 31 Mar 2024 14:21:22 -0700 Subject: [PATCH 15/23] appending end to end test passes --- tests/test_end_to_end.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/tests/test_end_to_end.py b/tests/test_end_to_end.py index 3866ea69..f2abe74f 100644 --- a/tests/test_end_to_end.py +++ b/tests/test_end_to_end.py @@ -115,12 +115,12 @@ def test_xarray_zarr_append( ) # make sure the initial zarr store looks good - ds0 = xr.open_dataset(store_path, engine="zarr") - assert len(ds0.time) == 10 - xr.testing.assert_equal(ds0.load(), ds0_fixture) + initial_actual = xr.open_dataset(store_path, engine="zarr") + assert len(initial_actual.time) == 10 + xr.testing.assert_equal(initial_actual.load(), ds0_fixture) # now append to it. the two differences here are - # using `pattern1` in Create and `mode="a"` in `StoreToZarr` + # passing `pattern1` in `Create` and `append_dim="time"` in `StoreToZarr` with TestPipeline(options=options) as p1: ( p1 @@ -130,13 +130,10 @@ def test_xarray_zarr_append( ) # now see if we have appended to time dimension as intended - ds_concat = xr.open_dataset(store_path, engine="zarr") - assert len(ds_concat.time) == 20 - # FIXME: now check that the data is actually written where we want it to be - # we don't expect this to be the case yet, since we haven't added offests to - # the _store_data writer. in this test, presumably the append is just - # overwriting the existing data. (but we are getting dimension resizing, which - # is a good start!) 
+ append_actual = xr.open_dataset(store_path, engine="zarr") + assert len(append_actual.time) == 20 + append_expected = xr.concat([ds0_fixture, ds1_fixture], dim="time") + xr.testing.assert_equal(append_actual.load(), append_expected) @pytest.mark.parametrize("output_file_name", ["reference.json", "reference.parquet"]) From 468f3262923cac474ae48df1e3cdf5cb0f8b0a21 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Sun, 31 Mar 2024 14:31:31 -0700 Subject: [PATCH 16/23] revert writers changes --- pangeo_forge_recipes/writers.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/pangeo_forge_recipes/writers.py b/pangeo_forge_recipes/writers.py index f9977719..fecd4d0f 100644 --- a/pangeo_forge_recipes/writers.py +++ b/pangeo_forge_recipes/writers.py @@ -29,13 +29,7 @@ def _region_for(var: xr.Variable, index: Index) -> Tuple[slice, ...]: return tuple(region_slice) -def _store_data( - vname: str, - var: xr.Variable, - index: Index, - zgroup: zarr.Group, - offset: Optional[int] = None, -) -> None: +def _store_data(vname: str, var: xr.Variable, index: Index, zgroup: zarr.Group) -> None: zarr_array = zgroup[vname] # get encoding for variable from zarr attributes var_coded = var.copy() # copy needed for test suit to avoid modifying inputs in-place @@ -43,11 +37,6 @@ def _store_data( var_coded.attrs = {} var = xr.backends.zarr.encode_zarr_variable(var_coded) data = np.asarray(var.data) - # FIXME: index here will be with respect to the currently running recipe, but NOT - # with respect to the aggregate (pre-existing) dataset, in the case of appending. - # So to make appending work, we will need to to pass an offset through here (from - # the outer transform layer) to make sure we're indexed to the correct offset for - # appending. 
region = _region_for(var, index) # check that the region evenly overlaps the zarr chunks for dimsize, chunksize, region_slice in zip(zarr_array.shape, zarr_array.chunks, region): From 5ce2d6271501ef92973fddf63b9927b106a2ac83 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Sun, 31 Mar 2024 14:32:32 -0700 Subject: [PATCH 17/23] remove unused variable in test --- tests/test_end_to_end.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/test_end_to_end.py b/tests/test_end_to_end.py index f2abe74f..37c0d60c 100644 --- a/tests/test_end_to_end.py +++ b/tests/test_end_to_end.py @@ -92,9 +92,6 @@ def test_xarray_zarr_append( pattern0, pattern1 = netcdf_local_file_patterns_to_append assert pattern0.combine_dim_keys == pattern1.combine_dim_keys - ds_fixture_concat = xr.concat([ds0_fixture, ds1_fixture], dim="time") - assert len(ds_fixture_concat.time) == 20 - # these kws are reused across both initial and append pipelines common_kws = dict( target_root=tmp_target, From 7eb866cfb7b96a9f340ba38279cb9a254f54f359 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Sun, 31 Mar 2024 14:49:46 -0700 Subject: [PATCH 18/23] test append dim asserts raises --- tests/test_transforms.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/tests/test_transforms.py b/tests/test_transforms.py index 1dad82cf..47976af5 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -7,7 +7,7 @@ from pytest_lazyfixture import lazy_fixture from pangeo_forge_recipes.aggregation import dataset_to_schema -from pangeo_forge_recipes.patterns import FilePattern, FileType +from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, FileType, MergeDim from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget from pangeo_forge_recipes.transforms import ( DetermineSchema, @@ -340,6 +340,25 @@ def fn(template_ds): ) +@pytest.mark.parametrize( + "append_dim, match", + [ + ("date", "Append dim not in self.combine_dims"), + ("var", "Append dim operation must be CONCAT."), + ], +) +def test_StoreToZarr_append_dim_asserts_raises(append_dim, match): + pattern = FilePattern(lambda x: x, ConcatDim("time", [1, 2]), MergeDim("var", ["foo", "bar"])) + kws = dict( + target_root="target", + store_name="test.zarr", + combine_dims=pattern.combine_dim_keys, + target_chunks={"time": 1}, + ) + with pytest.raises(AssertionError, match=match): + _ = StoreToZarr(append_dim=append_dim, **kws) + + def test_StoreToZarr_target_root_default_unrunnable( pipeline, netcdf_local_file_pattern_sequential, From 6822f563a5219e995f3f6dec74cadd41a7850755 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Sun, 31 Mar 2024 14:54:02 -0700 Subject: [PATCH 19/23] unit test augment with append offset --- tests/test_patterns.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_patterns.py b/tests/test_patterns.py index 2fe81df7..646e5d82 100644 --- a/tests/test_patterns.py +++ b/tests/test_patterns.py @@ -205,8 +205,9 @@ def test_setting_file_types(file_type_value): "position,start", [(0, 0), (1, 2), (2, 4), (3, 7), (4, 9)], ) -def test_augment_index_with_start_stop(position, start): +@pytest.mark.parametrize("append_offset", [0, 5, 500]) +def test_augment_index_with_start_stop(position, start, append_offset): dk = Position(position) - expected = IndexedPosition(start, dimsize=11) - actual = augment_index_with_start_stop(dk, 
[2, 2, 3, 2, 2]) + expected = IndexedPosition(start + append_offset, dimsize=11 + append_offset) + actual = augment_index_with_start_stop(dk, [2, 2, 3, 2, 2], append_offset) assert actual == expected From ffd21b92e8d0d8165a006b4c61cbace930af8157 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Sun, 31 Mar 2024 15:01:49 -0700 Subject: [PATCH 20/23] idempotentcy warning --- pangeo_forge_recipes/transforms.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pangeo_forge_recipes/transforms.py b/pangeo_forge_recipes/transforms.py index 5acd064a..58b582c1 100644 --- a/pangeo_forge_recipes/transforms.py +++ b/pangeo_forge_recipes/transforms.py @@ -673,6 +673,10 @@ def __post_init__(self): self._append_offset = 0 if self.append_dim: + logger.warn( + "When `append_dim` is given, StoreToZarr is NOT idempotent. Successive deployment " + "with the same inputs will append duplicate data to the existing store." + ) dim = [d for d in self.combine_dims if d.name == self.append_dim] assert dim, f"Append dim not in {self.combine_dims=}." assert dim[0].operation == CombineOp.CONCAT, "Append dim operation must be CONCAT." From a51d3cc66cd8e5e1d31cee317b5db9ff8526f80f Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Sun, 31 Mar 2024 15:41:01 -0700 Subject: [PATCH 21/23] remove unused test stub --- tests/test_writers.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/test_writers.py b/tests/test_writers.py index 72692e5e..20dd1047 100644 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -154,11 +154,6 @@ def test_store_dataset_fragment(temp_store): assert ds.time.encoding.get("units") == ds_target.time.encoding.get("units") -def test_store_dataset_fragment_appending(temp_store): - ... - # test that we can write in append mode with correct index offsets - - def test_zarr_consolidate_metadata( netcdf_local_file_pattern, pipeline, From aaa5a694ef3371dd92d2dbf6f96846a6fa199834 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 2 Apr 2024 13:48:25 -0700 Subject: [PATCH 22/23] appending note in docs --- docs/composition/styles.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/docs/composition/styles.md b/docs/composition/styles.md index 4379e27a..13448997 100644 --- a/docs/composition/styles.md +++ b/docs/composition/styles.md @@ -32,6 +32,21 @@ If using the {class}`pangeo_forge_recipes.transforms.ConsolidateDimensionCoordin ``` +```{note} +{class}`pangeo_forge_recipes.transforms.StoreToZarr` supports appending to existing Zarr stores +via the optional `append_dim` keyword argument. This option functions nearly identically to the +`append_dim` kwarg in +[`xarray.Dataset.to_zarr`](https://docs.xarray.dev/en/latest/generated/xarray.Dataset.to_zarr.html); +the two differences with this method are that Pangeo Forge will automatically introspect the inputs in +your {class}`FilePattern ` to determine how the existing Zarr +store dimensions need to be resized, and that writes are parallelized via Apache Beam. Apart from +ensuring that the named `append_dim` already exists in the dataset to which you are appending, use of +this option does not ensure logical consistency (e.g. contiguousness, etc.) of the appended data. When +selecting this option, it is therefore up to you, the user, to ensure that the inputs provided in the +{doc} `file pattern ` for the appending recipe are limited to those which you want to +append. 
+``` + ## Open with Kerchunk, write to virtual Zarr From c75a2745c53682a8ac5723c53963582a5776b28c Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Tue, 2 Apr 2024 14:50:54 -0700 Subject: [PATCH 23/23] fix doc crosslink --- docs/composition/styles.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/composition/styles.md b/docs/composition/styles.md index 13448997..be697b03 100644 --- a/docs/composition/styles.md +++ b/docs/composition/styles.md @@ -43,7 +43,7 @@ store dimensions need to be resized, and that writes are parallelized via Apache ensuring that the named `append_dim` already exists in the dataset to which you are appending, use of this option does not ensure logical consistency (e.g. contiguousness, etc.) of the appended data. When selecting this option, it is therefore up to you, the user, to ensure that the inputs provided in the -{doc} `file pattern ` for the appending recipe are limited to those which you want to +{doc}`file pattern ` for the appending recipe are limited to those which you want to append. ```