Merge pull request #42 from hydrologie/update-notebooks
update documentation
sebastienlanglois authored Jan 10, 2024
2 parents b65f412 + d3d0a0c commit 73e6824
Showing 10 changed files with 315 additions and 878 deletions.
6 changes: 6 additions & 0 deletions CHANGES.rst
@@ -2,6 +2,12 @@
Changelog
=========

v0.3.2 (2024-01-10)
-------------------

* Update documentation. (:pull:`42`)
* Added functionality to extract geometries as a `geopandas.GeoDataFrame`. (:pull:`42`)

v0.3.1 (2023-12-01)
-------------------

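The geometry-extraction entry above refers to the new `gis_request` workflow added further down in this diff, which returns a `geopandas.GeoDataFrame` indexed by the catalog's `unique_id`. As a hedged sketch of downstream use (the station ids, attributes, and geometries below are invented for illustration), the returned object behaves like any other GeoDataFrame:

import geopandas as gpd
from shapely.geometry import Polygon

# Stand-in for the GeoDataFrame that the new extraction returns; the ids and
# geometry values here are invented for illustration only.
gdf = gpd.GeoDataFrame(
    {"Station": ["020302", "020404"]},
    geometry=[
        Polygon([(0, 0), (1, 0), (1, 1)]),
        Polygon([(2, 2), (3, 2), (3, 3)]),
    ],
    crs="EPSG:4326",
).set_index("Station")

# Typical downstream use: reproject, compute areas, or feed the geometries to
# the "space" block of a query for clipping.
print(gdf.to_crs(epsg=3857).area)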
1,011 changes: 185 additions & 826 deletions docs/notebooks/getting_started.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions environment-dev.yml
@@ -6,6 +6,7 @@ dependencies:
- bottleneck
- clisops
- dask
- dask-geopandas
- geopandas
- intake
- intake-geopandas
2 changes: 2 additions & 0 deletions environment-docs.yml
@@ -5,6 +5,7 @@ channels:
dependencies:
- python >=3.9,<3.10
- cartopy
- dask-geopandas
- distributed >=2.0
- furo
- geoviews
@@ -16,6 +17,7 @@ dependencies:
- ipython
- jupyter_client
- matplotlib
- nbconvert<7.14
- nbsphinx
- nc-time-axis
- netCDF4
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -45,9 +45,11 @@ dependencies = [
"cftime>=1.4.1",
"clisops>=0.9.2",
"dask[array]>=2.6",
"dask-geopandas",
"geopandas",
"intake",
"intake-xarray>=0.6.1",
"intake-geopandas",
"ipython",
"jsonpickle",
"numba",
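A quick way to confirm that the new runtime dependencies resolve after installation; this assumes the conventional import names for these distributions (`dask_geopandas`, `intake_geopandas`):

# Smoke test for the dependencies added in this PR (import names are the
# usual convention for these packages, not taken from this diff).
import dask_geopandas
import geopandas
import intake
import intake_geopandas

print(dask_geopandas.__version__, geopandas.__version__, intake.__version__)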
38 changes: 29 additions & 9 deletions xdatasets/core.py
@@ -10,7 +10,12 @@
from .scripting import LOGGING_CONFIG
from .utils import cache_catalog
from .validations import _validate_space_params
from .workflows import climate_request, hydrometric_request, user_provided_dataset
from .workflows import (
climate_request,
gis_request,
hydrometric_request,
user_provided_dataset,
)

logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger(__name__)
@@ -73,7 +78,7 @@ class Query:
... "space": {"clip": "point", "geometry": sites},
... "time": {
... "timestep": "D",
... "aggregation": {"tp": np.nansum, "t2m": np.nanmean},
... "averaging": {"tp": np.nansum, "t2m": np.nanmean},
... "start": "1950-01-01",
... "end": "1955-12-31",
... "timezone": "America/Montreal",
@@ -259,12 +264,21 @@ def load_query(

try:
# Try naively merging datasets into single dataset
ds = xr.merge(dsets)
ds = ds
ds = None

if type(dsets[0]) == xr.Dataset:
# if more than one dataset, then we add source as a dimension
# so we can merge two or more datasets together
if len(dsets) > 1:
for idx, dset in enumerate(dsets):
for var in dset.keys():
dset[var] = dset[var].expand_dims("source", axis=-1)
dsets[idx] = dset
ds = xr.merge(dsets)
elif len(dsets) == 1:
ds = dsets[0]
except:
logging.warn(
"Couldn't merge datasets so we pass a dictionary of datasets. "
)
logging.warn("Couldn't merge datasets so we pass a list of datasets. ")
# Look into passing a DataTree instead
ds = dsets
pass
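The merge above only succeeds because every variable is first expanded along a `source` dimension, so arrays coming from different datasets no longer collide. A self-contained toy illustration of that pattern (the dataset names here are invented):

import numpy as np
import xarray as xr

def tag_source(ds, name):
    # Mirror what load_query does before merging: attach a source coordinate
    # and expand every variable along it.
    ds = ds.assign_coords(source=("source", [name]))
    for var in ds.keys():
        ds[var] = ds[var].expand_dims("source", axis=-1)
    return ds

ds_a = tag_source(xr.Dataset({"t2m": ("time", np.arange(3.0))}), "era5")
ds_b = tag_source(xr.Dataset({"t2m": ("time", np.arange(3.0) + 1)}), "era5_land")

merged = xr.merge([ds_a, ds_b])
print(merged["t2m"].dims)  # ('time', 'source'): one array holding both sources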
@@ -300,6 +314,12 @@ def _process_one_dataset(self, dataset_name, variables, space, time, **kwargs):
ds = hydrometric_request(
dataset_name, variables, space, time, self.catalog, **kwargs
)
if dataset_category in ["geography"]:
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
ds = gis_request(
dataset_name, variables, space, time, self.catalog, **kwargs
)

elif dataset_category in ["user-provided"]:
with warnings.catch_warnings():
@@ -308,5 +328,5 @@ def _process_one_dataset(self, dataset_name, variables, space, time, **kwargs):

return ds

def bbox_clip(self, ds):
return ds.where(~ds.isnull(), drop=True)
def bbox_clip(self, ds, variable="weights"):
return ds.where(~ds[variable].isnull(), drop=True)
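`bbox_clip` now drops cells based on a single chosen variable (by default `weights`) instead of requiring every variable to be non-null. A minimal standalone illustration of the underlying `where(..., drop=True)` call, with toy data:

import numpy as np
import xarray as xr

ds = xr.Dataset(
    {
        "weights": ("x", [np.nan, 0.2, 0.8, np.nan]),
        "t2m": ("x", [1.0, 2.0, 3.0, 4.0]),
    },
    coords={"x": [0, 1, 2, 3]},
)

# Keep only the cells where `weights` is defined; all other variables are
# clipped to the same cells.
clipped = ds.where(~ds["weights"].isnull(), drop=True)
print(clipped["x"].values)  # [1 2]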
17 changes: 12 additions & 5 deletions xdatasets/spatial.py
@@ -4,7 +4,7 @@
import pandas as pd
import xarray as xr
from clisops.core.subset import shape_bbox_indexer, subset_gridpoint
from tqdm import tqdm
from tqdm.auto import tqdm

from .utils import HiddenPrints

@@ -58,14 +58,14 @@ def aggregate(ds_in, ds_weights):


def clip_by_polygon(ds, space, dataset_name):
# We are not using clisops for weighted averages because it is too unstable for now and requires conda environment.
# We use a modified version of the xagg package from which we have removed the xesmf/esmpy dependency
# We are not using clisops for weighted averages because it is too unstable for now.
# We use the xagg package instead.

indexer = shape_bbox_indexer(ds, space["geometry"])
ds_copy = ds.isel(indexer).copy()

arrays = []
pbar = tqdm(space["geometry"].iterrows())
pbar = tqdm(space["geometry"].iterrows(), position=0, leave=True)
for idx, row in pbar:
item = (
row[space["unique_id"]]
@@ -84,7 +84,10 @@ def clip_by_polygon(ds, space, dataset_name):
with HiddenPrints():
ds_weights = create_weights_mask(da.isel(time=0), geom)
if space["averaging"] is True:
da = aggregate(da, ds_weights)
da_tmp = aggregate(da, ds_weights)
for var in da_tmp.variables:
da_tmp[var].attrs = da[var].attrs
da = da_tmp
else:
da = xr.merge([da, ds_weights])
da = da.where(da.weights.notnull(), drop=True)
@@ -107,6 +110,7 @@
)
}
)
data[space["unique_id"]].attrs["cf_role"] = "timeseries_id"
except KeyError:
pass
return data
@@ -124,4 +128,7 @@ def clip_by_point(ds, space, dataset_name):
data = data.rename({"lat": "latitude", "lon": "longitude"})

data = data.assign_coords({"site": ("site", list(space["geometry"].keys()))})

data["site"].attrs["cf_role"] = "timeseries_id"

return data
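Two of the spatial.py changes above are purely about metadata: variable attributes are copied back onto aggregated arrays (xarray reductions drop attrs by default), and the identifying coordinate is tagged with cf_role = "timeseries_id" so the output follows the CF discrete-sampling-geometry convention. A compact standalone sketch of both steps, with invented station ids:

import numpy as np
import xarray as xr

ds = xr.Dataset(
    {"t2m": (("time", "site"), np.random.rand(4, 2), {"units": "K"})},
    coords={"site": ("site", ["020302", "020404"])},
)

# Reductions drop variable attrs by default; copy them back, as the averaging
# branch of clip_by_polygon now does after calling aggregate().
agg = ds.mean("time")
for var in agg.variables:
    agg[var].attrs = ds[var].attrs

# Tag the identifying coordinate so CF-aware tools recognise these time series.
agg["site"].attrs["cf_role"] = "timeseries_id"
print(agg["t2m"].attrs, agg["site"].attrs)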
8 changes: 5 additions & 3 deletions xdatasets/temporal.py
@@ -43,17 +43,17 @@ def temporal_aggregation(ds, time, dataset_name, spatial_agg):
.reduce(oper, dim="time")
.expand_dims(
{
"time_agg": [oper.__name__],
# "time_agg": [oper.__name__],
"spatial_agg": [spatial_agg],
"timestep": [time["timestep"]],
}
)
)
# da = da.transpose('id','time', 'timestep','time_agg','spatial_agg')
oper_list.append(da)
oper_list.append(da.rename(f"{var}_{oper.__name__}"))

# ds_new = ds_new.merge(xr.concat(oper_list, dim='time_agg'))
ds_list.append(xr.concat(oper_list, dim="time_agg"))
ds_list.append(xr.merge(oper_list))

else:
try:
@@ -64,6 +64,8 @@

if ds_list:
ds_new = xr.merge(ds_list)
# for var in ds_new:
# ds_new[var].attrs = ds[var].attrs

# if requested timestep is lower
# bfill the timestep and add a warning
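Rather than concatenating the reductions along a `time_agg` dimension, each operation now becomes its own variable named `<variable>_<operation>` and the results are merged. A toy sketch of the resulting layout, assuming an hourly input resampled to daily sums and means (variable names and the "polygon" spatial_agg label are invented here):

import numpy as np
import pandas as pd
import xarray as xr

time = pd.date_range("2000-01-01", periods=48, freq="h")
ds = xr.Dataset({"tp": ("time", np.random.rand(48))}, coords={"time": time})

pieces = []
for oper in (np.nansum, np.nanmean):
    da = (
        ds["tp"]
        .resample(time="D")
        .reduce(oper, dim="time")
        .expand_dims({"spatial_agg": ["polygon"], "timestep": ["D"]})
    )
    # One variable per (variable, operation) pair, e.g. tp_nansum, tp_nanmean.
    pieces.append(da.rename(f"tp_{oper.__name__}"))

out = xr.merge(pieces)
print(list(out.data_vars))  # ['tp_nansum', 'tp_nanmean']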
66 changes: 35 additions & 31 deletions xdatasets/utils.py
@@ -2,6 +2,7 @@
import sys
import tempfile
import urllib.request
import warnings
from functools import reduce
from pathlib import Path

@@ -35,37 +36,40 @@ def open_dataset(
--------
xarray.open_dataset
"""
try:
import intake # noqa: F401
except ImportError as e:
raise ImportError(
"tutorial.open_dataset depends on intake and intake-xarray to download and manage datasets."
" To proceed please install intake and intake-xarray."
) from e

cat = catalog
dataset_info = [
(category, dataset_name)
for category in cat._entries.keys()
for dataset_name in cat[category]._entries.keys()
if dataset_name == name
]

data = reduce(lambda array, index: array[index], dataset_info, cat)

# add proxy infos
proxies = urllib.request.getproxies()
storage_options = data.storage_options
storage_options["config_kwargs"]["proxies"] = proxies

if data.describe()["driver"][0] == "geopandasfile":
data = data(storage_options=storage_options).read()
elif data.describe()["driver"][0] == "zarr":
data = data(storage_options=storage_options).to_dask()
else:
raise NotImplementedError(
f"Dataset {name} is not available. Please request further datasets to our github issues pages"
)
with warnings.catch_warnings():
warnings.simplefilter("ignore")

try:
import intake # noqa: F401
except ImportError as e:
raise ImportError(
"tutorial.open_dataset depends on intake and intake-xarray to download and manage datasets."
" To proceed please install intake and intake-xarray."
) from e

cat = catalog
dataset_info = [
(category, dataset_name)
for category in cat._entries.keys()
for dataset_name in cat[category]._entries.keys()
if dataset_name == name
]

data = reduce(lambda array, index: array[index], dataset_info, cat)

# add proxy infos
proxies = urllib.request.getproxies()
storage_options = data.storage_options
storage_options["config_kwargs"]["proxies"] = proxies

if data.describe()["driver"][0] == "geopandasfile":
data = data(storage_options=storage_options).read()
elif data.describe()["driver"][0] == "zarr":
data = data(storage_options=storage_options).to_dask()
else:
raise NotImplementedError(
f"Dataset {name} is not available. Please request further datasets to our github issues pages"
)
return data


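The body of `open_dataset` is now wrapped in `warnings.catch_warnings()`, which silences warnings only inside that scope and restores the previous filter state on exit. A minimal standalone illustration of the pattern:

import warnings

def noisy():
    warnings.warn("library chatter", RuntimeWarning)
    return 42

with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    value = noisy()  # no warning is emitted inside this block

value_again = noisy()  # outside the block, the previous filters apply again
print(value, value_again)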
42 changes: 38 additions & 4 deletions xdatasets/workflows.py
@@ -1,5 +1,6 @@
import fnmatch
import itertools
import warnings

import pandas as pd
import xarray as xr
@@ -69,11 +70,44 @@ def climate_request(dataset_name, variables, space, time, catalog):
# Add source name to dataset
# np.warnings.filterwarnings('ignore', category=np.VisibleDeprecationWarning)
ds = ds.assign_coords(source=("source", [dataset_name]))
for var in ds.keys():
ds[var] = ds[var].expand_dims("source", axis=-1)

# This is painfully slow for large datasets, so it is commented out for now
# for var in ds.keys():
# ds[var] = ds[var].expand_dims("source", axis=-1)
return ds


def gis_request(dataset_name, variables, space, time, catalog, **kwargs):
# This parameter should be set in the catalog
unique_id = catalog["geography"][dataset_name].metadata["unique_id"]

if any(kwargs):
ds = hydrometric_request(
dataset_name.replace("_polygons", ""),
variables,
space,
time,
catalog,
**kwargs,
)
filters = {"filters": [(unique_id, "in", tuple(ds.id.values.tolist()))]}
else:
filters = None
warnings.warn(
"Without filters, the complete dataset is fetched (this can take a few minutes). Use filters to expedite data retrieval."
)

gdf = (
catalog["geography"][dataset_name](geopandas_kwargs=filters)
.read()
.infer_objects()
.set_index(unique_id)
)
gdf.index = gdf.index.astype("str")

return gdf
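The `filters` dictionary built above uses the `(column, operator, value)` tuple convention understood by pyarrow-backed readers, forwarded here through intake-geopandas via `geopandas_kwargs`. A small hedged analogue using plain pandas/pyarrow rather than the catalog (file name, column names, and ids are invented for illustration):

import tempfile
from pathlib import Path

import pandas as pd

df = pd.DataFrame(
    {"Station": ["020302", "020404", "030101"], "area_km2": [113.0, 250.0, 87.0]}
)

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "stations.parquet"
    df.to_parquet(path)  # requires a parquet engine such as pyarrow

    # Same (column, "in", tuple_of_ids) convention that gis_request builds above.
    subset = pd.read_parquet(path, filters=[("Station", "in", ("020302", "020404"))])

print(subset)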


def hydrometric_request(dataset_name, variables, space, time, catalog, **kwargs):
ds = open_dataset(dataset_name, catalog)

@@ -205,7 +239,7 @@ def user_provided_dataset(dataset_name, variables, space, time, ds):
# Add source name to dataset
# np.warnings.filterwarnings('ignore', category=np.VisibleDeprecationWarning)
ds = ds.assign_coords(source=("source", [dataset_name]))
for var in ds.keys():
ds[var] = ds[var].expand_dims("source", axis=-1)
# for var in ds.keys():
# ds[var] = ds[var].expand_dims("source", axis=-1)

return ds
