Skip to content

Commit

Permalink
Merge pull request #152 from pastas/more_parallel
Browse files Browse the repository at this point in the history
Improve reading/writing zipfiles using .pas files everywhere
  • Loading branch information
dbrakenhoff authored Nov 20, 2024
2 parents 1d39b0e + 329386e commit 8cf1dd3
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 26 deletions.
22 changes: 11 additions & 11 deletions pastastore/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def empty(self):
def _get_library(self, libname: str):
"""Get library handle.
Must be overriden by subclass.
Must be overridden by subclass.
Parameters
----------
Expand All @@ -88,7 +88,7 @@ def _add_item(
) -> None:
"""Add item for both time series and pastas.Models (internal method).
Must be overriden by subclass.
Must be overridden by subclass.
Parameters
----------
Expand All @@ -106,7 +106,7 @@ def _add_item(
def _get_item(self, libname: str, name: str) -> Union[FrameorSeriesUnion, Dict]:
"""Get item (series or pastas.Models) (internal method).
Must be overriden by subclass.
Must be overridden by subclass.
Parameters
----------
Expand All @@ -125,7 +125,7 @@ def _get_item(self, libname: str, name: str) -> Union[FrameorSeriesUnion, Dict]:
def _del_item(self, libname: str, name: str) -> None:
"""Delete items (series or models) (internal method).
Must be overriden by subclass.
Must be overridden by subclass.
Parameters
----------
Expand All @@ -139,7 +139,7 @@ def _del_item(self, libname: str, name: str) -> None:
def _get_metadata(self, libname: str, name: str) -> Dict:
"""Get metadata (internal method).
Must be overriden by subclass.
Must be overridden by subclass.
Parameters
----------
Expand All @@ -159,23 +159,23 @@ def _get_metadata(self, libname: str, name: str) -> Dict:
def oseries_names(self):
"""List of oseries names.
Property must be overriden by subclass.
Property must be overridden by subclass.
"""

@property
@abstractmethod
def stresses_names(self):
"""List of stresses names.
Property must be overriden by subclass.
Property must be overridden by subclass.
"""

@property
@abstractmethod
def model_names(self):
"""List of model names.
Property must be overriden by subclass.
Property must be overridden by subclass.
"""

@abstractmethod
Expand All @@ -190,7 +190,7 @@ def _parallel(
) -> None:
"""Parallel processing of function.
Must be overriden by subclass.
Must be overridden by subclass.
Parameters
----------
Expand Down Expand Up @@ -1135,8 +1135,8 @@ def _iter_series(self, libname: str, names: Optional[List[str]] = None):
time series contained in library
"""
names = self._parse_names(names, libname)
for nam in names:
yield self._get_series(libname, nam, progressbar=False)
for name in names:
yield self._get_series(libname, name, progressbar=False)

def iter_oseries(self, names: Optional[List[str]] = None):
"""Iterate over oseries in library.
Expand Down
4 changes: 2 additions & 2 deletions pastastore/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,8 +566,8 @@ def _series_to_archive(
meta_json = self._stored_metadata_to_json(
libname, names=n, progressbar=False, squeeze=True
)
archive.writestr(f"{libname}/{n}.json", sjson)
archive.writestr(f"{libname}/{n}_meta.json", meta_json)
archive.writestr(f"{libname}/{n}.pas", sjson)
archive.writestr(f"{libname}/{n}_meta.pas", meta_json)

def _models_to_archive(self, archive, names=None, progressbar=True):
"""Write pastas.Model to zipfile (internal method).
Expand Down
4 changes: 2 additions & 2 deletions pastastore/extensions/hpd.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ def download_knmi_precipitation(
meteo_var: str = "RD",
tmin: TimeType = None,
tmax: TimeType = None,
unit_multiplier: float = 1e3,
unit_multiplier: float = 1e-3,
fill_missing_obs: bool = True,
normalize_datetime_index: bool = True,
**kwargs,
Expand Down Expand Up @@ -298,7 +298,7 @@ def download_knmi_evaporation(
meteo_var: str = "EV24",
tmin: TimeType = None,
tmax: TimeType = None,
unit_multiplier: float = 1e3,
unit_multiplier: float = 1e-3,
fill_missing_obs: bool = True,
normalize_datetime_index: bool = True,
**kwargs,
Expand Down
22 changes: 20 additions & 2 deletions pastastore/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,7 @@ def from_zip(
conn: Optional[BaseConnector] = None,
storename: Optional[str] = None,
progressbar: bool = True,
series_ext_json: bool = False,
):
"""Load PastaStore from zipfile.
Expand All @@ -1461,6 +1462,10 @@ def from_zip(
defaults to the name of the Connector.
progressbar : bool, optional
show progressbar, by default True
series_ext_json : bool, optional
if True, series are expected to have a .json extension, by default False,
which assumes a .pas extension. Set this option to true for reading
zipfiles created with older versions of pastastore <1.8.0.
Returns
-------
Expand All @@ -1472,9 +1477,22 @@ def from_zip(
if conn is None:
conn = DictConnector("pastas_db")

if series_ext_json:
ext = "json"
else:
ext = "pas"

# short circuit for PasConnector when zipfile was written using pas files
if conn.conn_type == "pas" and not series_ext_json:
with ZipFile(fname, "r") as archive:
archive.extractall(conn.path)
if storename is None:
storename = conn.name
return cls(conn, storename)

with ZipFile(fname, "r") as archive:
namelist = [
fi for fi in archive.namelist() if not fi.endswith("_meta.json")
fi for fi in archive.namelist() if not fi.endswith(f"_meta.{ext}")
]
for f in tqdm(namelist, desc="Reading zip") if progressbar else namelist:
libname, fjson = os.path.split(f)
Expand All @@ -1483,7 +1501,7 @@ def from_zip(
if not isinstance(s.index, pd.DatetimeIndex):
s.index = pd.to_datetime(s.index, unit="ms")
s = s.sort_index()
meta = json.load(archive.open(f.replace(".json", "_meta.json")))
meta = json.load(archive.open(f.replace(f".{ext}", f"_meta.{ext}")))
conn._add_series(libname, s, fjson.split(".")[0], metadata=meta)
elif libname in ["models"]:
ml = json.load(archive.open(f), object_hook=pastas_hook)
Expand Down
2 changes: 1 addition & 1 deletion pastastore/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
PASTAS_LEQ_022 = PASTAS_VERSION <= parse_version("0.22.0")
PASTAS_GEQ_150 = PASTAS_VERSION >= parse_version("1.5.0")

__version__ = "1.7.2"
__version__ = "1.8.0"


def show_versions(optional=False) -> None:
Expand Down
Binary file modified tests/data/test_hpd_update.zip
Binary file not shown.
18 changes: 10 additions & 8 deletions tests/test_007_hpdextension.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def test_hpd_download_precipitation_from_knmi():
activate_hydropandas_extension()
pstore = pst.PastaStore()
pstore.hpd.download_knmi_precipitation(
stns=[260], tmin="2022-01-01", tmax="2022-01-31"
stns=[260], meteo_var="RH", tmin="2022-01-01", tmax="2022-01-31"
)
assert pstore.n_stresses == 1

Expand All @@ -50,10 +50,10 @@ def test_update_oseries():
activate_hydropandas_extension()

pstore = pst.PastaStore.from_zip("tests/data/test_hpd_update.zip")
pstore.hpd.update_bro_gmw(tmax="2024-01-31")
pstore.hpd.update_bro_gmw(tmax="2022-02-28")
tmintmax = pstore.get_tmin_tmax("oseries")
assert tmintmax.loc["GMW000000036319_1", "tmax"] >= Timestamp("2024-01-30")
assert tmintmax.loc["GMW000000036327_1", "tmax"] >= Timestamp("2024-01-20")
assert tmintmax.loc["GMW000000036319_1", "tmax"] >= Timestamp("2022-02-27")
assert tmintmax.loc["GMW000000036327_1", "tmax"] >= Timestamp("2022-02-27")


@pytest.mark.xfail(reason="KNMI is being flaky, so allow this test to xfail/xpass.")
Expand All @@ -64,9 +64,9 @@ def test_update_stresses():
activate_hydropandas_extension()

pstore = pst.PastaStore.from_zip("tests/data/test_hpd_update.zip")
pstore.hpd.update_knmi_meteo(tmax="2024-01-31", normalize_datetime_index=False)
pstore.hpd.update_knmi_meteo(tmax="2022-02-28", normalize_datetime_index=True)
tmintmax = pstore.get_tmin_tmax("stresses")
assert (tmintmax["tmax"] >= Timestamp("2024-01-31")).all()
assert (tmintmax["tmax"] >= Timestamp("2024-02-27")).all()


@pytest.mark.xfail(reason="KNMI is being flaky, so allow this test to xfail/xpass.")
Expand All @@ -78,8 +78,10 @@ def test_nearest_stresses():

pstore = pst.PastaStore.from_zip("tests/data/test_hpd_update.zip")
pstore.hpd.download_nearest_knmi_precipitation(
"GMW000000036319_1", tmin="2024-01-01"
"GMW000000036319_1", tmin="2024-01-01", tmax="2024-01-31"
)
assert "RD_GROOT-AMMERS" in pstore.stresses_names
pstore.hpd.download_nearest_knmi_evaporation("GMW000000036319_1", tmin="2024-01-01")
pstore.hpd.download_nearest_knmi_evaporation(
"GMW000000036319_1", tmin="2024-01-01", tmax="2024-01-31"
)
assert "EV24_CABAUW-MAST" in pstore.stresses_names

0 comments on commit 8cf1dd3

Please sign in to comment.