
Commit

fix aggregate function (#232)
jmoralez authored Oct 3, 2023
1 parent dac318b commit b841c2f
Showing 4 changed files with 221 additions and 321 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
@@ -10,6 +10,10 @@ defaults:
run:
shell: bash -l {0}

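# Annotation (not part of the diff): this block cancels any in-progress run of
# the same workflow on the same ref when a new run starts, so pushes in quick
# succession don't queue redundant CI jobs.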
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
nb-sync:
runs-on: ubuntu-latest
6 changes: 2 additions & 4 deletions hierarchicalforecast/_modidx.py
@@ -178,10 +178,10 @@
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.HierarchicalPlot.plot_summing_matrix': ( 'utils.html#hierarchicalplot.plot_summing_matrix',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils._to_summing_dataframe': ( 'utils.html#_to_summing_dataframe',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils._to_summing_matrix': ( 'utils.html#_to_summing_matrix',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils._to_upper_hierarchy': ( 'utils.html#_to_upper_hierarchy',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.aggregate': ( 'utils.html#aggregate',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.aggregate_before': ( 'utils.html#aggregate_before',
@@ -191,8 +191,6 @@
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.level_to_outputs': ( 'utils.html#level_to_outputs',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.numpy_balance': ( 'utils.html#numpy_balance',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.quantiles_to_outputs': ( 'utils.html#quantiles_to_outputs',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.samples_to_quantiles_df': ( 'utils.html#samples_to_quantiles_df',
216 changes: 75 additions & 141 deletions hierarchicalforecast/utils.py
@@ -6,6 +6,7 @@
# %% ../nbs/utils.ipynb 3
import sys
import timeit
import warnings
from itertools import chain
from typing import Callable, Dict, List, Optional, Iterable

@@ -100,7 +101,7 @@ def aggregate_before(df: pd.DataFrame,
sparse_s: bool = False):
"""Utils Aggregation Function.
Aggregates bottom level series contained in the pd.DataFrame `df` according
to levels defined in the `spec` list applying the `agg_fn` (sum, mean).<br>
**Parameters:**<br>
@@ -139,88 +140,15 @@
return Y_df, S, tags

# %% ../nbs/utils.ipynb 11
def numpy_balance(*arrs):
"""
Fast NumPy implementation of balance function.
The function creates all the interactions between
the NumPy arrays provided.
**Parameters:**<br>
`arrs`: NumPy arrays.<br>
**Returns:**<br>
`out`: NumPy array.<br>
"""
N = len(arrs)
out = np.transpose(np.meshgrid(*arrs, indexing='ij'),
np.roll(np.arange(N + 1), -1)).reshape(-1, N)
return out
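# Illustrative sketch (annotation, not part of the diff): numpy_balance
# returns the cartesian product of its inputs, one interaction per row.
#
#     numpy_balance(np.array([1, 2]), np.array([10, 20, 30]))
#     # array([[ 1, 10],
#     #        [ 1, 20],
#     #        [ 1, 30],
#     #        [ 2, 10],
#     #        [ 2, 20],
#     #        [ 2, 30]])
#
# The old aggregate used this to build every (unique_id, ds) combination.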

def _to_summing_dataframe(
df: pd.DataFrame, spec: List[List[str]], sparse_s: bool = False
):
#------------------------------- Wrangling -----------------------------#
# Keep unique levels, preserving first appearance order
all_levels = list(chain.from_iterable(spec))
all_levels = [*dict.fromkeys(all_levels)]

# Create hierarchical labels
S_df = df[all_levels].copy()
S_df = S_df.drop_duplicates()

max_len_idx = np.argmax([len(hier) for hier in spec])
bottom_comb = spec[max_len_idx]
hiers_cols = []
df = df.copy()
for hier in spec:
if hier == bottom_comb:
hier_col = 'unique_id'
bottom_col = '/'.join(hier)
df['unique_id'] = df[hier].agg('/'.join, axis=1)
else:
hier_col = '/'.join(hier)
S_df[hier_col] = S_df[hier].agg('/'.join, axis=1)
hiers_cols.append(hier_col)
S_df = S_df.sort_values(by=bottom_comb)
S_df = S_df[hiers_cols]

#------------------------------- Encoding ------------------------------#
# One hot encode only aggregate levels
bottom_ids = list(S_df.unique_id)
del S_df['unique_id']
categories = [S_df[col].unique() for col in S_df.columns]
tags = dict(zip(S_df.columns, categories))
tags[bottom_col] = bottom_ids

try:
encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)
except TypeError: # sklearn < 1.2
encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)

S = encoder.fit_transform(S_df).T

if sparse_s:
S = sparse.vstack(
[
sparse.csr_matrix(S),
sparse.identity(len(bottom_ids), dtype=np.float32, format='csr'),
]
)
S_df = pd.DataFrame.sparse.from_spmatrix(
S, columns=bottom_ids, index=list(chain(*categories)) + bottom_ids
)
else:
S = np.concatenate([S, np.eye(len(bottom_ids), dtype=np.float32)], axis=0)
S_df = pd.DataFrame(
S, columns=bottom_ids, index=list(chain(*categories)) + bottom_ids
)

# Match index ordering of S_df and collapse df to Y_bottom_df
Y_bottom_df = df.copy()
Y_bottom_df = Y_bottom_df.groupby(['unique_id', 'ds'])['y'].sum().reset_index()
Y_bottom_df.unique_id = Y_bottom_df.unique_id.astype('category')
Y_bottom_df.unique_id = Y_bottom_df.unique_id.cat.set_categories(S_df.columns)
return Y_bottom_df, S_df, tags
def _to_upper_hierarchy(bottom_split, bottom_values, upper_key):
upper_split = upper_key.split('/')
upper_idxs = [bottom_split.index(i) for i in upper_split]

def join_upper(bottom_value):
bottom_parts = bottom_value.split('/')
return '/'.join(bottom_parts[i] for i in upper_idxs)

return [join_upper(val) for val in bottom_values]
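# Illustrative example (annotation, not part of the diff; the column names are
# hypothetical): _to_upper_hierarchy projects bottom-level ids onto an upper
# level of the hierarchy.
#
#     _to_upper_hierarchy(
#         bottom_split=['Country', 'State', 'City'],
#         bottom_values=['AU/NSW/Sydney', 'AU/VIC/Melbourne'],
#         upper_key='Country/State',
#     )
#     # ['AU/NSW', 'AU/VIC']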

# %% ../nbs/utils.ipynb 12
def aggregate(
@@ -229,71 +157,77 @@ def aggregate(
is_balanced: bool = False,
sparse_s: bool = False,
):
""" Utils Aggregation Function.
Aggregates bottom level series contained in the pd.DataFrame `df` according
to levels defined in the `spec` list applying the `agg_fn` (sum, mean).
**Parameters:**<br>
`df`: pd.DataFrame with columns `['ds', 'y']` and columns to aggregate.<br>
`spec`: List of levels. Each element of the list contains a list of columns of `df` to aggregate.<br>
`is_balanced`: bool=False, whether `Y_bottom_df` is balanced, if not we balance.<br>
`sparse_s`: bool=False, whether the returned S_df should be a sparse DataFrame.<br>
**Returns:**<br>
`Y_df, S_df, tags`: tuple with hierarchically structured series `Y_df` ($\mathbf{y}_{[a,b]}$),
summing dataframe `S_df`, and hierarchical aggregation indexes `tags`.
"""Utils Aggregation Function.
Aggregates bottom level series contained in the pandas DataFrame `df` according
to levels defined in the `spec` list.
Parameters
----------
df : pandas DataFrame
Dataframe with columns `['ds', 'y']` and columns to aggregate.
spec : list of list of str
List of levels. Each element of the list should contain a list of columns of `df` to aggregate.
is_balanced : bool (default=False)
Deprecated.
sparse_s : bool (default=False)
Return `S_df` as a sparse dataframe.
Returns
-------
Y_df : pandas DataFrame
Hierarchically structured series.
S_df : pandas DataFrame
Summing dataframe.
tags : dict
Aggregation indices.
"""

#Ensure no null values
# Checks
if df.isnull().values.any():
raise Exception('`df` contains null values')
raise ValueError('`df` contains null values')
if is_balanced:
warnings.warn(
"`is_balanced` is deprecated and will be removed in a future version. "
"Don't set this argument to suppress this warning.",
category=DeprecationWarning,
)

#-------------------------------- Wrangling --------------------------------#
# constraints S_df and collapsed Y_bottom_df with 'unique_id'
Y_bottom_df, S_df, tags = _to_summing_dataframe(df=df, spec=spec, sparse_s=sparse_s)

# Create balanced/sorted dataset for numpy aggregation (nan=0)
# TODO: investigate potential memory speed tradeoff
if not is_balanced:
dates = Y_bottom_df['ds'].unique()
balanced_prod = numpy_balance(S_df.columns, dates)
balanced_df = pd.DataFrame(balanced_prod, columns=['unique_id', 'ds'])
balanced_df['ds'] = balanced_df['ds'].astype(Y_bottom_df['ds'].dtype)

Y_bottom_df.set_index(['unique_id', 'ds'], inplace=True)
balanced_df.set_index(['unique_id', 'ds'], inplace=True)
balanced_df = balanced_df.merge(Y_bottom_df[['y']],
how='left', left_on=['unique_id', 'ds'],
right_index=True).reset_index()
Y_bottom_df.reset_index(inplace=True)
else:
dates = Y_bottom_df['ds'].unique()
balanced_df = Y_bottom_df.copy()

#------------------------------- Aggregation -------------------------------#
n_agg = S_df.shape[0] - S_df.shape[1]
# compute aggregations and tags
spec = sorted(spec, key=len)
bottom = spec[-1]
aggs = []
tags = {}
for levels in spec:
agg = df.groupby(levels + ['ds'])['y'].sum().reset_index('ds')
group = agg.index.get_level_values(0)
for level in levels[1:]:
group = group + '/' + agg.index.get_level_values(level).str.replace('/', '_')
agg.index = group
agg.index.name = 'unique_id'
tags['/'.join(levels)] = group.unique().values
aggs.append(agg)
Y_df = pd.concat(aggs)

# construct S
bottom_key = '/'.join(bottom)
bottom_levels = tags[bottom_key]
S = np.empty((len(bottom_levels), len(spec)), dtype=object)
for j, levels in enumerate(spec[:-1]):
S[:, j] = _to_upper_hierarchy(bottom, bottom_levels, '/'.join(levels))
S[:, -1] = tags[bottom_key]
categories = list(tags.values())
try:
encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)
except TypeError: # sklearn < 1.2
encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)
S = encoder.fit_transform(S).T
if sparse_s:
Agg = S_df.sparse.to_coo().tocsr()[:n_agg, :]
df_constructor = pd.DataFrame.sparse.from_spmatrix
else:
Agg = S_df.values[:n_agg, :]

y_bottom = balanced_df.y.values
y_bottom = y_bottom.reshape(len(S_df.columns), len(dates))
y_bottom_mask = np.isnan(y_bottom)
y_agg = Agg @ np.nan_to_num(y_bottom)
y_agg_mask = Agg @ y_bottom_mask

# Create long format hierarchical dataframe
y_agg = y_agg.flatten()
y_agg[y_agg_mask.flatten() > 1] = np.nan
y_bottom = y_bottom.flatten()
Y_df = pd.DataFrame(dict(
unique_id = np.repeat(S_df.index, len(dates)),
ds = np.tile(dates, len(S_df.index)),
y = np.concatenate([y_agg, y_bottom], axis=0)))
Y_df = Y_df.set_index('unique_id').dropna()
df_constructor = pd.DataFrame
S_df = df_constructor(S, index=np.hstack(categories), columns=bottom_levels)
return Y_df, S_df, tags
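# Minimal usage sketch (annotation, not part of the diff; the data and column
# names are hypothetical):
#
#     df = pd.DataFrame({
#         'Country': ['AU', 'AU', 'AU', 'AU'],
#         'State': ['NSW', 'NSW', 'VIC', 'VIC'],
#         'ds': pd.to_datetime(['2023-01-01', '2023-01-02'] * 2),
#         'y': [1.0, 2.0, 3.0, 4.0],
#     })
#     spec = [['Country'], ['Country', 'State']]
#     Y_df, S_df, tags = aggregate(df, spec)
#     # tags == {'Country': ['AU'], 'Country/State': ['AU/NSW', 'AU/VIC']}
#     # S_df rows: ['AU', 'AU/NSW', 'AU/VIC']; columns: ['AU/NSW', 'AU/VIC']
#     # Y_df holds the summed series for 'AU' plus both bottom series.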

# %% ../nbs/utils.ipynb 20
# %% ../nbs/utils.ipynb 21
class HierarchicalPlot:
""" Hierarchical Plot
@@ -487,7 +421,7 @@ def plot_hierarchical_predictions_gap(self,
plt.grid()
plt.show()

# %% ../nbs/utils.ipynb 35
# %% ../nbs/utils.ipynb 36
# convert levels to output quantile names
def level_to_outputs(level:Iterable[int]):
""" Converts list of levels into output names matching StatsForecast and NeuralForecast methods.
@@ -531,7 +465,7 @@ def quantiles_to_outputs(quantiles:Iterable[float]):
output_names.append('-median')
return quantiles, output_names
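# Illustrative sketch (annotation, not part of the diff; the exact name
# formatting is an assumption based on the StatsForecast/NeuralForecast
# convention):
#
#     level_to_outputs([80])
#     # -> roughly quantiles [0.1, 0.9] with names ['-lo-80', '-hi-80']
#     quantiles_to_outputs([0.1, 0.5, 0.9])
#     # -> quantiles [0.1, 0.5, 0.9] with names like ['-lo-80', '-median', '-hi-80']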

# %% ../nbs/utils.ipynb 36
# %% ../nbs/utils.ipynb 37
# given input array of sample forecasts and input quantiles/levels,
# output a Pandas Dataframe with columns of quantile predictions
def samples_to_quantiles_df(samples:np.ndarray,
