Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More pp fixes #571

Merged
merged 4 commits
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,9 @@ class _DMDimensionsMixin:
dims: list = dc.field(init=False, default_factory=list)
scalar_coords: list = dc.field(init=False, default_factory=list)

def __init__(self):
    # Lazily-populated cache for the dimension→axis mapping; no other
    # initialization is performed here.
    # NOTE(review): this class declares `dims` and `scalar_coords` as
    # dataclass fields (see above) and relies on __post_init__, so a
    # hand-written __init__ may shadow the generated one depending on
    # how the decorator/MRO resolves — confirm intended. It also does
    # not call super().__init__(), which mixins normally should.
    self._dim_axes = None

def __post_init__(self, coords=None):
if coords is None:
# if we're called to rebuild dicts, rather than after __init__
Expand Down
80 changes: 54 additions & 26 deletions src/preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ def edit_request(self, v: varlist_util.VarlistEntry, **kwargs):
return None
new_v = copy_as_alternate(v)
new_v.translation = new_tv
v.translation.name = new_tv.name
v.translation.standard_name = new_tv.standard_name
v.translation.units = new_tv.units
v.translation.long_name = new_tv.long_name
Expand Down Expand Up @@ -566,14 +567,10 @@ def edit_request(self, v: varlist_util.VarlistEntry, **kwargs):
time_coord.axis,
name=new_tv_name
)
new_v = copy_as_alternate(v)
new_v.translation = new_tv
v.alternates.append(v.translation)
v.translation.name = new_tv.name
v.translation.long_name = new_tv.long_name
v.translation.units = new_tv.units
v.translation.dim_axes = new_tv.dim_axes
v.translation.dim_axes_set = new_tv.dim_axes_set

# add original 4D var defined in new_tv as an alternate TranslatedVarlistEntry
# to query if no entries on specified levels are found in the data catalog
v.alternates.append(new_tv)

return v

Expand Down Expand Up @@ -873,6 +870,7 @@ def query_catalog(self,
Dictionary of xarray datasets with catalog information for each case
"""

try_new_query = False
# open the csv file using information provided by the catalog definition file
cat = intake.open_esm_datastore(data_catalog)
# create filter lists for POD variables
Expand All @@ -895,31 +893,61 @@ def query_catalog(self,
# TODO: add method to convert freq from DateFrequency object to string
case_d.query['frequency'] = freq
case_d.query['path'] = [path_regex]
case_d.query['variable'] = v.name
case_d.query['variable_id'] = v.translation.name
# search translation for further query requirements
for q in case_d.query:
if hasattr(v.translation, q):
case_d.query.update({q: getattr(v.translation, q)})
if hasattr(v.translation, 'name'):
case_d.query.update({'variable': getattr(v.translation, 'name')})

# search catalog for convention specific query object
cat_subset = cat.search(**case_d.query)
if cat_subset.df.empty:
raise util.DataRequestError(
f"No assets matching query requirements found for {case_name} in {data_catalog}")
# Get files in specified date range
# https://intake-esm.readthedocs.io/en/stable/how-to/modify-catalog.html
# cat_subset.esmcat._df = self.check_group_daterange(cat_subset.df)
# v.log.debug("Read %d mb for %s.", cat_subset.esmcat._df.dtypes.nbytes / (1024 * 1024), v.full_name)
# convert subset catalog to an xarray dataset dict
# and concatenate the result with the final dict
cat_dict = cat_dict | cat_subset.to_dataset_dict(
progressbar=False,
xarray_open_kwargs=self.open_dataset_kwargs
)
# rename cat_subset case dict keys to case names
cat_dict_rename = self.rename_dataset_keys(cat_dict, case_dict)
return cat_dict_rename
# check whether there is a vertical coordinate
if any(v.translation.scalar_coords) and any(v.alternates):
for a in v.alternates:
found_entry = False
for c in a.scalar_coords:
if c.axis == 'Z':
case_d.query.update({'variable_id': a.name})
v.translation.requires_level_extraction = True
try_new_query = True
found_entry = True
break
else:
continue
if found_entry:
break
if try_new_query:
# search catalog for convention specific query object
cat_subset = cat.search(**case_d.query)
if cat_subset.df.empty:
raise util.DataRequestError(
f"No assets matching query requirements found for {a.name} for"
f" case{case_name} in {data_catalog}")
else:
raise util.DataRequestError(
f"Unable to find match or alternate for {v.translation.name}"
f" for case {case_name} in {data_catalog}")
else:
# Get files in specified date range
# https://intake-esm.readthedocs.io/en/stable/how-to/modify-catalog.html
# cat_subset.esmcat._df = self.check_group_daterange(cat_subset.df)
# v.log.debug("Read %d mb for %s.", cat_subset.esmcat._df.dtypes.nbytes / (1024 * 1024), v.full_name)
# convert subset catalog to an xarray dataset dict
# and concatenate the result with the final dict
cat_subset_df = cat_subset.to_dataset_dict(
progressbar=False,
xarray_open_kwargs=self.open_dataset_kwargs
)
dict_key = list(cat_subset_df)[0]
if dict_key not in cat_dict:
cat_dict[dict_key] = cat_subset_df[dict_key]
else:
cat_dict[dict_key] = xr.merge([cat_dict[dict_key], cat_subset_df[dict_key]])
print(cat_dict)
# rename cat_subset case dict keys to case names
cat_dict_rename = self.rename_dataset_keys(cat_dict, case_dict)
return cat_dict_rename

def edit_request(self, v: varlist_util.VarlistEntry, **kwargs):
"""Top-level method to edit *pod*\'s data request, based on the child
Expand Down
9 changes: 9 additions & 0 deletions src/translation.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ class TranslatedVarlistEntry(data_model.DMVariable):
scalar_coords: list = \
dc.field(init=False, default_factory=list, metadata={'query': True})
log: typing.Any = util.MANDATORY # assigned from parent var
# When True, the translated variable's data must be extracted from a
# specific vertical level of a 4-D field during preprocessing (set when
# the catalog query falls back to a 4-D alternate with a Z scalar coord).
# NOTE(review): the previous @property/@setter pair reused this exact
# name, so the property object rebound the class attribute and the
# dataclass machinery captured the *property object* — not False — as
# the field default, and the setter was annotated `-> bool` while
# returning None. The pair added no validation or computation, so a
# plain field keeps the identical read/write attribute interface
# (e.g. `v.translation.requires_level_extraction = True`) without the
# shadowing bug.
requires_level_extraction: bool = False


@util.mdtf_dataclass
Expand Down
Loading