From b841c2f62959613a7c12e33b9d28145bc962480c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Morales?= Date: Tue, 3 Oct 2023 15:52:04 -0600 Subject: [PATCH] fix aggregate function (#232) --- .github/workflows/ci.yml | 4 + hierarchicalforecast/_modidx.py | 6 +- hierarchicalforecast/utils.py | 216 ++++++++-------------- nbs/utils.ipynb | 316 ++++++++++++++------------------ 4 files changed, 221 insertions(+), 321 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4501b5b1..5735c889 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,6 +10,10 @@ defaults: run: shell: bash -l {0} +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: nb-sync: runs-on: ubuntu-latest diff --git a/hierarchicalforecast/_modidx.py b/hierarchicalforecast/_modidx.py index 654cd715..9c1b0db3 100644 --- a/hierarchicalforecast/_modidx.py +++ b/hierarchicalforecast/_modidx.py @@ -178,10 +178,10 @@ 'hierarchicalforecast/utils.py'), 'hierarchicalforecast.utils.HierarchicalPlot.plot_summing_matrix': ( 'utils.html#hierarchicalplot.plot_summing_matrix', 'hierarchicalforecast/utils.py'), - 'hierarchicalforecast.utils._to_summing_dataframe': ( 'utils.html#_to_summing_dataframe', - 'hierarchicalforecast/utils.py'), 'hierarchicalforecast.utils._to_summing_matrix': ( 'utils.html#_to_summing_matrix', 'hierarchicalforecast/utils.py'), + 'hierarchicalforecast.utils._to_upper_hierarchy': ( 'utils.html#_to_upper_hierarchy', + 'hierarchicalforecast/utils.py'), 'hierarchicalforecast.utils.aggregate': ( 'utils.html#aggregate', 'hierarchicalforecast/utils.py'), 'hierarchicalforecast.utils.aggregate_before': ( 'utils.html#aggregate_before', @@ -191,8 +191,6 @@ 'hierarchicalforecast/utils.py'), 'hierarchicalforecast.utils.level_to_outputs': ( 'utils.html#level_to_outputs', 'hierarchicalforecast/utils.py'), - 'hierarchicalforecast.utils.numpy_balance': ( 'utils.html#numpy_balance', - 'hierarchicalforecast/utils.py'), 'hierarchicalforecast.utils.quantiles_to_outputs': ( 'utils.html#quantiles_to_outputs', 'hierarchicalforecast/utils.py'), 'hierarchicalforecast.utils.samples_to_quantiles_df': ( 'utils.html#samples_to_quantiles_df', diff --git a/hierarchicalforecast/utils.py b/hierarchicalforecast/utils.py index 1e5d9702..5d5e6352 100644 --- a/hierarchicalforecast/utils.py +++ b/hierarchicalforecast/utils.py @@ -6,6 +6,7 @@ # %% ../nbs/utils.ipynb 3 import sys import timeit +import warnings from itertools import chain from typing import Callable, Dict, List, Optional, Iterable @@ -100,7 +101,7 @@ def aggregate_before(df: pd.DataFrame, sparse_s: bool = False): """Utils Aggregation Function. - Aggregates bottom level series contained in the pd.DataFrame `df` according + Aggregates bottom level series contained in the pd.DataFrame `df` according to levels defined in the `spec` list applying the `agg_fn` (sum, mean).
    **Parameters:**<br>
@@ -139,88 +140,15 @@ def aggregate_before(df: pd.DataFrame,
     return Y_df, S, tags
 
 # %% ../nbs/utils.ipynb 11
-def numpy_balance(*arrs):
-    """
-    Fast NumPy implementation of balance function.
-    The function creates all the interactions between
-    the NumPy arrays provided.
-    **Parameters:**<br>
-    `arrs`: NumPy arrays.<br>
-    **Returns:**<br>
-    `out`: NumPy array.<br>
- """ - N = len(arrs) - out = np.transpose(np.meshgrid(*arrs, indexing='ij'), - np.roll(np.arange(N + 1), -1)).reshape(-1, N) - return out - -def _to_summing_dataframe( - df: pd.DataFrame, spec: List[List[str]], sparse_s: bool = False -): - #------------------------------- Wrangling -----------------------------# - # Keep unique levels, preserving first aparison order - all_levels = list(chain.from_iterable(spec)) - all_levels = [*dict.fromkeys(all_levels)] - - # Create hierarchical labels - S_df = df[all_levels].copy() - S_df = S_df.drop_duplicates() - - max_len_idx = np.argmax([len(hier) for hier in spec]) - bottom_comb = spec[max_len_idx] - hiers_cols = [] - df = df.copy() - for hier in spec: - if hier == bottom_comb: - hier_col = 'unique_id' - bottom_col = '/'.join(hier) - df['unique_id'] = df[hier].agg('/'.join, axis=1) - else: - hier_col = '/'.join(hier) - S_df[hier_col] = S_df[hier].agg('/'.join, axis=1) - hiers_cols.append(hier_col) - S_df = S_df.sort_values(by=bottom_comb) - S_df = S_df[hiers_cols] - - #------------------------------- Encoding ------------------------------# - # One hot encode only aggregate levels - bottom_ids = list(S_df.unique_id) - del S_df['unique_id'] - categories = [S_df[col].unique() for col in S_df.columns] - tags = dict(zip(S_df.columns, categories)) - tags[bottom_col] = bottom_ids - - try: - encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32) - except TypeError: # sklearn < 1.2 - encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32) - - S = encoder.fit_transform(S_df).T - - if sparse_s: - S = sparse.vstack( - [ - sparse.csr_matrix(S), - sparse.identity(len(bottom_ids), dtype=np.float32, format='csr'), - ] - ) - S_df = pd.DataFrame.sparse.from_spmatrix( - S, columns=bottom_ids, index=list(chain(*categories)) + bottom_ids - ) - else: - S = np.concatenate([S, np.eye(len(bottom_ids), dtype=np.float32)], axis=0) - S_df = pd.DataFrame( - S, columns=bottom_ids, index=list(chain(*categories)) + bottom_ids - ) - - # Match index ordering of S_df and collapse df to Y_bottom_df - Y_bottom_df = df.copy() - Y_bottom_df = Y_bottom_df.groupby(['unique_id', 'ds'])['y'].sum().reset_index() - Y_bottom_df.unique_id = Y_bottom_df.unique_id.astype('category') - Y_bottom_df.unique_id = Y_bottom_df.unique_id.cat.set_categories(S_df.columns) - return Y_bottom_df, S_df, tags +def _to_upper_hierarchy(bottom_split, bottom_values, upper_key): + upper_split = upper_key.split('/') + upper_idxs = [bottom_split.index(i) for i in upper_split] + def join_upper(bottom_value): + bottom_parts = bottom_value.split('/') + return '/'.join(bottom_parts[i] for i in upper_idxs) + return [join_upper(val) for val in bottom_values] # %% ../nbs/utils.ipynb 12 def aggregate( @@ -229,71 +157,77 @@ def aggregate( is_balanced: bool = False, sparse_s: bool = False, ): - """ Utils Aggregation Function. - Aggregates bottom level series contained in the pd.DataFrame `df` according - to levels defined in the `spec` list applying the `agg_fn` (sum, mean). - - **Parameters:**
-    `df`: pd.DataFrame with columns `['ds', 'y']` and columns to aggregate.<br>
-    `spec`: List of levels. Each element of the list contains a list of columns of `df` to aggregate.<br>
-    `is_balanced`: bool=False, whether `Y_bottom_df` is balanced, if not we balance.<br>
-    `sparse_s`: bool=False, whether the returned S_df should be a sparse DataFrame.<br>
-    **Returns:**<br>
- `Y_df, S_df, tags`: tuple with hierarchically structured series `Y_df` ($\mathbf{y}_{[a,b]}$), - summing dataframe `S_df`, and hierarchical aggregation indexes `tags`. + """Utils Aggregation Function. + Aggregates bottom level series contained in the pandas DataFrame `df` according + to levels defined in the `spec` list. + + Parameters + ---------- + df : pandas DataFrame + Dataframe with columns `['ds', 'y']` and columns to aggregate. + spec : list of list of str + List of levels. Each element of the list should contain a list of columns of `df` to aggregate. + is_balanced : bool (default=False) + Deprecated. + sparse_s : bool (default=False) + Return `S_df` as a sparse dataframe. + + Returns + ------- + Y_df : pandas DataFrame + Hierarchically structured series. + S_df : pandas DataFrame + Summing dataframe. + tags : dict + Aggregation indices. """ - - #Ensure no null values + # Checks if df.isnull().values.any(): - raise Exception('`df` contains null values') + raise ValueError('`df` contains null values') + if is_balanced: + warnings.warn( + "`is_balanced` is deprecated and will be removed in a future version. " + "Don't set this argument to suppress this warning.", + category=DeprecationWarning, + ) - #-------------------------------- Wrangling --------------------------------# - # constraints S_df and collapsed Y_bottom_df with 'unique_id' - Y_bottom_df, S_df, tags = _to_summing_dataframe(df=df, spec=spec, sparse_s=sparse_s) - - # Create balanced/sorted dataset for numpy aggregation (nan=0) - # TODO: investigate potential memory speed tradeoff - if not is_balanced: - dates = Y_bottom_df['ds'].unique() - balanced_prod = numpy_balance(S_df.columns, dates) - balanced_df = pd.DataFrame(balanced_prod, columns=['unique_id', 'ds']) - balanced_df['ds'] = balanced_df['ds'].astype(Y_bottom_df['ds'].dtype) - - Y_bottom_df.set_index(['unique_id', 'ds'], inplace=True) - balanced_df.set_index(['unique_id', 'ds'], inplace=True) - balanced_df = balanced_df.merge(Y_bottom_df[['y']], - how='left', left_on=['unique_id', 'ds'], - right_index=True).reset_index() - Y_bottom_df.reset_index(inplace=True) - else: - dates = Y_bottom_df['ds'].unique() - balanced_df = Y_bottom_df.copy() - - #------------------------------- Aggregation -------------------------------# - n_agg = S_df.shape[0] - S_df.shape[1] + # compute aggregations and tags + spec = sorted(spec, key=len) + bottom = spec[-1] + aggs = [] + tags = {} + for levels in spec: + agg = df.groupby(levels + ['ds'])['y'].sum().reset_index('ds') + group = agg.index.get_level_values(0) + for level in levels[1:]: + group = group + '/' + agg.index.get_level_values(level).str.replace('/', '_') + agg.index = group + agg.index.name = 'unique_id' + tags['/'.join(levels)] = group.unique().values + aggs.append(agg) + Y_df = pd.concat(aggs) + + # construct S + bottom_key = '/'.join(bottom) + bottom_levels = tags[bottom_key] + S = np.empty((len(bottom_levels), len(spec)), dtype=object) + for j, levels in enumerate(spec[:-1]): + S[:, j] = _to_upper_hierarchy(bottom, bottom_levels, '/'.join(levels)) + S[:, -1] = tags[bottom_key] + categories = list(tags.values()) + try: + encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32) + except TypeError: # sklearn < 1.2 + encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32) + S = encoder.fit_transform(S).T if sparse_s: - Agg = S_df.sparse.to_coo().tocsr()[:n_agg, :] + df_constructor = pd.DataFrame.sparse.from_spmatrix else: - Agg = S_df.values[:n_agg, :] - - 
y_bottom = balanced_df.y.values - y_bottom = y_bottom.reshape(len(S_df.columns), len(dates)) - y_bottom_mask = np.isnan(y_bottom) - y_agg = Agg @ np.nan_to_num(y_bottom) - y_agg_mask = Agg @ y_bottom_mask - - # Create long format hierarchical dataframe - y_agg = y_agg.flatten() - y_agg[y_agg_mask.flatten() > 1] = np.nan - y_bottom = y_bottom.flatten() - Y_df = pd.DataFrame(dict( - unique_id = np.repeat(S_df.index, len(dates)), - ds = np.tile(dates, len(S_df.index)), - y = np.concatenate([y_agg, y_bottom], axis=0))) - Y_df = Y_df.set_index('unique_id').dropna() + df_constructor = pd.DataFrame + S_df = df_constructor(S, index=np.hstack(categories), columns=bottom_levels) return Y_df, S_df, tags -# %% ../nbs/utils.ipynb 20 +# %% ../nbs/utils.ipynb 21 class HierarchicalPlot: """ Hierarchical Plot @@ -487,7 +421,7 @@ def plot_hierarchical_predictions_gap(self, plt.grid() plt.show() -# %% ../nbs/utils.ipynb 35 +# %% ../nbs/utils.ipynb 36 # convert levels to output quantile names def level_to_outputs(level:Iterable[int]): """ Converts list of levels into output names matching StatsForecast and NeuralForecast methods. @@ -531,7 +465,7 @@ def quantiles_to_outputs(quantiles:Iterable[float]): output_names.append('-median') return quantiles, output_names -# %% ../nbs/utils.ipynb 36 +# %% ../nbs/utils.ipynb 37 # given input array of sample forecasts and inptut quantiles/levels, # output a Pandas Dataframe with columns of quantile predictions def samples_to_quantiles_df(samples:np.ndarray, diff --git a/nbs/utils.ipynb b/nbs/utils.ipynb index 092c6a93..1448c509 100644 --- a/nbs/utils.ipynb +++ b/nbs/utils.ipynb @@ -41,6 +41,7 @@ "#| export\n", "import sys\n", "import timeit\n", + "import warnings\n", "from itertools import chain\n", "from typing import Callable, Dict, List, Optional, Iterable\n", "\n", @@ -197,7 +198,7 @@ " sparse_s: bool = False):\n", " \"\"\"Utils Aggregation Function.\n", "\n", - " Aggregates bottom level series contained in the pd.DataFrame `df` according \n", + " Aggregates bottom level series contained in the pd.DataFrame `df` according\n", " to levels defined in the `spec` list applying the `agg_fn` (sum, mean).
\n", "\n", " **Parameters:**
\n", @@ -239,92 +240,20 @@ { "cell_type": "code", "execution_count": null, - "id": "5e767fce", + "id": "7a233df7-50d1-4a70-9f07-afb8bff2aa6e", "metadata": {}, "outputs": [], "source": [ "#| exporti\n", - "def numpy_balance(*arrs):\n", - " \"\"\"\n", - " Fast NumPy implementation of balance function.\n", - " The function creates all the interactions between\n", - " the NumPy arrays provided.\n", - " **Parameters:**
\n", - " `arrs`: NumPy arrays.
\n", - " **Returns:**
\n", - " `out`: NumPy array.
\n", - " \"\"\"\n", - " N = len(arrs)\n", - " out = np.transpose(np.meshgrid(*arrs, indexing='ij'),\n", - " np.roll(np.arange(N + 1), -1)).reshape(-1, N)\n", - " return out\n", - "\n", - "def _to_summing_dataframe(\n", - " df: pd.DataFrame, spec: List[List[str]], sparse_s: bool = False\n", - "):\n", - " #------------------------------- Wrangling -----------------------------#\n", - " # Keep unique levels, preserving first aparison order\n", - " all_levels = list(chain.from_iterable(spec))\n", - " all_levels = [*dict.fromkeys(all_levels)]\n", - "\n", - " # Create hierarchical labels\n", - " S_df = df[all_levels].copy()\n", - " S_df = S_df.drop_duplicates()\n", - "\n", - " max_len_idx = np.argmax([len(hier) for hier in spec])\n", - " bottom_comb = spec[max_len_idx]\n", - " hiers_cols = []\n", - " df = df.copy()\n", - " for hier in spec:\n", - " if hier == bottom_comb:\n", - " hier_col = 'unique_id'\n", - " bottom_col = '/'.join(hier)\n", - " df['unique_id'] = df[hier].agg('/'.join, axis=1)\n", - " else:\n", - " hier_col = '/'.join(hier)\n", - " S_df[hier_col] = S_df[hier].agg('/'.join, axis=1)\n", - " hiers_cols.append(hier_col)\n", - " S_df = S_df.sort_values(by=bottom_comb)\n", - " S_df = S_df[hiers_cols]\n", + "def _to_upper_hierarchy(bottom_split, bottom_values, upper_key):\n", + " upper_split = upper_key.split('/')\n", + " upper_idxs = [bottom_split.index(i) for i in upper_split]\n", "\n", - " #------------------------------- Encoding ------------------------------#\n", - " # One hot encode only aggregate levels\n", - " bottom_ids = list(S_df.unique_id)\n", - " del S_df['unique_id']\n", - " categories = [S_df[col].unique() for col in S_df.columns]\n", - " tags = dict(zip(S_df.columns, categories))\n", - " tags[bottom_col] = bottom_ids\n", + " def join_upper(bottom_value):\n", + " bottom_parts = bottom_value.split('/')\n", + " return '/'.join(bottom_parts[i] for i in upper_idxs)\n", "\n", - " try:\n", - " encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)\n", - " except TypeError: # sklearn < 1.2\n", - " encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)\n", - "\n", - " S = encoder.fit_transform(S_df).T\n", - "\n", - " if sparse_s:\n", - " S = sparse.vstack(\n", - " [\n", - " sparse.csr_matrix(S),\n", - " sparse.identity(len(bottom_ids), dtype=np.float32, format='csr'),\n", - " ]\n", - " )\n", - " S_df = pd.DataFrame.sparse.from_spmatrix(\n", - " S, columns=bottom_ids, index=list(chain(*categories)) + bottom_ids\n", - " )\n", - " else:\n", - " S = np.concatenate([S, np.eye(len(bottom_ids), dtype=np.float32)], axis=0)\n", - " S_df = pd.DataFrame(\n", - " S, columns=bottom_ids, index=list(chain(*categories)) + bottom_ids\n", - " )\n", - "\n", - " # Match index ordering of S_df and collapse df to Y_bottom_df\n", - " Y_bottom_df = df.copy()\n", - " Y_bottom_df = Y_bottom_df.groupby(['unique_id', 'ds'])['y'].sum().reset_index()\n", - " Y_bottom_df.unique_id = Y_bottom_df.unique_id.astype('category')\n", - " Y_bottom_df.unique_id = Y_bottom_df.unique_id.cat.set_categories(S_df.columns)\n", - " return Y_bottom_df, S_df, tags\n", - "\n" + " return [join_upper(val) for val in bottom_values]" ] }, { @@ -341,68 +270,74 @@ " is_balanced: bool = False,\n", " sparse_s: bool = False,\n", "):\n", - " \"\"\" Utils Aggregation Function.\n", - " Aggregates bottom level series contained in the pd.DataFrame `df` according \n", - " to levels defined in the `spec` list applying the `agg_fn` (sum, mean).\n", - "\n", - " **Parameters:**
\n", - " `df`: pd.DataFrame with columns `['ds', 'y']` and columns to aggregate.
\n", - " `spec`: List of levels. Each element of the list contains a list of columns of `df` to aggregate.
\n", - " `is_balanced`: bool=False, whether `Y_bottom_df` is balanced, if not we balance.
\n", - " `sparse_s`: bool=False, whether the returned S_df should be a sparse DataFrame.
\n", - " **Returns:**
\n", - " `Y_df, S_df, tags`: tuple with hierarchically structured series `Y_df` ($\\mathbf{y}_{[a,b]}$),\n", - " summing dataframe `S_df`, and hierarchical aggregation indexes `tags`.\n", + " \"\"\"Utils Aggregation Function.\n", + " Aggregates bottom level series contained in the pandas DataFrame `df` according\n", + " to levels defined in the `spec` list.\n", + "\n", + " Parameters\n", + " ----------\n", + " df : pandas DataFrame\n", + " Dataframe with columns `['ds', 'y']` and columns to aggregate.\n", + " spec : list of list of str\n", + " List of levels. Each element of the list should contain a list of columns of `df` to aggregate.\n", + " is_balanced : bool (default=False)\n", + " Deprecated.\n", + " sparse_s : bool (default=False)\n", + " Return `S_df` as a sparse dataframe.\n", + "\n", + " Returns\n", + " -------\n", + " Y_df : pandas DataFrame\n", + " Hierarchically structured series.\n", + " S_df : pandas DataFrame\n", + " Summing dataframe.\n", + " tags : dict\n", + " Aggregation indices.\n", " \"\"\"\n", - " \n", - " #Ensure no null values\n", + " # Checks\n", " if df.isnull().values.any():\n", - " raise Exception('`df` contains null values')\n", + " raise ValueError('`df` contains null values')\n", + " if is_balanced:\n", + " warnings.warn(\n", + " \"`is_balanced` is deprecated and will be removed in a future version. \"\n", + " \"Don't set this argument to suppress this warning.\",\n", + " category=DeprecationWarning,\n", + " )\n", " \n", - " #-------------------------------- Wrangling --------------------------------#\n", - " # constraints S_df and collapsed Y_bottom_df with 'unique_id'\n", - " Y_bottom_df, S_df, tags = _to_summing_dataframe(df=df, spec=spec, sparse_s=sparse_s)\n", - "\n", - " # Create balanced/sorted dataset for numpy aggregation (nan=0)\n", - " # TODO: investigate potential memory speed tradeoff\n", - " if not is_balanced:\n", - " dates = Y_bottom_df['ds'].unique()\n", - " balanced_prod = numpy_balance(S_df.columns, dates)\n", - " balanced_df = pd.DataFrame(balanced_prod, columns=['unique_id', 'ds'])\n", - " balanced_df['ds'] = balanced_df['ds'].astype(Y_bottom_df['ds'].dtype)\n", - "\n", - " Y_bottom_df.set_index(['unique_id', 'ds'], inplace=True)\n", - " balanced_df.set_index(['unique_id', 'ds'], inplace=True)\n", - " balanced_df = balanced_df.merge(Y_bottom_df[['y']],\n", - " how='left', left_on=['unique_id', 'ds'],\n", - " right_index=True).reset_index()\n", - " Y_bottom_df.reset_index(inplace=True)\n", - " else:\n", - " dates = Y_bottom_df['ds'].unique()\n", - " balanced_df = Y_bottom_df.copy()\n", - "\n", - " #------------------------------- Aggregation -------------------------------#\n", - " n_agg = S_df.shape[0] - S_df.shape[1]\n", + " # compute aggregations and tags\n", + " spec = sorted(spec, key=len)\n", + " bottom = spec[-1]\n", + " aggs = []\n", + " tags = {}\n", + " for levels in spec:\n", + " agg = df.groupby(levels + ['ds'])['y'].sum().reset_index('ds')\n", + " group = agg.index.get_level_values(0)\n", + " for level in levels[1:]:\n", + " group = group + '/' + agg.index.get_level_values(level).str.replace('/', '_')\n", + " agg.index = group\n", + " agg.index.name = 'unique_id'\n", + " tags['/'.join(levels)] = group.unique().values\n", + " aggs.append(agg)\n", + " Y_df = pd.concat(aggs)\n", + "\n", + " # construct S\n", + " bottom_key = '/'.join(bottom)\n", + " bottom_levels = tags[bottom_key]\n", + " S = np.empty((len(bottom_levels), len(spec)), dtype=object)\n", + " for j, levels in enumerate(spec[:-1]):\n", + " S[:, j] = 
_to_upper_hierarchy(bottom, bottom_levels, '/'.join(levels))\n", + " S[:, -1] = tags[bottom_key]\n", + " categories = list(tags.values())\n", + " try:\n", + " encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)\n", + " except TypeError: # sklearn < 1.2\n", + " encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32) \n", + " S = encoder.fit_transform(S).T\n", " if sparse_s:\n", - " Agg = S_df.sparse.to_coo().tocsr()[:n_agg, :]\n", + " df_constructor = pd.DataFrame.sparse.from_spmatrix\n", " else:\n", - " Agg = S_df.values[:n_agg, :]\n", - "\n", - " y_bottom = balanced_df.y.values\n", - " y_bottom = y_bottom.reshape(len(S_df.columns), len(dates))\n", - " y_bottom_mask = np.isnan(y_bottom)\n", - " y_agg = Agg @ np.nan_to_num(y_bottom)\n", - " y_agg_mask = Agg @ y_bottom_mask\n", - "\n", - " # Create long format hierarchical dataframe\n", - " y_agg = y_agg.flatten()\n", - " y_agg[y_agg_mask.flatten() > 1] = np.nan\n", - " y_bottom = y_bottom.flatten()\n", - " Y_df = pd.DataFrame(dict(\n", - " unique_id = np.repeat(S_df.index, len(dates)),\n", - " ds = np.tile(dates, len(S_df.index)),\n", - " y = np.concatenate([y_agg, y_bottom], axis=0)))\n", - " Y_df = Y_df.set_index('unique_id').dropna()\n", + " df_constructor = pd.DataFrame\n", + " S_df = df_constructor(S, index=np.hstack(categories), columns=bottom_levels)\n", " return Y_df, S_df, tags" ] }, @@ -416,6 +351,52 @@ "show_doc(aggregate, title_level=3)" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "075b8d76-b206-4ca6-8722-dd60e4c3b535", + "metadata": {}, + "outputs": [], + "source": [ + "#| hide\n", + "# simple case\n", + "df = pd.DataFrame(\n", + " {\n", + " 'cat1': ['a', 'a', 'a', 'b'],\n", + " 'cat2': ['1', '2', '3', '2'],\n", + " 'y': [10, 20, 30, 40],\n", + " 'ds': ['2020-01-01', '2020-02-01', '2020-03-01', '2020-02-01']\n", + " }\n", + ")\n", + "df['country'] = 'COUNTRY'\n", + "spec = [['country'], ['country', 'cat1'], ['country','cat1', 'cat2']]\n", + "Y_df, S_df, tags = aggregate(df, spec)\n", + "test_eq(\n", + " Y_df.index.tolist(), \n", + " 3 * ['COUNTRY'] +\n", + " 3 * ['COUNTRY/a'] +\n", + " ['COUNTRY/b'] +\n", + " ['COUNTRY/a/1', 'COUNTRY/a/2', 'COUNTRY/a/3'] +\n", + " ['COUNTRY/b/2']\n", + ")\n", + "test_eq(Y_df.loc['COUNTRY', 'y'].values, [10, 60, 30])\n", + "test_eq(\n", + " S_df.index,\n", + " ['COUNTRY', 'COUNTRY/a', 'COUNTRY/b', 'COUNTRY/a/1', 'COUNTRY/a/2', 'COUNTRY/a/3', 'COUNTRY/b/2'],\n", + ")\n", + "test_eq(\n", + " S_df.columns,\n", + " ['COUNTRY/a/1', 'COUNTRY/a/2', 'COUNTRY/a/3', 'COUNTRY/b/2'],\n", + ")\n", + "expected_tags = {\n", + " 'country': ['COUNTRY'],\n", + " 'country/cat1': ['COUNTRY/a', 'COUNTRY/b'],\n", + " 'country/cat1/cat2': ['COUNTRY/a/1', 'COUNTRY/a/2', 'COUNTRY/a/3','COUNTRY/b/2'],\n", + "}\n", + "for k, actual in tags.items():\n", + " test_eq(actual, expected_tags[k])" + ] + }, { "cell_type": "code", "execution_count": null, @@ -442,8 +423,7 @@ "df = pd.concat(ts_list, ignore_index=True)\n", "\n", "# Create categories\n", - "df.loc[df['ult'] < 2, 'pen'] = 'a'\n", - "df.loc[df['ult'] >= 2, 'pen'] = 'b'\n", + "df['pen'] = np.where(df['ult'] < 2, 'a', 'b')\n", "# Note that unique id requires strings\n", "df['ult'] = df['ult'].astype(str)\n", "\n", @@ -452,12 +432,6 @@ " ['pen', 'ult'],\n", "]\n", "\n", - "# test numpy balance \n", - "# returns the expected values\n", - "Y_bottom_df, S_df, _ = _to_summing_dataframe(df, hier_levels)\n", - "test_eq(Y_bottom_df[['ds', 'y']], df[['ds', 'y']])\n", - 
"test_eq(Y_bottom_df['unique_id'].cat.categories, S_df.columns)\n", - "\n", "hier_df, S_df, tags = aggregate(df=df, spec=hier_levels)\n", "hier_df_before, S_df_before, _ = aggregate_before(df=df, spec=hier_levels)\n", "test_eq(S_df, S_df_before)\n", @@ -540,46 +514,36 @@ { "cell_type": "code", "execution_count": null, - "id": "1facd1ff", + "id": "f4b3828f-bbcc-4116-a969-49c78c33bf72", "metadata": {}, "outputs": [], "source": [ "#| hide\n", "# Test equality of aggregation and aggregation_before\n", - "with CodeTimer('strict aggregation before'):\n", - " Y_df_before, S_df_before, tags_before = aggregate_before(df=df, spec=hiers_strictly)\n", - "\n", - "with CodeTimer('strict aggregation now'):\n", - " Y_df, S_df, tags = aggregate(df=df, spec=hiers_strictly)\n", - "\n", - "test_close(Y_df.y.values, Y_df_before.y.values)\n", - "test_eq(S_df.values, S_df_before.values)\n", - "\n", - "test_eq(S_df.columns, S_df_before.columns)\n", - "test_eq(S_df.index, S_df_before.index)\n", - "\n", - "test_eq(Y_df.columns, Y_df_before.columns)\n", - "test_eq(Y_df.index, Y_df_before.index)\n", - "\n", - "with CodeTimer('grouped aggregation before'):\n", - " before_Y_df, before_S_df, before_tags = aggregate_before(df=df, spec=hiers_grouped)\n", - "\n", - "with CodeTimer('grouped aggregation now'):\n", - " Y_df, S_df, tags = aggregate(df=df, spec=hiers_grouped)\n", - "\n", - "test_close(Y_df.y.values, before_Y_df.y.values)\n", - "test_eq(S_df.values, before_S_df.values)\n", - "\n", - "test_eq(S_df.columns, before_S_df.columns)\n", - "test_eq(S_df.index, before_S_df.index)\n", - "\n", - "test_eq(Y_df.columns, before_Y_df.columns)\n", - "test_eq(Y_df.index, before_Y_df.index)" + "for name, spec in zip(['strict', 'grouped'], [hiers_strictly, hiers_grouped]):\n", + " with CodeTimer(f'{name} aggregation before'):\n", + " Y_df_before, S_df_before, tags_before = aggregate_before(df=df, spec=spec)\n", + " \n", + " with CodeTimer(f'{name} aggregation now'):\n", + " Y_df, S_df, tags = aggregate(df=df, spec=spec)\n", + " \n", + " np.testing.assert_allclose(\n", + " Y_df['y'].values,\n", + " Y_df_before['y'].values,\n", + " )\n", + " np.testing.assert_equal(S_df.values, S_df_before.values)\n", + " \n", + " test_eq(S_df.columns, S_df_before.columns)\n", + " test_eq(S_df.index, S_df_before.index)\n", + " \n", + " test_eq(Y_df.columns, Y_df_before.columns)\n", + " test_eq(Y_df.index, Y_df_before.index)" ] }, { "cell_type": "code", "execution_count": null, + "id": "f7688d6e", "metadata": {}, "outputs": [], "source": [