Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test CheckpointFileTransfer from recipes PR #7

Open
wants to merge 42 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
e803d7c
Use PR branch + deactivate cache in config + add concurrent stage
jbusecke Jun 3, 2024
eb05a50
Fix typo in dependencies
jbusecke Jun 3, 2024
bdc81a4
Disable the garbage collection hack
jbusecke Jun 3, 2024
1f9755a
Limit concurrency to number of cores
jbusecke Jun 3, 2024
3700585
Wrap cache_target explicitly in CacheFSpecTarget
jbusecke Jun 4, 2024
86d2be2
Fix fs for cache target
jbusecke Jun 4, 2024
21326c7
Update requirements.txt
jbusecke Jun 4, 2024
79c9764
Update recipe.py
jbusecke Jun 4, 2024
8245abf
Update recipe.py
jbusecke Jun 4, 2024
2e33bc1
remove time subset, redirect to regular cache dir, increase concurrency
jbusecke Jun 4, 2024
648de51
Update recipe.py
jbusecke Jun 4, 2024
b17c848
Update requirements.txt
jbusecke Jun 4, 2024
816c650
Update recipe.py
jbusecke Jun 4, 2024
670b208
Update recipe.py
jbusecke Jun 4, 2024
f012eb1
Pump up executors
jbusecke Jun 4, 2024
38e5382
Update recipe.py
jbusecke Jun 5, 2024
0ca4d72
Update recipe.py
jbusecke Jun 5, 2024
c3d0946
Update config_dataflow.py
jbusecke Jun 5, 2024
cc104ec
Update recipe.py
jbusecke Jun 5, 2024
14a1d98
Update recipe.py
jbusecke Jun 5, 2024
bc4d195
Pin httpfs-sync >= 0.0.2
jbusecke Jun 5, 2024
ab91a90
Update recipe.py
jbusecke Jun 5, 2024
1375ac8
Update requirements.txt
jbusecke Jun 5, 2024
950eae5
Update recipe.py
jbusecke Jun 5, 2024
ffbe444
add mlo dataset
jbusecke Jun 5, 2024
c40a81b
Update meta.yaml
jbusecke Jun 5, 2024
686e63a
Update recipe.py
jbusecke Jun 5, 2024
92e51b8
Update catalog.yaml
jbusecke Jun 5, 2024
54cbb34
Update recipe.py
jbusecke Jun 5, 2024
c364cc1
Update recipe.py
jbusecke Jun 5, 2024
759b4b4
Update recipe.py
jbusecke Jun 5, 2024
cc481cf
Update recipe.py
jbusecke Jun 5, 2024
ddb6515
Update recipe.py
jbusecke Jun 7, 2024
aa6c849
Update requirements.txt
jbusecke Jun 7, 2024
4f9751c
Switch back to the ugly hack
jbusecke Jun 7, 2024
7cb4770
Update requirements.txt
jbusecke Jun 7, 2024
ffb0959
Update config_dataflow.py
jbusecke Jun 10, 2024
2b29371
Update requirements.txt
jbusecke Jun 12, 2024
0d3b392
Update recipe.py
jbusecke Jun 12, 2024
a8604dc
Update requirements.txt
jbusecke Jun 12, 2024
4053892
New cache dir + force full redownload
jbusecke Jun 12, 2024
612f102
Use new version with size check after download
jbusecke Jun 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions configs/config_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
c.Bake.bakery_class = "pangeo_forge_runner.bakery.dataflow.DataflowBakery"
c.DataflowBakery.use_dataflow_prime = False
c.DataflowBakery.machine_type = "e2-highmem-4"
c.DataflowBakery.max_workers = 40
c.DataflowBakery.max_num_workers = 50
c.DataflowBakery.use_public_ips = True
c.DataflowBakery.service_account_email = (
"[email protected]"
)
c.DataflowBakery.project_id = "leap-pangeo"
c.DataflowBakery.temp_gcs_location = f"gs://leap-scratch/data-library/feedstocks/temp/{FEEDSTOCK_NAME}"
c.TargetStorage.fsspec_class = "gcsfs.GCSFileSystem"
c.InputCacheStorage.fsspec_class = "gcsfs.GCSFileSystem"
# c.InputCacheStorage.fsspec_class = "gcsfs.GCSFileSystem"
c.TargetStorage.root_path = f"gs://leap-scratch/data-library/feedstocks/output/{FEEDSTOCK_NAME}/{{job_name}}"
c.InputCacheStorage.root_path = f"gs://leap-scratch/data-library/feedstocks/cache"
# c.InputCacheStorage.root_path = f"gs://leap-scratch/data-library/feedstocks/cache"
3 changes: 3 additions & 0 deletions feedstock/catalog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ stores:
- id: "climsim-lowres-mli"
name: "ClimSim Lowres mli"
url: "gs://leap-persistent-ro/data-library/feedstocks/climsim_feedstock/climsim_lowres_mli.zarr"
- id: "climsim-lowres-mlo"
name: "ClimSim Lowres mlo"
url: "gs://leap-persistent-ro/data-library/feedstocks/climsim_feedstock/climsim_lowres_mlo.zarr"
5 changes: 5 additions & 0 deletions feedstock/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ description: >
recipes:
- id: "climsim-lowres-mli"
object: "recipe:climsim_lowres_mli"
- id: "climsim-lowres-mlo"
object: "recipe:climsim_lowres_mlo"
provenance:
providers:
- name: "Hugging Face"
Expand All @@ -22,3 +24,6 @@ maintainers:
- name: "Qingyuan Yang"
orcid: "https://orcid.org/0000-0002-5631-889X"
github: yiqioyang
- name: "Julius Busecke"
orcid: "https://orcid.org/0000-0001-8571-865X"
github: jbusecke
51 changes: 48 additions & 3 deletions feedstock/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
StoreToZarr,
ConsolidateMetadata,
ConsolidateDimensionCoordinates,
CheckpointFileTransfer,
)
from pangeo_forge_recipes.storage import CacheFSSpecTarget

