Skip to content

Commit

Permalink
Unify capa and mvcapa code and speed-up mvcapa
Browse files Browse the repository at this point in the history
  • Loading branch information
Tveten committed Dec 26, 2023
1 parent 32a391c commit bff803f
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 140 deletions.
17 changes: 13 additions & 4 deletions interactive/explore_capa.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
# Multivariate
# TODO: Add plotting functionality to assess the affected subset.
df = teeth(5, 10, p=10, mean=10, affected_proportion=0.2, random_state=2)
capa = Mvcapa(collective_penalty_scale=3, fmt="sparse", max_segment_length=20)
capa = Mvcapa(
collective_penalty_scale=3,
collective_penalty="sparse",
fmt="sparse",
)
anomalies = capa.fit_predict(df)

capa = Mvcapa(labels="score", fmt="dense", max_segment_length=20)
Expand All @@ -42,15 +46,20 @@


# Profiling
# TODO: Add a dedicated univariate version. Currently Capa is x10 slower than strchange
# TODO: Find optimal subset in Mvcapa after detecting temporal position of anomaly.
# Waste of resources to find and store all optimisers along the way.
# Also more accurate to always rerun with 'sparse' penalty.
# TODO: Add pruning?
n = int(1e4)
n = int(1e5)
df = teeth(n_segments=1, mean=0, segment_length=n, p=1)
detector = Capa(
max_segment_length=1000, collective_penalty_scale=5, point_penalty_scale=5
)
detector = Mvcapa(
max_segment_length=1000, collective_penalty_scale=5, point_penalty_scale=5
max_segment_length=1000,
collective_penalty="sparse",
collective_penalty_scale=5,
point_penalty_scale=5,
)
detector = StreamchangeCapa(ConstMeanL2(), maxsl=1000, predict_point_anomalies=True)
profiler = Profiler()
Expand Down
105 changes: 21 additions & 84 deletions skchange/anomaly_detectors/capa.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,102 +9,39 @@
from numba import njit
from sktime.annotation.base import BaseSeriesAnnotator

from skchange.anomaly_detectors.mvcapa import check_capa_input, dense_capa_penalty
from skchange.anomaly_detectors.mvcapa import (
check_capa_input,
dense_capa_penalty,
run_base_capa,
)
from skchange.anomaly_detectors.utils import format_anomaly_output
from skchange.costs.saving_factory import saving_factory


@njit
def get_anomalies(
anomaly_starts: np.ndarray,
) -> Tuple[List[Tuple[int, int]], List[int]]:
collective_anomalies = []
point_anomalies = []
i = anomaly_starts.size - 1
while i >= 0:
start_i = anomaly_starts[i]
size = i - start_i + 1
if size > 1:
collective_anomalies.append((int(start_i), i))
i = int(start_i)
elif size == 1:
point_anomalies.append(i)
i -= 1
return collective_anomalies, point_anomalies


@njit
def penalise_savings(savings: np.ndarray, penalty: float) -> np.ndarray:
if savings.ndim > 1 and savings.shape[1] > 1:
sum_savings = savings.sum(axis=1)
if savings.ndim == 1:
sum_savings = savings
elif savings.ndim > 1 and savings.shape[1] == 1:
sum_savings = savings.reshape(-1)
return sum_savings - penalty


@njit
def optimise_saving(
starts: np.ndarray,
opt_savings: np.ndarray,
next_savings: np.ndarray,
penalty: float,
) -> Tuple[float, int]:
penalised_saving = penalise_savings(next_savings, penalty)
candidate_savings = opt_savings[starts] + penalised_saving
argmax = np.argmax(candidate_savings)
opt_start = starts[0] + argmax
return candidate_savings[argmax], opt_start


@njit
def run_capa(
X: np.ndarray,
saving_func: Callable,
saving_init_func: Callable,
collective_penalty: float,
point_penalty: float,
collective_alpha: float,
point_alpha: float,
min_segment_length: int,
max_segment_length: int,
) -> Tuple[np.ndarray, List[Tuple[int, int]], List[int]]:
) -> Tuple[np.ndarray, List[Tuple[int, int]], List[Tuple[int, int]]]:
params = saving_init_func(X)
n = X.shape[0]
opt_savings = np.zeros(n + 1)
# Store the previous start of an anomaly for each t.
# Used to get the final set of anomalies after the loop.
opt_anomaly_starts = np.repeat(np.nan, n)

