Skip to content

Commit

Permalink
make SI class
Browse files Browse the repository at this point in the history
  • Loading branch information
martinvonk committed Mar 13, 2024
1 parent 59e6667 commit 4340ae5
Show file tree
Hide file tree
Showing 6 changed files with 1,038 additions and 893 deletions.
8 changes: 4 additions & 4 deletions doc/examples/example01_indices.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -39,7 +39,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 3,
"metadata": {},
"outputs": [
{
Expand Down Expand Up @@ -86,7 +86,7 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": 4,
"metadata": {},
"outputs": [
{
Expand All @@ -107,7 +107,7 @@
"Name: rain, Length: 13365, dtype: float64"
]
},
"execution_count": 3,
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
Expand Down
1,412 changes: 706 additions & 706 deletions doc/examples/example02_distributions.ipynb

Large diffs are not rendered by default.

204 changes: 63 additions & 141 deletions src/spei/dist.py
Original file line number Diff line number Diff line change
@@ -1,151 +1,15 @@
import logging
from dataclasses import dataclass, field
from typing import List, Literal, Optional, Tuple

from numpy import ceil, linspace, nan, std
from pandas import Grouper, Series, Timedelta
from scipy.stats import kstest, norm
from numpy import std
from pandas import Series
from scipy.stats import kstest

from ._typing import ContinuousDist
from .utils import daily_window_groupby_yearly_df, get_data_series, group_yearly_df


def compute_si_ppf(
series: Series,
dist: ContinuousDist,
freq: str,
prob_zero: bool = False,
window: int = 0,
nsf: bool = False,
) -> Series:
"""Internal helper function to calculate propability point function of normal
distribution based on a cumulative density function of a fitted
distribution
Parameters
----------
series : Series
Series with observations
dist : ContinuousDist
Continuous distribution from the SciPy library
index : DatetimeIndex, optional
DatetimeIndex with the date of the observations
prob_zero : bool, optional
Apply logic to observations that have value zero and calculate their
probability seperately, by default False
window : int, optional
If a window is supplied, all data within the window is fitted for the
cumulative density function so a bitter fit can be ensured. Frequency
of the data must constant like 'D' or 'W'.
nsf : bool, optional
Use the normal scores transform to calculat the cumulative density
function
Returns
-------
Series
Series with probability point function, ppf
"""

if window > 0:
cdf = compute_cdf_rolling_window(
series=series,
dist=dist,
prob_zero=prob_zero,
freq=freq,
window=window,
)
elif nsf:
cdf = compute_cdf_nsf(series=series, freq=freq)
else:
cdf = compute_cdf_groupby_freq(
series=series,
dist=dist,
prob_zero=prob_zero,
freq=freq,
)

return Series(norm.ppf(cdf.values, loc=0, scale=1), index=series.index, dtype=float)


def compute_cdf_groupby_freq(
series: Series,
dist: ContinuousDist,
prob_zero: bool,
freq: str,
) -> Series:
logging.info("Using rolling groupby frequency method")
dfval = group_yearly_df(series=series)
cdf_series = Series(nan, index=series.index, dtype=float)
for _, grval in dfval.groupby(Grouper(freq=freq)):
data = get_data_series(grval)
fd = FittedDist(data=data, dist=dist, prob_zero=prob_zero)
cdf = fd.cdf()
cdf_series.loc[cdf.index] = cdf.values
return cdf_series


def compute_cdf_rolling_window(
series: Series,
dist: ContinuousDist,
prob_zero: bool,
freq: str,
window: int,
) -> Series:

if freq not in ("d", "w", "D", "W"): # TODO: ideally 14D should also work.
raise ValueError(
f"Frequency freq must be 'D' or 'W', not '{freq}', if a window is provided."
)
logging.info("Using rolling window method")

if window < 3:
logging.error("Window should be larger than 2. Setting the window value to 3.")
window = 3 # make sure window is at least three
elif window % 2 == 0:
logging.error(f"Window should be odd. Setting the window value to {window + 1}")
window += 1 # make sure window is odd

period = int(ceil(window / 2))
if freq in ("W", "w"):
period = Timedelta(value=period, unit="W").days
window = period * 2 + 1

dfval = group_yearly_df(series=series)
cdf_series = Series(nan, index=series.index, dtype=float)
dfval_window = daily_window_groupby_yearly_df(dfval=dfval, period=period)
for dfval_rwindow in dfval_window.rolling(
window=window, min_periods=window, closed="right"
):
if len(dfval_rwindow) < window:
continue # min_periods ignored by Rolling.__iter__
data = get_data_series(dfval_rwindow.iloc[[period]])
data_window = get_data_series(dfval_rwindow)
fd = FittedDist(
data=data, dist=dist, prob_zero=prob_zero, data_window=data_window
)
cdf = fd.cdf()
cdf_series.loc[cdf.index] = cdf.values
return cdf_series


def compute_cdf_nsf(
series: Series,
freq: str,
):
"""Compute cumulative density function using the Normal Scores Transform"""
logging.info("Using the normal scores transform")
dfval = group_yearly_df(series=series)
cdf_series = Series(nan, index=series.index, dtype=float)
for _, grval in dfval.groupby(Grouper(freq=freq)):
data = get_data_series(grval).sort_values()
n = len(data)
cdf_series.loc[data.index] = linspace(1 / (2 * n), 1 - 1 / (2 * n), n)
return cdf_series


@dataclass
class FittedDist:
class Dist:
data: Series = field(init=True, repr=False)
dist: ContinuousDist
loc: float = field(init=False, repr=True)
Expand All @@ -154,8 +18,50 @@ class FittedDist:
prob_zero: bool = field(default=False, init=True, repr=False)
p0: float = field(default=0.0, init=False, repr=False)
data_window: Optional[Series] = field(default=None, init=True, repr=False)
"""
Represents a distribution associated with data.
Parameters
----------
data : Series
The input data for fitting the distribution.
dist : ContinuousDist
The SciPy continuous distribution associated to be fitted.
prob_zero : bool, default=False
Flag indicating whether the probability of zero values in the series is
calculated by the occurence.
data_window : Optional[Series], default=None
Subset of data for fitting more data (if provided).
Attributes
----------
loc : float
Location of the distribution
scale : float
Scale of the distribution
pars : Optional[List[float]]
Attribute storing additional distribution parameters (if applicable).
p0 : float
The probability of zero values in the data. Only calculated if prob_zero=True.
Methods
-------
__post_init__(self) -> None
Initializes the Dist class and fits the distribution.
fit_dist(data: Series, dist: ContinuousDist) -> Tuple
Fits a Scipy continuous distribution to the data.
Notes
-----
The `fit_dist` method uses the `dist.fit` function from Scipy to estimate
distribution parameters. If the fitted distribution requires additional
parameters beyond `loc` and `scale`, they are stored in the `pars` attribute.
"""

def __post_init__(self):
"""
Post initializes the Dist class by fitting the distribution.
"""
data_fit = self.data_window if self.data_window is not None else self.data
pars, loc, scale = self.fit_dist(data=data_fit, dist=self.dist)
self.loc = loc
Expand All @@ -167,7 +73,21 @@ def __post_init__(self):

@staticmethod
def fit_dist(data: Series, dist: ContinuousDist) -> Tuple:
"""Fit a Scipy Continuous Distribution"""
"""
Fits a Scipy continuous distribution to the data.
Parameters
----------
data : Series
The input data for fitting.
dist : ContinuousDist
The continuous distribution to be fitted.
Returns
-------
Tuple
Tuple containing distribution parameters (pars, loc, scale).
"""
fit_tuple = dist.fit(data, scale=std(data))
if len(fit_tuple) == 2:
loc, scale = fit_tuple
Expand Down Expand Up @@ -199,6 +119,8 @@ def pdf(self) -> Series:
else:
pdf = self.dist.pdf(self.data.values, loc=self.loc, scale=self.scale)

# TODO: check what to do if prob_zero

return Series(pdf, index=self.data.index, dtype=float)

def ks_test(
Expand Down
Loading

0 comments on commit 4340ae5

Please sign in to comment.