Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go back to rechunker Pipeline executors #192

Closed
wants to merge 4 commits into from

Conversation

rabernat
Copy link
Contributor

As discussed at our last dev meeting, we may want to try going back to using the rechunker Pipeline executors internally.

This PR tries that, plus moves towards a more flexible execution model. Rather than having to define the four specific stages, Recipes just have to define a private method _to_pipelines(). That way they can customize their own stages.

This seems to work (tests pass). The big question is whether the Dask and Prefect graphs are still as efficient as we need them to be. My hope is yes, because the underlying functions in the pipelines are still based on the more efficient partial functions that @TomAugspurger defined in #160.

Comment on lines +80 to +87
def _to_pipelines(self) -> ParallelPipelines:
"""Translate recipe to pipeline for execution.
"""
pipeline = [] # type: MultiStagePipeline
pipeline.append(Stage(self.store_chunk, list(self.iter_chunks())))
pipeline.append(Stage(self.finalize_target))
pipelines = [pipeline] # type: ParallelPipelines
return pipelines
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here note that the compilation of the flow to pipelines is now implemented by each recipe class, not BaseRecipe. This gives recipes a lot more flexibility.

@rabernat
Copy link
Contributor Author

If this branch works, it will help things a lot going forward. We need an answer.

I would like to find a way to test this at scale with the workflows that were previously running out of memory. Questions for @sharkinsspatial:

@sharkinsspatial
Copy link
Contributor

@rabernat I can have a go at this. I already have a rudimentary version of pangeo-forge/pangeo-forge-prefect#25 which I was using for testing purposes which will handle this. Do you have a specific recipe which would be the best candidate for testing?

@rabernat
Copy link
Contributor Author

Do you have a specific recipe which would be the best candidate for testing?

Whichever recipe was causing things to crash before. I guess the noaa-oisst one?

@sharkinsspatial
Copy link
Contributor

@rabernat I attempted to run noaa-oisst recipe using the branch and encountered an OOM termination of the scheduler before completion. I suspect based on my quick research noted here pangeo-forge/gpm-imerge-hhr-feedstock#1 that this is a Prefect issue rather than a serialization issue with pangeo-forge-recipes.

As another test, I attempted to run https://github.com/pangeo-forge/gpm-imerge-hhr-feedstock. Running against pangeo-forge-recipes==0.5.0 this recipe runs correctly (until, as noted it encounters memory issues). However, when run using this branch (which requires moving the fsspec_open_kwargs from the recipe to the file pattern) I receive a FileNotFoundError for each cache_inputs task. Reviewing this with @cisaacstern, he ran a quick local test https://gist.github.com/cisaacstern/d685d2453c613be60b363d44e22e000c demonstrating no issues with the recipe itself. So it appears that there may be a regression with to_pipelines and fsspec_open_kwargs.

For next steps,

  1. I'll run a test with noaa-oisst and the DaskExecutor agains a remote cluster to try and verify that serialization issues are Prefect specific with respect to this branch.
  2. I'll run a test of gpm-imerge-hhr which requires a file pattern using the DaskExecutor so that you can have reproducible example of the FileNotFoundError which does not require Prefect infrastructure.

Let me know if there might be any other considerations here.

@cisaacstern
Copy link
Member

@sharkinsspatial, if the error is FileNotFoundError, then I think the issue is probably not with fsspec_open_kwargs? Because if the issue was fsspec_open_kwargs authentication, then the server should return 401 - Unauthorized, not FileNotFoundError?

@rabernat
Copy link
Contributor Author

I receive a FileNotFoundError for each cache_inputs task.

Do you have DEBUG logs from the failed run we can log at? Just as a general point, exposing these logs for public view seems like a really important priority for the bakeries.

@cisaacstern
Copy link
Member

Just as a general point, exposing these logs for public view seems like a really important priority for the bakeries.

@rabernat, at our last Coordination Meeting, there was discussion of routing these logs to a "third party logging service" to get them out from behind the Prefect credential wall, and make them more easily searchable. Does that still seem like a viable option and if so, is there a particular service or implementation from another project, which we could use as a model?

@rabernat
Copy link
Contributor Author

rabernat commented Sep 20, 2021

is there a particular service or implementation from another project, which we could use as a model?

Based on this post, one free path would be

@sharkinsspatial
Copy link
Contributor

sharkinsspatial commented Sep 20, 2021

@cisaacstern Your assumption was correct that this is authentication issue. See the logs below