ts = np.arange(min_segment_length - 1, n)
for t in ts:
# Collective anomalies
lower_start = max(0, t - max_segment_length + 1)
upper_start = t - min_segment_length + 2
starts = np.arange(lower_start, upper_start)
ends = np.repeat(t, len(starts))
collective_savings = saving_func(params, starts, ends)
opt_collective_saving, opt_start = optimise_saving(
starts, opt_savings, collective_savings, collective_penalty
)

# Point anomalies
t_array = np.array([t])
point_savings = saving_func(params, t_array, t_array)
opt_point_saving, _ = optimise_saving(
t_array, opt_savings, point_savings, point_penalty
)

# Combine and store results
savings = np.array([opt_savings[t], opt_collective_saving, opt_point_saving])
argmax = np.argmax(savings)
opt_savings[t + 1] = savings[argmax]
if argmax == 1:
opt_anomaly_starts[t] = opt_start
elif argmax == 2:
opt_anomaly_starts[t] = t

collective_anomalies, point_anomalies = get_anomalies(opt_anomaly_starts)
return opt_savings[1:], collective_anomalies, point_anomalies
collective_betas = np.zeros(1)
point_betas = np.zeros(1)
return run_base_capa(
X,
params,
saving_func,
collective_alpha,
collective_betas,
point_alpha,
point_betas,
min_segment_length,
max_segment_length,
)


class Capa(BaseSeriesAnnotator):
Expand Down
127 changes: 89 additions & 38 deletions skchange/anomaly_detectors/mvcapa.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,79 +205,95 @@ def capa_penalty_factory(penalty: Union[str, Callable] = "combined") -> Callable


@njit
def get_multivariate_anomalies(
anomaly_starts: np.ndarray, affected_components: List[np.ndarray]
) -> Tuple[List[Tuple[int, int, np.ndarray]], List[Tuple[int, np.ndarray]]]:
def get_anomalies(
anomaly_starts: np.ndarray,
) -> Tuple[List[Tuple[int, int]], List[Tuple[int, int]]]:
collective_anomalies = []
point_anomalies = []
i = anomaly_starts.size - 1
while i >= 0:
start_i = anomaly_starts[i]
size = i - start_i + 1
if size > 1:
affected_components_i = np.sort(affected_components[i])
collective_anomalies.append((int(start_i), i, affected_components_i))
collective_anomalies.append((int(start_i), i))
i = int(start_i)
elif size == 1:
affected_components_i = np.sort(affected_components[i])
point_anomalies.append((i, affected_components_i))
point_anomalies.append((i, i))

Check warning on line 221 in skchange/anomaly_detectors/mvcapa.py

View check run for this annotation

Codecov / codecov/patch

skchange/anomaly_detectors/mvcapa.py#L221

Added line #L221 was not covered by tests
i -= 1
return collective_anomalies, point_anomalies


@njit
def penalise_multivariate_savings(
def penalise_savings(
savings: np.ndarray, alpha: float, betas: np.ndarray
) -> Tuple[np.ndarray, List[np.ndarray]]:
n_savings = savings.shape[0]
penalised_savings = np.zeros(n_savings)
opt_components = []
for i in range(n_savings):
saving_i = savings[i]
saving_order = (-saving_i).argsort(kind="mergesort") # Decreasing order.
penalised_saving = np.cumsum(saving_i[saving_order] - betas) - alpha
argmax_penalised_saving = np.argmax(penalised_saving)
penalised_savings[i] = penalised_saving[argmax_penalised_saving]
opt_components.append(saving_order[: argmax_penalised_saving + 1])
return penalised_savings, opt_components
) -> np.ndarray:
if np.all(betas < 1e-8):
penalised_savings = savings.sum(axis=1) - alpha
if np.all(betas == betas[0]):
penalised_saving_matrix = np.maximum(savings - betas[0], 0.0) - alpha
penalised_savings = penalised_saving_matrix.sum(axis=1)
else:
n_savings = savings.shape[0]
penalised_savings = np.zeros(n_savings)
for i in range(n_savings):
saving_i = savings[i]
saving_order = (-saving_i).argsort() # Decreasing order.
penalised_saving = np.cumsum(saving_i[saving_order] - betas) - alpha
argmax = np.argmax(penalised_saving)
penalised_savings[i] = penalised_saving[argmax]

Check warning on line 243 in skchange/anomaly_detectors/mvcapa.py

View check run for this annotation

Codecov / codecov/patch

skchange/anomaly_detectors/mvcapa.py#L236-L243

Added lines #L236 - L243 were not covered by tests
return penalised_savings


