
Commit

Merge pull request #5 from jbusecke/add-pre-commit
Add pre-commit config
jbusecke authored Oct 6, 2022
2 parents dc6ca0a + 904a256 commit 7bd23dd
Showing 11 changed files with 107 additions and 51 deletions.
16 changes: 16 additions & 0 deletions .flake8
@@ -0,0 +1,16 @@
[flake8]
exclude = __init__.py,.eggs,doc
ignore =
# whitespace before ':' - doesn't work well with black
E203
E402
# line too long - let black worry about that
E501
# do not assign a lambda expression, use a def
E731
# line break before binary operator
W503
E265
F811
# Allows type hinting as Gridded[DataArray, "(X:center)"], where we did `from typing import Annotated as Gridded`
F722
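
Note on the F722 entry above: flake8 parses the string metadata inside `Annotated` hints as a forward reference and reports it as a syntax error, so the hinting style described in the comment only passes with F722 ignored. A minimal sketch of that pattern (the `xarray.DataArray` import and the toy function are illustrative assumptions; only the hint style itself comes from the comment above):

```python
from typing import Annotated as Gridded

from xarray import DataArray


# Without F722 in the flake8 ignore list, the string "(X:center)" below
# would be flagged as a syntax error in a forward annotation.
def recenter(da: Gridded[DataArray, "(X:center)"]) -> DataArray:
    """Toy function using the Annotated-based hinting style."""
    return da
```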
21 changes: 21 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,21 @@
default_language_version:
python: python3.9
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.3.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- repo: https://github.com/PyCQA/isort
rev: 5.10.1
hooks:
- id: isort
- repo: https://github.com/psf/black
rev: 22.8.0
hooks:
- id: black
- repo: https://github.com/PyCQA/flake8
rev: 5.0.4
hooks:
- id: flake8
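
With this configuration committed, the hooks are typically installed into git and then run once over the whole tree. A minimal sketch of doing that from Python, assuming the `pre-commit` package is installed in the active environment (the more common route is simply running the same two commands in a shell):

```python
import subprocess

# Install the git hook so the checks run on every commit,
# then run all configured hooks once against the full repository.
subprocess.run(["pre-commit", "install"], check=True)
subprocess.run(["pre-commit", "run", "--all-files"], check=True)
```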
6 changes: 3 additions & 3 deletions README.md
@@ -3,9 +3,9 @@ Using queries to the ESGF API to generate urls and keyword arguments for receipes


## Parsing a list of instance ids using wildcards
Pangeo forge recipes require the user to provide exact instance_id's for the datasets they want to be processed. Discovering these with the [web search](https://esgf-node.llnl.gov/search/cmip6/) can become cumbersome, especially when dealing with a large number of members/models etc.
Pangeo forge recipes require the user to provide exact instance_id's for the datasets they want to be processed. Discovering these with the [web search](https://esgf-node.llnl.gov/search/cmip6/) can become cumbersome, especially when dealing with a large number of members/models etc.

`pangeo-forge-esgf` provides some functions to query the ESGF API based on instance_id values with wildcards.
`pangeo-forge-esgf` provides some functions to query the ESGF API based on instance_id values with wildcards.

For example if you want to find all the zonal (`uo`) and meridonal (`vo`) velocities available for the `lgm` experiment of PMIP, you can do:

@@ -35,4 +35,4 @@ and you will get:
'CMIP6.PMIP.MPI-M.MPI-ESM1-2-LR.lgm.r1i1p1f1.Omon.vo.gn.v20190710']
```

Eventually I hope I can leverage this functionality to handle user requests in PRs that add wildcard instance_ids, but for now this might be helpful to manually construct lists of instance_ids to submit to a pangeo-forge feedstock.
Eventually I hope I can leverage this functionality to handle user requests in PRs that add wildcard instance_ids, but for now this might be helpful to manually construct lists of instance_ids to submit to a pangeo-forge feedstock.
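
The wildcard query the README refers to is collapsed in this diff view. Purely as an illustration, a call of the kind the text describes might look like the sketch below; the import path matches `pangeo_forge_esgf/parsing.py` shown further down, while the exact wildcard string is an assumption rather than the README's own example:

```python
from pangeo_forge_esgf.parsing import parse_instance_ids

# Hypothetical wildcard instance_id: '*' matches any value for a facet and
# a bracketed list such as "[uo, vo]" is expanded into ["uo", "vo"].
iid = "CMIP6.PMIP.*.*.lgm.*.Omon.[uo, vo].gn.*"
instance_ids = parse_instance_ids(iid)
print(instance_ids)
```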
4 changes: 2 additions & 2 deletions environment.yaml
@@ -1,6 +1,6 @@
name: pangeo-forge-esgf
channels:
- conda-forge
dependencies:
dependencies:
- python
- aiohttp
- aiohttp
10 changes: 6 additions & 4 deletions pangeo_forge_esgf/dynamic_kwargs.py
@@ -1,5 +1,7 @@
from typing import Dict, Union, List, Tuple
from typing import Dict, List, Tuple

import aiohttp

from .utils import facets_from_iid

# For certain table_ids it is preferrable to have time chunks that are a multiple of e.g. 1 year for monthly data.
@@ -127,7 +129,7 @@ async def response_data_processing(
print(f"Inferred timesteps per file: {timesteps}")
element_sizes = [size / n_t for size, n_t in zip(sizes, timesteps)]

### Determine kwargs
# Determine kwargs
# MAX_SUBSET_SIZE=1e9 # This is an option if the revised subsetting still runs into errors.
MAX_SUBSET_SIZE = 500e6
DESIRED_CHUNKSIZE = 200e6
@@ -146,7 +148,7 @@ async def response_data_processing(
if max(sizes) <= MAX_SUBSET_SIZE:
subset_input = 0
else:
## Determine subset_input parameters given the following constraints
# Determine subset_input parameters given the following constraints
# - Needs to keep the subset size below MAX_SUBSET_SIZE
# - (Not currently implemented) Resulting subsets should be evenly dividable by target_chunks (except for the last file, that can be odd). This might ultimately not be required once we figure out the locking issues. I cannot fulfill this right now with the dataset structure where often the first and last files have different number of timesteps than the 'middle' ones.

@@ -180,4 +182,4 @@ async def is_netcdf3(session: aiohttp.ClientSession, url: str) -> bool:
if not status_code == 206:
raise RuntimeError(f"Range request failed with {status_code} for {url}")
head = await resp.read()
return "CDF" in str(head)
return "CDF" in str(head)
29 changes: 19 additions & 10 deletions pangeo_forge_esgf/parsing.py
@@ -1,7 +1,8 @@
import requests
import json

from .utils import facets_from_iid


def request_from_facets(url, **facets):
params = {
"type": "Dataset",
@@ -25,21 +26,29 @@ def instance_ids_from_request(json_dict):
def parse_instance_ids(iid: str) -> list[str]:
"""Parse an instance id with wildcards"""
facets = facets_from_iid(iid)
#convert string to list if square brackets are found
for k,v in facets.items():
if '[' in v:
v = v.replace('[', '').replace(']', '').replace('\'', '').replace(' ','').split(',')
# convert string to list if square brackets are found
for k, v in facets.items():
if "[" in v:
v = (
v.replace("[", "")
.replace("]", "")
.replace("'", "")
.replace(" ", "")
.split(",")
)
facets[k] = v
facets_filtered = {k: v for k, v in facets.items() if v != "*"}

#TODO: I should make the node url a keyword argument. For now this works well enough
url="https://esgf-node.llnl.gov/esg-search/search"

# TODO: I should make the node url a keyword argument.
# For now this works well enough
url = "https://esgf-node.llnl.gov/esg-search/search"
# url = "https://esgf-data.dkrz.de/esg-search/search"
# TODO: how do I iterate over this more efficiently? Maybe we do not want to allow more than x files parsed?
# TODO: how do I iterate over this more efficiently?
# Maybe we do not want to allow more than x files parsed?
resp = request_from_facets(url, **facets_filtered)
if resp.status_code != 200:
print(f"Request [{resp.url}] failed with {resp.status_code}")
return resp
else:
json_dict = resp.json()
return instance_ids_from_request(json_dict)
return instance_ids_from_request(json_dict)
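
The bracket handling added above turns a facet value written like a Python list into a real list before the request, and wildcard facets are dropped. A standalone illustration of just that string manipulation, using a made-up facet dict:

```python
facets = {"variable_id": "[uo, vo]", "experiment_id": "lgm", "source_id": "*"}

for k, v in facets.items():
    if "[" in v:
        # Strip brackets, quotes and spaces, then split on commas.
        facets[k] = (
            v.replace("[", "")
            .replace("]", "")
            .replace("'", "")
            .replace(" ", "")
            .split(",")
        )

# Wildcards are excluded from the query, as in parse_instance_ids.
facets_filtered = {k: v for k, v in facets.items() if v != "*"}
print(facets_filtered)  # {'variable_id': ['uo', 'vo'], 'experiment_id': 'lgm'}
```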
46 changes: 22 additions & 24 deletions pangeo_forge_esgf/recipe_inputs.py
@@ -1,12 +1,12 @@
from typing import Dict, Union, List, Tuple
import aiohttp
import asyncio
import time
from typing import Dict, List, Union

import aiohttp

from .dynamic_kwargs import response_data_processing
from .utils import facets_from_iid


## global variables
# global variables
search_node_list = [
"https://esgf-node.llnl.gov/esg-search/search",
"https://esgf-data.dkrz.de/esg-search/search",
@@ -18,7 +18,7 @@
# For now just use llnl
search_node = search_node_list[0]

## Data nodes in preferred order (from naomis code here: https://github.com/naomi-henderson/cmip6collect2/blob/main/myconfig.py)
# Data nodes in preferred order (from naomis code here: https://github.com/naomi-henderson/cmip6collect2/blob/main/myconfig.py)
data_nodes = [
"esgf-data1.llnl.gov",
"esgf-data2.llnl.gov",
@@ -80,9 +80,7 @@ async def generate_recipe_inputs_from_iids(

tasks = []
for iid in iid_list:
tasks.append(
asyncio.ensure_future(iid_request(session, iid, search_node))
)
tasks.append(asyncio.ensure_future(iid_request(session, iid, search_node)))

raw_input = await asyncio.gather(*tasks)
recipe_inputs = {
@@ -111,9 +109,7 @@ async def iid_request(
filtered_response_data = await sort_and_filter_response(response_data, session)

print(f"Determining dynamics kwargs for {iid}...")
urls, kwargs = await response_data_processing(
session, filtered_response_data, iid
)
urls, kwargs = await response_data_processing(session, filtered_response_data, iid)

return urls, kwargs

@@ -128,7 +124,7 @@ async def _esgf_api_request(
"retracted": "false",
"format": "application/solr+json",
"fields": "url,size,table_id,title,instance_id,replica,data_node",
"latest": "true",
"latest": "true",
"distrib": "true",
"limit": 500, # This determines the number of urls/files that are returned. I dont expect this to be ever more than 500?
}
@@ -158,11 +154,11 @@


async def check_url(url, session):
try:
try:
async with session.head(url, timeout=5) as resp:
return resp.status
except asyncio.exceptions.TimeoutError:
return 503 #TODO: Is this best practice?
return 503 # TODO: Is this best practice?


async def sort_and_filter_response(
@@ -225,24 +221,26 @@ async def pick_data_node(
) -> Dict[str, Dict[str, str]]:
"""Filters out non-responsive data nodes, and then selects the preferred data node from available ones"""
test_response_list = response_groups.get(list(response_groups.keys())[0])
## Determine preferred data node

# Determine preferred data node
for data_node in data_nodes:
print(f'DEBUG: Testing data node: {data_node}')
matching_data_nodes = [r for r in test_response_list if r['data_node']==data_node]
if len(matching_data_nodes)==1:
matching_data_node = matching_data_nodes[0] # TODO: this is kinda clunky
status = await check_url(matching_data_node['url'], session)
print(f"DEBUG: Testing data node: {data_node}")
matching_data_nodes = [
r for r in test_response_list if r["data_node"] == data_node
]
if len(matching_data_nodes) == 1:
matching_data_node = matching_data_nodes[0] # TODO: this is kinda clunky
status = await check_url(matching_data_node["url"], session)
if status in [200, 308]:
picked_data_node = data_node
print(f"DEBUG: Picking preferred data_node: {picked_data_node}")
break
else:
print(f"Got status {status} for {matching_data_node['instance_id']}")
elif len(matching_data_nodes) == 0:
print(f'DEBUG: Data node: {data_node} not available')
print(f"DEBUG: Data node: {data_node} not available")
else:
raise # this should never happen
raise # this should never happen

# loop through all groups and filter for the picked data node
modified_response_groups = {}
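
The request fan-out reformatted in this file follows a standard asyncio/aiohttp pattern: one task per instance id, gathered on a shared session. A self-contained sketch of that pattern with a placeholder coroutine standing in for the real `iid_request` (which performs the ESGF query and kwarg processing):

```python
import asyncio

import aiohttp


async def iid_request(session: aiohttp.ClientSession, iid: str, node: str):
    # Placeholder: the real coroutine queries `node` for `iid` and returns
    # urls and keyword arguments for the recipe.
    await asyncio.sleep(0)
    return iid, node


async def gather_requests(iid_list, node="https://esgf-node.llnl.gov/esg-search/search"):
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.ensure_future(iid_request(session, iid, node)) for iid in iid_list]
        return await asyncio.gather(*tasks)


# results = asyncio.run(gather_requests(["CMIP6.PMIP.MRI.MRI-ESM2-0.past1000.r1i1p1f1.Amon.tas.gn.v20200120"]))
```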
3 changes: 2 additions & 1 deletion pangeo_forge_esgf/utils.py
@@ -1,9 +1,10 @@
from typing import Dict


def facets_from_iid(iid: str) -> Dict[str, str]:
"""Translates iid string to facet dict according to CMIP6 naming scheme"""
iid_name_template = "mip_era.activity_id.institution_id.source_id.experiment_id.variant_label.table_id.variable_id.grid_label.version"
facets = {}
for name, value in zip(iid_name_template.split("."), iid.split(".")):
facets[name] = value
return facets
return facets
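
Because `facets_from_iid` appears in full above, its behaviour is easy to illustrate: it zips the CMIP6 facet names against the dot-separated parts of the instance id (the example id below is one of those listed in the README):

```python
from pangeo_forge_esgf.utils import facets_from_iid

iid = "CMIP6.PMIP.MPI-M.MPI-ESM1-2-LR.lgm.r1i1p1f1.Omon.vo.gn.v20190710"
facets = facets_from_iid(iid)
# {'mip_era': 'CMIP6', 'activity_id': 'PMIP', 'institution_id': 'MPI-M',
#  'source_id': 'MPI-ESM1-2-LR', 'experiment_id': 'lgm',
#  'variant_label': 'r1i1p1f1', 'table_id': 'Omon', 'variable_id': 'vo',
#  'grid_label': 'gn', 'version': 'v20190710'}
```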
8 changes: 8 additions & 0 deletions pyproject.toml
@@ -6,3 +6,11 @@ build-backend = "setuptools.build_meta"
[tool.setuptools_scm]
write_to = "pangeo_forge_esgf/_version.py"
write_to_template = "__version__ = '{version}'"

[tool.isort]
profile = "black"
skip_gitignore = true
force_to_top = true
default_section = "THIRDPARTY"
known_first_party = "pangeo-forge-esgf"
skip= ["doc/conf.py"]
2 changes: 1 addition & 1 deletion setup.py
@@ -1,3 +1,3 @@
from setuptools import setup

setup()
setup()
13 changes: 7 additions & 6 deletions test_script.py
@@ -1,13 +1,14 @@
import asyncio

from pangeo_forge_esgf import generate_recipe_inputs_from_iids

iids = [
#PMIP runs requested by @CommonClimate
'CMIP6.PMIP.MIROC.MIROC-ES2L.past1000.r1i1p1f2.Amon.tas.gn.v20200318',
'CMIP6.PMIP.MRI.MRI-ESM2-0.past1000.r1i1p1f1.Amon.tas.gn.v20200120',
'CMIP6.PMIP.MPI-M.MPI-ESM1-2-LR.past2k.r1i1p1f1.Amon.tas.gn.v20210714',
# PMIP runs requested by @CommonClimate
"CMIP6.PMIP.MIROC.MIROC-ES2L.past1000.r1i1p1f2.Amon.tas.gn.v20200318",
"CMIP6.PMIP.MRI.MRI-ESM2-0.past1000.r1i1p1f1.Amon.tas.gn.v20200120",
"CMIP6.PMIP.MPI-M.MPI-ESM1-2-LR.past2k.r1i1p1f1.Amon.tas.gn.v20210714",
]

recipe_inputs = asyncio.run(generate_recipe_inputs_from_iids(iids))
print('DONE')
print(recipe_inputs)
print("DONE")
print(recipe_inputs)