#######################################
import datetime as dt
import functools
import gcsfs

import cftime
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
Expand Down Expand Up @@ -193,15 +196,24 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:


times = [t for t in generate_times()]
# Debug this recipe by running fewer times
times = times[0:90000]
concat_dim = ConcatDim("time", keys=times)
cache_target = CacheFSSpecTarget(
fs = gcsfs.GCSFileSystem(),
root_path="gs://leap-scratch/data-library/feedstocks/cache_test"
)

lowres_mli_make_url = functools.partial(make_url, ds_type="mli")
lowres_mli_pattern = FilePattern(lowres_mli_make_url, concat_dim)
climsim_lowres_mli = (
beam.Create(lowres_mli_pattern.items())
| OpenURLWithFSSpec(max_concurrency=20)
| CheckpointFileTransfer(
transfer_target=cache_target,
max_executors=10,
concurrency_per_executor=2,
initial_backoff=3.0,
fsspec_sync_patch=False,# works but is slow. Testing with fsspec and new backoff retry
)
| OpenURLWithFSSpec(cache=None, fsspec_sync_patch=True)
| OpenWithXarray(
# FIXME: Get files to open without `copy_to_local=True`
# Related: what is the filetype? Looks like netcdf3, but for some reason
Expand All @@ -220,3 +232,36 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
| ConsolidateMetadata()
| Copy(target=catalog_store_urls["climsim-lowres-mli"])
)


lowres_mlo_make_url = functools.partial(make_url, ds_type="mlo")
lowres_mlo_pattern = FilePattern(lowres_mlo_make_url, concat_dim)
climsim_lowres_mlo = (
beam.Create(lowres_mlo_pattern.items())
| CheckpointFileTransfer(
transfer_target=cache_target,
max_executors=5,
concurrency_per_executor=20,
fsspec_sync_patch=False,# works but is slow. Testing with fsspec and new backoff retry
)
| OpenURLWithFSSpec(cache=None, fsspec_sync_patch=True)
| OpenWithXarray(
# FIXME: Get files to open without `copy_to_local=True`
# Related: what is the filetype? Looks like netcdf3, but for some reason
# `scipy` backend can't open them, and `netcdf4` can?
copy_to_local=True,
xarray_open_kwargs=dict(engine="netcdf4"),
)
| ExpandTimeDimAndAddMetadata()
| StoreToZarr(
store_name="climsim-lowres-mlo.zarr",
target_chunks={"time": 600},
combine_dims=lowres_mlo_pattern.combine_dim_keys,
)
| InjectAttrs()
| ConsolidateDimensionCoordinates()
| ConsolidateMetadata()
| Copy(target=catalog_store_urls["climsim-lowres-mlo"])
)

#TODO: Reuse pipeline parts and parameters
7 changes: 3 additions & 4 deletions feedstock/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
leap-data-management-utils==0.0.12
git+https://github.com/ranchodeluxe/xarray@ranchodeluxe-patch-1#egg=xarray
git+https://github.com/ranchodeluxe/rioxarray
git+https://github.com/ranchodeluxe/datatree@main#egg=xarray-datatree
git+https://github.com/pangeo-forge/pangeo-forge-recipes@jb/xarray-hack #see @gc/cached_disabled but with cache
git+https://github.com/pangeo-forge/pangeo-forge-recipes@5ee286d2e2c3e1c68a0c7e5885c17c8a9a70e8ca
httpfs-sync>=0.0.2
#git+https://github.com/moradology/httpfs-sync.git@feature/pool-reuse
zarr==2.16.1
gcsfs
apache-beam[gcp]