@njit
def find_affected_components(
params: Union[np.ndarray, tuple],
saving_func: Callable,
anomalies: List[Tuple[int, int]],
alpha: float,
betas: np.ndarray,
) -> List[Tuple[int, int, np.ndarray]]:
new_anomalies = []
for start, end in anomalies:
saving = saving_func(params, np.array([start]), np.array([end]))[0]
saving_order = (-saving).argsort() # Decreasing order.
penalised_saving = np.cumsum(saving[saving_order] - betas) - alpha
argmax = np.argmax(penalised_saving)
new_anomalies.append((start, end, saving_order[: argmax + 1]))
return new_anomalies


@njit
def optimise_mv_savings(
def optimise_savings(
starts: np.ndarray,
opt_savings: np.ndarray,
next_savings: np.ndarray,
alpha: float,
betas: np.ndarray,
) -> Tuple[float, int, np.ndarray]:
penalised_saving, affected_components = penalise_multivariate_savings(
next_savings, alpha, betas
)
) -> Tuple[float, int]:
penalised_saving = penalise_savings(next_savings, alpha, betas)
candidate_savings = opt_savings[starts] + penalised_saving
argmax = np.argmax(candidate_savings)
opt_start = starts[0] + argmax
return candidate_savings[argmax], opt_start, affected_components[argmax]
return candidate_savings[argmax], opt_start


@njit
def run_mvcapa(
def run_base_capa(
X: np.ndarray,
params: Union[np.ndarray, tuple],
saving_func: Callable,
saving_init_func: Callable,
collective_alpha: float,
collective_betas: np.ndarray,
point_alpha: float,
point_betas: np.ndarray,
min_segment_length: int,
max_segment_length: int,
) -> Tuple[np.ndarray, List[Tuple[int, int, np.ndarray]], List[Tuple[int, np.ndarray]]]:
params = saving_init_func(X)
) -> Tuple[np.ndarray, List[Tuple[int, int]], List[Tuple[int, int]]]:
n = X.shape[0]
opt_savings = np.zeros(n + 1)
# Store the optimal start and affected components of an anomaly for each t.
# Used to get the final set of anomalies after the loop.
opt_anomaly_starts = np.repeat(np.nan, n)
opt_affected_components = [np.array([-1])] * n

ts = np.arange(min_segment_length - 1, n)
for t in ts:
Expand All @@ -287,14 +303,14 @@ def run_mvcapa(
starts = np.arange(lower_start, upper_start)
ends = np.repeat(t, len(starts))
collective_savings = saving_func(params, starts, ends)
opt_collective_saving, opt_start, opt_componenets = optimise_mv_savings(
opt_collective_saving, opt_start = optimise_savings(
starts, opt_savings, collective_savings, collective_alpha, collective_betas
)

# Point anomalies
t_array = np.array([t])
point_savings = saving_func(params, t_array, t_array)
opt_point_saving, _, opt_point_components = optimise_mv_savings(
opt_point_saving, _ = optimise_savings(
t_array, opt_savings, point_savings, point_alpha, point_betas
)

Expand All @@ -304,17 +320,52 @@ def run_mvcapa(
opt_savings[t + 1] = savings[argmax]
if argmax == 1:
opt_anomaly_starts[t] = opt_start
opt_affected_components[t] = opt_componenets
elif argmax == 2:
opt_anomaly_starts[t] = t
opt_affected_components[t] = opt_point_components

collective_anomalies, point_anomalies = get_multivariate_anomalies(
opt_anomaly_starts, opt_affected_components
)
collective_anomalies, point_anomalies = get_anomalies(opt_anomaly_starts)
return opt_savings[1:], collective_anomalies, point_anomalies


@njit
def run_mvcapa(
X: np.ndarray,
saving_func: Callable,
saving_init_func: Callable,
collective_alpha: float,
collective_betas: np.ndarray,
point_alpha: float,
point_betas: np.ndarray,
min_segment_length: int,
max_segment_length: int,
) -> Tuple[
np.ndarray, List[Tuple[int, int, np.ndarray]], List[Tuple[int, int, np.ndarray]]
]:
params = saving_init_func(X)
opt_savings, collective_anomalies, point_anomalies = run_base_capa(
X,
params,
saving_func,
collective_alpha,
collective_betas,
point_alpha,
point_betas,
min_segment_length,
max_segment_length,
)
collective_anomalies = find_affected_components(
params,
saving_func,
collective_anomalies,
collective_alpha,
collective_betas,
)
point_anomalies = find_affected_components(
params, saving_func, point_anomalies, point_alpha, point_betas
)
return opt_savings, collective_anomalies, point_anomalies


class Mvcapa(BaseSeriesAnnotator):
"""Subset multivariate collective and point anomaly detection.
Expand Down
Loading

0 comments on commit bff803f

Please sign in to comment.