In my CMIP6 feedstock I am often battling issues with flaky servers. It is not uncommon that some of the urls I get by querying the API stop working even a few minutes later. As far as I'm concerned this is a reality of the use case, but I would like a way to handle it better with PGF:
An example can be found here (private Dataflow logs - mostly for my own reference; happy to share with anyone interested).
This job takes almost 1 hour (EDIT: it's actually closer to 2 hours 🤯) to fail! I think we should be able to surface this sort of failure quicker!
I am getting these sorts of errors:
```
fsspec.exceptions.FSTimeoutError [while running 'Creating CMIP6.CMIP.CAS.FGOALS-g3.historical.r1i1p1f1.6hrLev.hus.gn.v20190826|OpenURLWithFSSpec|OpenWithXarray|Preprocessor|StoreToZarr|ConsolidateDimensionCoordinates|ConsolidateMetadata|Copy|Logging to bigquery (non-QC)|TestDataset|Logging to bigquery (QC)/OpenURLWithFSSpec/MapWithConcurrencyLimit/open_url-ptransform-86']

Traceback (most recent call last):
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/implementations/http.py", line 646, in async_fetch_range
    r = await self.session.get(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/aiohttp/client.py", line 605, in _request
    await resp.start(conn)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/aiohttp/client_reqrep.py", line 961, in start
    with self._timer:
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/aiohttp/helpers.py", line 735, in __exit__
    raise asyncio.TimeoutError from None
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 637, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/tmp/e6f5b8aeaffc1a6f96042a294c19d19882039440233eca4e471a58b60176d08fm5qx9kuc/lib/python3.10/site-packages/apache_beam/transforms/core.py", line 2040, in <lambda>
  File "/tmp/e6f5b8aeaffc1a6f96042a294c19d19882039440233eca4e471a58b60176d08fm5qx9kuc/lib/python3.10/site-packages/pangeo_forge_recipes/transforms.py", line 121, in <lambda>
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/pangeo_forge_recipes/openers.py", line 32, in open_url
    cache.cache_file(url, secrets, **kw)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/pangeo_forge_recipes/storage.py", line 209, in cache_file
    _copy_btw_filesystems(input_opener, target_opener)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/pangeo_forge_recipes/storage.py", line 38, in _copy_btw_filesystems
    data = source.read(BLOCK_SIZE)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/implementations/http.py", line 598, in read
    return super().read(length)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/spec.py", line 1846, in read
    out = self.cache._fetch(self.loc, self.loc + length)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/caching.py", line 421, in _fetch
    self.cache = self.fetcher(start, bend)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/asyn.py", line 101, in sync
    raise FSTimeoutError from return_result
fsspec.exceptions.FSTimeoutError
```
which I interpret as "fsspec was not able to cache at least one of the files and timed out".
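One mitigation I could imagine (not something PGF does today, just a sketch): retry the flaky operation with exponential backoff before giving up. All names and parameters below are hypothetical; in practice you would probably pass `retryable=(fsspec.exceptions.FSTimeoutError,)`, which as far as I can tell is a subclass of `asyncio.TimeoutError`:

```python
import time


def call_with_retries(fn, n_retries=3, base_delay=2.0, retryable=(TimeoutError,)):
    """Call ``fn()`` and retry transient failures with exponential backoff.

    ``fn`` stands in for any flaky operation, e.g. a closure around
    ``cache.cache_file(url, ...)``. Hypothetical helper, not PGF API.
    """
    for attempt in range(n_retries):
        try:
            return fn()
        except retryable:
            if attempt == n_retries - 1:
                raise  # retries exhausted: surface the original error
            # back off base_delay, 2*base_delay, 4*base_delay, ...
            time.sleep(base_delay * 2 ** attempt)
```

This wouldn't help when a file is permanently gone, but it would smooth over the "works a minute later" flavor of flakiness.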
I think that the vanilla PGF case just is not expecting urls to be flaky? And so I am wondering if there is a way to efficiently 'ping' each url on a small resource (e.g. a single worker) before even starting the caching (which often scales up workers and uses considerable resources just to find out that e.g. one file was never available; #713 seems relevant here too).
I wonder if something naive like this would be possible (total pseudo code):
```python
urls = ['http://file_a.nc', 'http://file_b.nc']
pattern = pattern_from_list(urls)

(
    beam.Create(pattern.items())
    | ReduceAllUrlsToOneElement()
    | CheckListOfUrlsOnSingleWorker()  # if one of the urls does not exist, fail here and give a nice log message!
    | MapOutUrlsAsElements()  # this should recreate the exact same output as beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray()
    ...
)
```
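To make that sketch slightly more concrete: I believe the reduce-to-one / fan-back-out shape maps onto `beam.combiners.ToList()` followed by a `beam.FlatMap`. The plain-Python equivalent of the check-and-fan-out step would be something like the below (all names are mine, and `is_alive` is a pluggable callable, e.g. an HTTP HEAD request):

```python
def check_and_fan_out(items, is_alive):
    """Validate every (index, url) pair once, then re-emit the same elements.

    Mirrors what ``beam.combiners.ToList() | beam.FlatMap(check_and_fan_out,
    is_alive)`` would do: the whole collection arrives as one list, we fail
    fast with a readable message if any url is dead, and otherwise yield the
    elements back out unchanged for the downstream transforms.
    """
    items = list(items)
    dead = [url for _, url in items if not is_alive(url)]
    if dead:
        raise RuntimeError(f"{len(dead)} of {len(items)} urls unreachable: {dead}")
    yield from items
```

The nice property is that the error message names the offending urls up front, instead of surfacing an `FSTimeoutError` two hours into the transfer.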
This is not the highest priority, but it would help me a lot in keeping the CMIP6 resource usage and maintenance time lower. Curious what y'all think.
I'm thinking that a utility that just runs this on the main worker with pattern.items() makes the most sense if we want a single worker to conduct such a check. The current strategy on #750 enables this kind of check prior to transfer, but does it across multiple workers (which is perhaps desirable to avoid IP-based rate limits, though that depends on factors that are hard to determine a priori).
I have concluded that my CMIP feedstock is just very special, and as such I think the need for this feature might not warrant upstreaming it all the way. For posterity, I will implement a check in https://github.com/jbusecke/pangeo-forge-esgf/pull/45/files instead. Closing this now.
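For reference, the kind of pre-flight check I have in mind is roughly the following stdlib-only sketch (the actual implementation in pangeo-forge-esgf may differ; the function name is mine, and using HEAD assumes the servers answer HEAD requests, which not all do):

```python
from urllib.error import URLError
from urllib.request import Request, urlopen


def find_dead_urls(urls, timeout=10):
    """Return the subset of ``urls`` that cannot be opened right now.

    Issues one HEAD request per url (cheap: no body is transferred) and
    treats any error or non-2xx status as "dead". Sequential on purpose,
    so it can run on a single small machine before the expensive
    Beam pipeline is ever launched.
    """
    dead = []
    for url in urls:
        try:
            with urlopen(Request(url, method="HEAD"), timeout=timeout) as resp:
                status = getattr(resp, "status", None)
                if status is not None and not 200 <= status < 300:
                    dead.append(url)
        except (URLError, OSError, ValueError):
            # URLError covers HTTP errors and connection failures;
            # ValueError covers malformed urls
            dead.append(url)
    return dead
```

Running this against `pattern.items()`'s urls and raising if the returned list is non-empty would surface "file was never available" in seconds rather than hours.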