Skip to content

Commit

Permalink
Added capability to handle duplicated timestamps in time-series data.
Browse files Browse the repository at this point in the history
  • Loading branch information
pedrofluxa committed Sep 22, 2023
1 parent 3712638 commit f0f45bb
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 8 deletions.
68 changes: 60 additions & 8 deletions dataprep_ml/cleaners.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,16 +370,68 @@ def _get_columns_to_clean(data: pd.DataFrame, dtype_dict: Dict[str, dtype], mode
return cleanable_columns


def clean_timeseries(df: pd.DataFrame, tss: dict) -> pd.DataFrame:
    """
    All timeseries-specific cleaning logic goes here. Currently:
    1) Any row with `nan`-valued order-by measurements is dropped.
    2) Rows with duplicated time-stamps are treated the following way:
        - columns that are numerical are averaged
        - for non-numerical columns, only the first duplicate is kept.

    :param df: data.
    :param tss: timeseries settings; must contain an 'order_by' key naming the timestamp column.
    :return: cleaned data.
    """
    order_col = tss['order_by']

    # locate and drop rows with invalid (None, NaN, NaT) timestamps
    df = df[df[order_col].notna()].reset_index(drop=True)

    # mask of every row whose timestamp appears more than once
    dup_mask = df[order_col].duplicated(keep=False)
    # nothing to collapse: return early (note: no sorting is applied in this case,
    # matching the original contract of this function)
    if not dup_mask.any():
        return df

    # Collapse each set of rows sharing a timestamp via groupby. Unlike an
    # adjacency-based scan, this is correct even when duplicated timestamps are
    # not contiguous, and it never drops the last member of a duplicate group.
    agg_spec = {
        col: ('mean' if pd.api.types.is_numeric_dtype(df[col]) else 'first')
        for col in df.columns if col != order_col
    }
    if agg_spec:
        collapsed = df[dup_mask].groupby(order_col, as_index=False, sort=False).agg(agg_spec)
        # `mean` upcasts (e.g. int -> float); restore the original dtypes
        for col in agg_spec:
            collapsed[col] = collapsed[col].astype(df.dtypes[col])
    else:
        # degenerate frame with only the order-by column: just deduplicate it
        collapsed = df[dup_mask].drop_duplicates(subset=[order_col])

    # remove the duplicated rows, append the collapsed ones, restore the
    # original column order, and sort to bring back balance to the force
    df = pd.concat([df[~dup_mask], collapsed], axis=0)[list(df.columns)]
    df = df.sort_values(order_col).reset_index(drop=True)

    return df
51 changes: 51 additions & 0 deletions tests/integration_tests/test_cleaners.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,54 @@ def test_2_imputers(self):
assert cdf[num_zero_impute_col].iloc[0] == num_zero_target_value
assert cdf[num_median_impute_col].iloc[0] == num_median_target_value
assert cdf[cat_mode_impute_col].iloc[0] == cat_mode_target_value

def test_3_timeseries(self):
    """Sanity check: the cleaner collapses duplicated timestamps in time-series mode."""
    # expected (clean) frame: strictly increasing timestamps 1..10
    base = list(range(1, 11))
    df_correct = pd.DataFrame.from_records({
        'x': np.asarray(base),
        'y': np.asarray([str(v) for v in base]),
        'z': np.asarray(base),
    })
    # corrupted frame: several timestamps repeated a varying number of times
    repeats = {1: 3, 2: 1, 3: 4, 4: 1, 5: 1, 6: 1, 7: 4, 8: 1, 9: 1, 10: 3}
    dup_vals = [v for v in base for _ in range(repeats[v])]
    df_corrupted = pd.DataFrame.from_records({
        'x': np.asarray(dup_vals),
        'y': np.asarray([str(v) for v in dup_vals]),
        'z': np.asarray(dup_vals),
    })
    # inferred types are the same for both DataFrames
    inferred_types = infer_types(df_correct, pct_invalid=0)
    tss = {
        'is_timeseries': True,
        'order_by': 'x'
    }
    df_clean = cleaner(data=df_corrupted,
                       dtype_dict=inferred_types.dtypes,
                       pct_invalid=0.01,
                       identifiers={},
                       target='y',
                       mode='train',
                       timeseries_settings=tss,
                       anomaly_detection=False,
                       imputers={},
                       custom_cleaning_functions={})
    self.assertIsInstance(df_clean, pd.DataFrame)
    self.assertTrue(df_clean.equals(df_correct))
    # TODO: better asserts here

0 comments on commit f0f45bb

Please sign in to comment.