diff --git a/configs/config_dataflow.py b/configs/config_dataflow.py index fdc1bf8..6b97e15 100644 --- a/configs/config_dataflow.py +++ b/configs/config_dataflow.py @@ -8,7 +8,7 @@ c.Bake.bakery_class = "pangeo_forge_runner.bakery.dataflow.DataflowBakery" c.DataflowBakery.use_dataflow_prime = False c.DataflowBakery.machine_type = "e2-highmem-4" -c.DataflowBakery.max_workers = 40 +c.DataflowBakery.max_num_workers = 50 c.DataflowBakery.use_public_ips = True c.DataflowBakery.service_account_email = ( "leap-community-bakery@leap-pangeo.iam.gserviceaccount.com" @@ -16,6 +16,6 @@ c.DataflowBakery.project_id = "leap-pangeo" c.DataflowBakery.temp_gcs_location = f"gs://leap-scratch/data-library/feedstocks/temp/{FEEDSTOCK_NAME}" c.TargetStorage.fsspec_class = "gcsfs.GCSFileSystem" -c.InputCacheStorage.fsspec_class = "gcsfs.GCSFileSystem" +# c.InputCacheStorage.fsspec_class = "gcsfs.GCSFileSystem" c.TargetStorage.root_path = f"gs://leap-scratch/data-library/feedstocks/output/{FEEDSTOCK_NAME}/{{job_name}}" -c.InputCacheStorage.root_path = f"gs://leap-scratch/data-library/feedstocks/cache" +# c.InputCacheStorage.root_path = f"gs://leap-scratch/data-library/feedstocks/cache" diff --git a/feedstock/catalog.yaml b/feedstock/catalog.yaml index 13c9840..e5c2881 100644 --- a/feedstock/catalog.yaml +++ b/feedstock/catalog.yaml @@ -7,3 +7,6 @@ stores: - id: "climsim-lowres-mli" name: "ClimSim Lowres mli" url: "gs://leap-persistent-ro/data-library/feedstocks/climsim_feedstock/climsim_lowres_mli.zarr" + - id: "climsim-lowres-mlo" + name: "ClimSim Lowres mlo" + url: "gs://leap-persistent-ro/data-library/feedstocks/climsim_feedstock/climsim_lowres_mlo.zarr" diff --git a/feedstock/meta.yaml b/feedstock/meta.yaml index 643dca4..9c57741 100644 --- a/feedstock/meta.yaml +++ b/feedstock/meta.yaml @@ -4,6 +4,8 @@ description: > recipes: - id: "climsim-lowres-mli" object: "recipe:climsim_lowres_mli" + - id: "climsim-lowres-mlo" + object: "recipe:climsim_lowres_mlo" provenance: providers: - name: "Hugging Face" @@ -22,3 +24,6 @@ maintainers: - name: "Qingyuan Yang" orcid: "https://orcid.org/0000-0002-5631-889X" github: yiqioyang + - name: "Julius Busecke" + orcid: "https://orcid.org/0000-0001-8571-865X" + github: jbusecke diff --git a/feedstock/recipe.py b/feedstock/recipe.py index 6f78b91..85252b2 100644 --- a/feedstock/recipe.py +++ b/feedstock/recipe.py @@ -16,11 +16,14 @@ StoreToZarr, ConsolidateMetadata, ConsolidateDimensionCoordinates, + CheckpointFileTransfer, ) +from pangeo_forge_recipes.storage import CacheFSSpecTarget ####################################### import datetime as dt import functools +import gcsfs import cftime from pangeo_forge_recipes.patterns import ConcatDim, FilePattern @@ -193,15 +196,24 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: times = [t for t in generate_times()] -# Debug this recipe by running fewer times -times = times[0:90000] concat_dim = ConcatDim("time", keys=times) +cache_target = CacheFSSpecTarget( + fs = gcsfs.GCSFileSystem(), + root_path="gs://leap-scratch/data-library/feedstocks/cache_test" +) lowres_mli_make_url = functools.partial(make_url, ds_type="mli") lowres_mli_pattern = FilePattern(lowres_mli_make_url, concat_dim) climsim_lowres_mli = ( beam.Create(lowres_mli_pattern.items()) - | OpenURLWithFSSpec(max_concurrency=20) + | CheckpointFileTransfer( + transfer_target=cache_target, + max_executors=10, + concurrency_per_executor=2, + initial_backoff=3.0, + fsspec_sync_patch=False,# works but is slow. Testing with fsspec and new backoff retry + ) + | OpenURLWithFSSpec(cache=None, fsspec_sync_patch=True) | OpenWithXarray( # FIXME: Get files to open without `copy_to_local=True` # Related: what is the filetype? Looks like netcdf3, but for some reason @@ -220,3 +232,36 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: | ConsolidateMetadata() | Copy(target=catalog_store_urls["climsim-lowres-mli"]) ) + + +lowres_mlo_make_url = functools.partial(make_url, ds_type="mlo") +lowres_mlo_pattern = FilePattern(lowres_mlo_make_url, concat_dim) +climsim_lowres_mlo = ( + beam.Create(lowres_mlo_pattern.items()) + | CheckpointFileTransfer( + transfer_target=cache_target, + max_executors=5, + concurrency_per_executor=20, + fsspec_sync_patch=False,# works but is slow. Testing with fsspec and new backoff retry + ) + | OpenURLWithFSSpec(cache=None, fsspec_sync_patch=True) + | OpenWithXarray( + # FIXME: Get files to open without `copy_to_local=True` + # Related: what is the filetype? Looks like netcdf3, but for some reason + # `scipy` backend can't open them, and `netcdf4` can? + copy_to_local=True, + xarray_open_kwargs=dict(engine="netcdf4"), + ) + | ExpandTimeDimAndAddMetadata() + | StoreToZarr( + store_name="climsim-lowres-mlo.zarr", + target_chunks={"time": 600}, + combine_dims=lowres_mlo_pattern.combine_dim_keys, + ) + | InjectAttrs() + | ConsolidateDimensionCoordinates() + | ConsolidateMetadata() + | Copy(target=catalog_store_urls["climsim-lowres-mlo"]) +) + +#TODO: Reuse pipeline parts and parameters diff --git a/feedstock/requirements.txt b/feedstock/requirements.txt index 83f3af0..2841deb 100644 --- a/feedstock/requirements.txt +++ b/feedstock/requirements.txt @@ -1,8 +1,7 @@ leap-data-management-utils==0.0.12 -git+https://github.com/ranchodeluxe/xarray@ranchodeluxe-patch-1#egg=xarray -git+https://github.com/ranchodeluxe/rioxarray -git+https://github.com/ranchodeluxe/datatree@main#egg=xarray-datatree -git+https://github.com/pangeo-forge/pangeo-forge-recipes@jb/xarray-hack #see @gc/cached_disabled but with cache +git+https://github.com/pangeo-forge/pangeo-forge-recipes@5ee286d2e2c3e1c68a0c7e5885c17c8a9a70e8ca +httpfs-sync>=0.0.2 +#git+https://github.com/moradology/httpfs-sync.git@feature/pool-reuse zarr==2.16.1 gcsfs apache-beam[gcp]