2021-09-20T21:26:02.050090824Z stderr F distributed.nanny - INFO -         Start Nanny at: 'tcp://10.244.4.3:33347'
--
  |   | 2021-09-20T21:26:03.075550211Z stderr F distributed.worker - INFO -       Start worker at:     tcp://10.244.4.3:46475
  |   | 2021-09-20T21:26:03.075581812Z stderr F distributed.worker - INFO -          Listening to:     tcp://10.244.4.3:46475
  |   | 2021-09-20T21:26:03.075703414Z stderr F distributed.worker - INFO -          dashboard at:           10.244.4.3:39011
  |   | 2021-09-20T21:26:03.075800816Z stderr F distributed.worker - INFO - Waiting to connect to: tcp://dask-jovyan-7c534d4e-8.pangeoforge:8786
  |   | 2021-09-20T21:26:03.075863618Z stderr F distributed.worker - INFO - -------------------------------------------------
  |   | 2021-09-20T21:26:03.07598932Z stderr F distributed.worker - INFO -               Threads:                          1
  |   | 2021-09-20T21:26:03.076079822Z stderr F distributed.worker - INFO -                Memory:                   3.91 GiB
  |   | 2021-09-20T21:26:03.076129123Z stderr F distributed.worker - INFO -       Local Directory: /home/jovyan/dask-worker-space/worker-5912xsg6
  |   | 2021-09-20T21:26:03.076172424Z stderr F distributed.worker - INFO - -------------------------------------------------
  |   | 2021-09-20T21:26:03.084242492Z stderr F distributed.worker - INFO -         Registered to: tcp://dask-jovyan-7c534d4e-8.pangeoforge:8786
  |   | 2021-09-20T21:26:03.084268292Z stderr F distributed.worker - INFO - -------------------------------------------------
  |   | 2021-09-20T21:26:03.0846575Z stderr F distributed.core - INFO - Starting established connection
  |   | 2021-09-20T21:26:07.794290336Z stdout F [2021-09-20 21:26:07+0000] INFO - prefect.CloudTaskRunner \| Task 'MappedTaskWrapper[0]': Starting task run...
  |   | 2021-09-20T21:26:07.99783751Z stderr F INFO:pangeo_forge_recipes.recipes.xarray_zarr:Caching input 'time-0'
  |   | 2021-09-20T21:26:07.998177619Z stderr F INFO:pangeo_forge_recipes.storage:Caching file 'https://arthurhouhttps.pps.eosdis.nasa.gov/gpmdata/2000/06/01/imerg/3B-HHR.MS.MRG.3IMERG.20000601-S000000-E002959.0000.V06B.HDF5'
  |   | 2021-09-20T21:26:08.35787967Z stderr F INFO:pangeo_forge_recipes.storage:Copying remote file 'https://arthurhouhttps.pps.eosdis.nasa.gov/gpmdata/2000/06/01/imerg/3B-HHR.MS.MRG.3IMERG.20000601-S000000-E002959.0000.V06B.HDF5' to cache
  |   | 2021-09-20T21:26:09.1075775Z stdout F [2021-09-20 21:26:09+0000] ERROR - prefect.CloudTaskRunner \| Task 'MappedTaskWrapper[0]': Exception encountered during task execution!
  |   | 2021-09-20T21:26:09.107618801Z stdout F Traceback (most recent call last):
  |   | 2021-09-20T21:26:09.107629802Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/implementations/http.py", line 375, in _info
  |   | 2021-09-20T21:26:09.107637002Z stdout F     await _file_info(
  |   | 2021-09-20T21:26:09.107645502Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/implementations/http.py", line 736, in _file_info
  |   | 2021-09-20T21:26:09.107651102Z stdout F     r.raise_for_status()
  |   | 2021-09-20T21:26:09.107656802Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 1000, in raise_for_status
  |   | 2021-09-20T21:26:09.107662502Z stdout F     raise ClientResponseError(
  |   | 2021-09-20T21:26:09.107669603Z stdout F aiohttp.client_exceptions.ClientResponseError: 401, message='Unauthorized', url=URL('https://arthurhouhttps.pps.eosdis.nasa.gov/gpmdata/2000/06/01/imerg/3B-HHR.MS.MRG.3IMERG.20000601-S000000-E002959.0000.V06B.HDF5')
  |   | 2021-09-20T21:26:09.107676903Z stdout F
  |   | 2021-09-20T21:26:09.107668603Z stderr F ERROR:prefect.CloudTaskRunner:Task 'MappedTaskWrapper[0]': Exception encountered during task execution!
  |   | 2021-09-20T21:26:09.107693303Z stderr F Traceback (most recent call last):
  |   | 2021-09-20T21:26:09.107700404Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/implementations/http.py", line 375, in _info
  |   | 2021-09-20T21:26:09.107718604Z stderr F     await _file_info(
  |   | 2021-09-20T21:26:09.107725304Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/implementations/http.py", line 736, in _file_info
  |   | 2021-09-20T21:26:09.107683003Z stdout F The above exception was the direct cause of the following exception:
  |   | 2021-09-20T21:26:09.107766105Z stdout F
  |   | 2021-09-20T21:26:09.107775206Z stdout F Traceback (most recent call last):
  |   | 2021-09-20T21:26:09.107782906Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 861, in get_task_run_state
  |   | 2021-09-20T21:26:09.107789306Z stdout F     value = prefect.utilities.executors.run_task_with_timeout(
  |   | 2021-09-20T21:26:09.107795906Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/prefect/utilities/executors.py", line 323, in run_task_with_timeout
  |   | 2021-09-20T21:26:09.107799706Z stdout F     return task.run(*args, **kwargs)  # type: ignore
  |   | 2021-09-20T21:26:09.107806206Z stdout F   File "/Users/seanharkins/projects/pangeo_forge_prefect/pangeo_forge_prefect/flow_manager.py", line 74, in wrapper
  |   | 2021-09-20T21:26:09.107812507Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/rechunker/executors/prefect.py", line 30, in run
  |   | 2021-09-20T21:26:09.107818007Z stdout F     return self.stage.func(key)
  |   | 2021-09-20T21:26:09.107824707Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py", line 186, in cache_input
  |   | 2021-09-20T21:26:09.107830307Z stdout F     input_cache.cache_file(
  |   | 2021-09-20T21:26:09.107836807Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/pangeo_forge_recipes/storage.py", line 165, in cache_file
  |   | 2021-09-20T21:26:09.107861108Z stdout F     _copy_btw_filesystems(input_opener, target_opener)
  |   | 2021-09-20T21:26:09.107873308Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/pangeo_forge_recipes/storage.py", line 31, in _copy_btw_filesystems
  |   | 2021-09-20T21:26:09.107879408Z stdout F     with input_opener as source:
  |   | 2021-09-20T21:26:09.107884609Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/core.py", line 102, in __enter__
  |   | 2021-09-20T21:26:09.107890109Z stdout F     f = self.fs.open(self.path, mode=mode)
  |   | 2021-09-20T21:26:09.107731304Z stderr F     r.raise_for_status()
  |   | 2021-09-20T21:26:09.10792771Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 1000, in raise_for_status
  |   | 2021-09-20T21:26:09.10793381Z stderr F     raise ClientResponseError(
  |   | 2021-09-20T21:26:09.10793911Z stderr F aiohttp.client_exceptions.ClientResponseError: 401, message='Unauthorized', url=URL('https://arthurhouhttps.pps.eosdis.nasa.gov/gpmdata/2000/06/01/imerg/3B-HHR.MS.MRG.3IMERG.20000601-S000000-E002959.0000.V06B.HDF5')
  |   | 2021-09-20T21:26:09.10794431Z stderr F
  |   | 2021-09-20T21:26:09.10794981Z stderr F The above exception was the direct cause of the following exception:
  |   | 2021-09-20T21:26:09.10795461Z stderr F
  |   | 2021-09-20T21:26:09.107961611Z stderr F Traceback (most recent call last):
  |   | 2021-09-20T21:26:09.107967711Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 861, in get_task_run_state
  |   | 2021-09-20T21:26:09.107973711Z stderr F     value = prefect.utilities.executors.run_task_with_timeout(
  |   | 2021-09-20T21:26:09.107980911Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/prefect/utilities/executors.py", line 323, in run_task_with_timeout
  |   | 2021-09-20T21:26:09.107986911Z stderr F     return task.run(*args, **kwargs)  # type: ignore
  |   | 2021-09-20T21:26:09.107996212Z stderr F   File "/Users/seanharkins/projects/pangeo_forge_prefect/pangeo_forge_prefect/flow_manager.py", line 74, in wrapper
  |   | 2021-09-20T21:26:09.108002112Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/rechunker/executors/prefect.py", line 30, in run
  |   | 2021-09-20T21:26:09.108007112Z stderr F     return self.stage.func(key)
  |   | 2021-09-20T21:26:09.108011912Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py", line 186, in cache_input
  |   | 2021-09-20T21:26:09.108015912Z stderr F     input_cache.cache_file(
  |   | 2021-09-20T21:26:09.108020112Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/pangeo_forge_recipes/storage.py", line 165, in cache_file
  |   | 2021-09-20T21:26:09.108030713Z stderr F     _copy_btw_filesystems(input_opener, target_opener)
  |   | 2021-09-20T21:26:09.108034813Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/pangeo_forge_recipes/storage.py", line 31, in _copy_btw_filesystems
  |   | 2021-09-20T21:26:09.108038913Z stderr F     with input_opener as source:
  |   | 2021-09-20T21:26:09.108043113Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/core.py", line 102, in __enter__
  |   | 2021-09-20T21:26:09.108048313Z stderr F     f = self.fs.open(self.path, mode=mode)
  |   | 2021-09-20T21:26:09.108053813Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py", line 978, in open
  |   | 2021-09-20T21:26:09.108059913Z stderr F     f = self._open(
  |   | 2021-09-20T21:26:09.108066814Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/implementations/http.py", line 335, in _open
  |   | 2021-09-20T21:26:09.108072514Z stderr F     size = size or self.info(path, **kwargs)["size"]
  |   | 2021-09-20T21:26:09.108077514Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py", line 88, in wrapper
  |   | 2021-09-20T21:26:09.108081714Z stderr F     return sync(self.loop, func, *args, **kwargs)
  |   | 2021-09-20T21:26:09.108086214Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py", line 69, in sync
  |   | 2021-09-20T21:26:09.108090214Z stderr F     raise result[0]
  |   | 2021-09-20T21:26:09.108094114Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py", line 25, in _runner
  |   | 2021-09-20T21:26:09.108098714Z stderr F     result[0] = await coro
  |   | 2021-09-20T21:26:09.108102615Z stderr F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/implementations/http.py", line 388, in _info
  |   | 2021-09-20T21:26:09.108106615Z stderr F     raise FileNotFoundError(url) from exc
  |   | 2021-09-20T21:26:09.108110815Z stderr F FileNotFoundError: https://arthurhouhttps.pps.eosdis.nasa.gov/gpmdata/2000/06/01/imerg/3B-HHR.MS.MRG.3IMERG.20000601-S000000-E002959.0000.V06B.HDF5
  |   | 2021-09-20T21:26:09.107896009Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py", line 978, in open
  |   | 2021-09-20T21:26:09.108125415Z stdout F     f = self._open(
  |   | 2021-09-20T21:26:09.108129415Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/implementations/http.py", line 335, in _open
  |   | 2021-09-20T21:26:09.108134315Z stdout F     size = size or self.info(path, **kwargs)["size"]
  |   | 2021-09-20T21:26:09.108140416Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py", line 88, in wrapper
  |   | 2021-09-20T21:26:09.108146516Z stdout F     return sync(self.loop, func, *args, **kwargs)
  |   | 2021-09-20T21:26:09.108152616Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py", line 69, in sync
  |   | 2021-09-20T21:26:09.108159216Z stdout F     raise result[0]
  |   | 2021-09-20T21:26:09.108163416Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py", line 25, in _runner
  |   | 2021-09-20T21:26:09.108167516Z stdout F     result[0] = await coro
  |   | 2021-09-20T21:26:09.108171416Z stdout F   File "/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/implementations/http.py", line 388, in _info
  |   | 2021-09-20T21:26:09.108190517Z stdout F     raise FileNotFoundError(url) from exc
  |   | 2021-09-20T21:26:09.108196217Z stdout F FileNotFoundError: https://arthurhouhttps.pps.eosdis.nasa.gov/gpmdata/2000/06/01/imerg/3B-HHR.MS.MRG.3IMERG.20000601-S000000-E002959.0000.V06B.HDF5

To re-iterate, this authentication issue does not occur with pangeo-forge-recipes==0.5.0 but does occur with this same branch when fsspec_open_kwargs is specified as part of the FilePattern.

@rabernat
Copy link
Contributor Author

this authentication issue does not occur with pangeo-forge-recipes==0.5.0

0.5.0 is over a month old. We are already on 0.6.0 (see release notes). It seems most likely that this regression does not have to do with this PR (which doesn't touch the related code) but rather is due to #167 (refactor of input authentication and secret handling).

Automatic building of bakery images (pangeo-forge/pangeo-forge-bakery-images#7), coupled with bakery integration testing, might have helped catch this issue earlier.

How / where are the secrets getting passed to the flow?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants