Skip to content

Commit

Permalink
tests: update pelt tests to new structure
Browse files Browse the repository at this point in the history
  • Loading branch information
Tveten committed Nov 25, 2024
1 parent 03771ac commit e475571
Showing 1 changed file with 41 additions and 54 deletions.
95 changes: 41 additions & 54 deletions skchange/change_detectors/tests/test_pelt.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
"""Tests for the Pelt implementation."""

from typing import Callable

import numpy as np
import pytest

from skchange.change_detectors.pelt import (
get_changepoints,
run_pelt,
)
from skchange.costs.l2_cost import L2Cost
from skchange.costs_old.cost_factory import cost_factory
from skchange.costs import BaseCost, L2Cost
from skchange.datasets.generate import generate_alternating_data
from skchange.utils.numba import njit

n_segments = 2
seg_len = 50
Expand All @@ -24,20 +20,17 @@
n_segments=5, mean=10.5, variance=0.5, segment_length=20, p=1, random_state=5
)[0].values.reshape(-1, 1)

cost_func, cost_init_func = cost_factory("mean")
penalty = 2 * np.log(len(changepoint_data))

cost = L2Cost()
penalty = 2 * np.log(len(changepoint_data))


def pelt_partition_cost(
X: np.ndarray,
changepoints: list,
cost_func: Callable,
cost_init_func: Callable,
cost: BaseCost,
penalty: float,
):
cost_args = cost_init_func(X)
cost.fit(X)
n = len(X)

total_cost = penalty * len(changepoints)
Expand All @@ -46,26 +39,29 @@ def pelt_partition_cost(
interval_starts = np.concatenate((np.array([0]), np_changepoints + 1), axis=0)
interval_ends = np.concatenate((np_changepoints, np.array([n - 1])), axis=0)

interval_costs = cost_func(cost_args, interval_starts, interval_ends)
interval_costs = np.sum(
cost.evaluate(np.column_stack((interval_starts, interval_ends + 1))), axis=1
)
total_cost += np.sum(interval_costs)

return total_cost


@njit
def run_pelt_old(
X: np.ndarray, cost_func, cost_init_func, penalty, min_segment_length
X: np.ndarray, cost: BaseCost, penalty, min_segment_length
) -> tuple[np.ndarray, list]:
# With 'min_segment_length' > 1, this function can return
# segment lengths < 'min_segment_length'.
params = cost_init_func(X)
cost.fit(X)
n = len(X)

starts = np.array((), dtype=np.int64) # Evolving set of admissible segment starts.
init_starts = np.zeros(min_segment_length - 1, dtype=np.int64)
init_ends = np.arange(min_segment_length - 1)
opt_cost = np.zeros(n + 1) - penalty
opt_cost[1:min_segment_length] = cost_func(params, init_starts, init_ends)
opt_cost[1:min_segment_length] = np.sum(
cost.evaluate(np.column_stack((init_starts, init_ends + 1))), axis=1
)

# Store the previous changepoint for each t.
# Used to get the final set of changepoints after the loop.
Expand All @@ -76,7 +72,9 @@ def run_pelt_old(
starts = np.concatenate((starts, t - min_segment_length + 1))
ends = np.repeat(t, len(starts))
candidate_opt_costs = (
opt_cost[starts] + cost_func(params, starts, ends) + penalty
opt_cost[starts]
+ np.sum(cost.evaluate(np.column_stack((starts, ends + 1))), axis=1)
+ penalty
)
argmin = np.argmin(candidate_opt_costs)
opt_cost[t + 1] = candidate_opt_costs[argmin]
Expand All @@ -88,17 +86,15 @@ def run_pelt_old(
return opt_cost[1:], get_changepoints(prev_cpts)


@njit
def run_optimal_partitioning(
X: np.ndarray,
cost_func,
cost_init_func,
cost: BaseCost,
penalty,
min_segment_length: int = 1,
) -> tuple[np.ndarray, list]:
# The simpler and more direct 'optimal partitioning' algorithm,
# as compared to the PELT algorithm.
params = cost_init_func(X)
cost.fit(X)
num_obs = len(X)
min_segment_shift = min_segment_length - 1

Expand All @@ -119,8 +115,11 @@ def run_optimal_partitioning(
non_changepoint_ends = np.arange(min_segment_length - 1, 2 * min_segment_length - 1)

# Shifted by 1 to account for the first element being -penalty:
opt_cost[min_segment_length : (2 * min_segment_length)] = cost_func(
params, non_changepoint_starts, non_changepoint_ends
opt_cost[min_segment_length : (2 * min_segment_length)] = np.sum(
cost.evaluate(
np.column_stack((non_changepoint_starts, non_changepoint_ends + 1))
),
axis=1,
)

# Store the previous changepoint for each last start added.
Expand All @@ -144,7 +143,10 @@ def run_optimal_partitioning(

candidate_opt_costs = (
opt_cost[candidate_starts] # Shifted by one.
+ cost_func(params, candidate_starts, candidate_ends)
+ np.sum(
cost.evaluate(np.column_stack((candidate_starts, candidate_ends + 1))),
axis=1,
)
+ penalty
)

Expand All @@ -159,16 +161,14 @@ def run_optimal_partitioning(
def test_old_pelt_vs_optimal_partitioning(min_segment_length):
pelt_costs, pelt_changepoints = run_pelt_old(
changepoint_data,
cost_func,
cost_init_func,
cost=cost,
penalty=penalty,
min_segment_length=min_segment_length,
)

opt_part_costs, opt_part_changepoints = run_optimal_partitioning(
changepoint_data,
cost_func,
cost_init_func,
cost=cost,
penalty=penalty,
min_segment_length=min_segment_length,
)
Expand All @@ -186,15 +186,13 @@ def test_xfail_old_pelt_vs_optimal_partitioning_scores(min_segment_length=2):
X = changepoint_data
pelt_costs, _ = run_pelt_old(
X,
cost_func,
cost_init_func,
cost,
penalty=penalty,
min_segment_length=min_segment_length,
)
opt_part_costs, _ = run_optimal_partitioning(
X,
cost_func,
cost_init_func,
cost,
penalty=penalty,
min_segment_length=min_segment_length,
)
Expand All @@ -205,15 +203,13 @@ def test_old_pelt_vs_optimal_partitioning_change_points(min_segment_length=2):
X = changepoint_data
_, pelt_changepoints = run_pelt_old(
X,
cost_func,
cost_init_func,
cost,
penalty=penalty,
min_segment_length=min_segment_length,
)
_, opt_part_changepoints = run_optimal_partitioning(
X,
cost_func,
cost_init_func,
cost,
penalty=penalty,
min_segment_length=min_segment_length,
)
Expand All @@ -223,8 +219,7 @@ def test_old_pelt_vs_optimal_partitioning_change_points(min_segment_length=2):
def test_run_old_pelt(min_segment_length=1):
pelt_costs, changepoints = run_pelt_old(
changepoint_data,
cost_func,
cost_init_func,
cost,
penalty=penalty,
min_segment_length=min_segment_length,
)
Expand All @@ -237,8 +232,7 @@ def test_run_old_pelt(min_segment_length=1):
def test_run_optimal_partitioning(min_segment_length=1):
opt_costs, changepoints = run_optimal_partitioning(
changepoint_data,
cost_func,
cost_init_func,
cost,
penalty=penalty,
min_segment_length=min_segment_length,
)
Expand All @@ -264,15 +258,13 @@ def test_run_pelt(min_segment_length=1):
def test_compare_all_pelt_functions(min_segment_length=1):
old_pelt_costs, old_pelt_changepoints = run_pelt_old(
changepoint_data,
cost_func,
cost_init_func,
cost,
penalty=penalty,
min_segment_length=min_segment_length,
)
opt_part_costs, opt_part_changepoints = run_optimal_partitioning(
changepoint_data,
cost_func,
cost_init_func,
cost,
penalty=penalty,
min_segment_length=min_segment_length,
)
Expand Down Expand Up @@ -306,8 +298,7 @@ def test_pelt_on_tricky_data(min_segment_length):
)
opt_part_costs, opt_part_changepoints = run_optimal_partitioning(
alternating_sequence,
cost_func,
cost_init_func,
cost,
penalty=penalty,
min_segment_length=min_segment_length,
)
Expand All @@ -318,8 +309,7 @@ def test_pelt_on_tricky_data(min_segment_length):
pelt_partition_cost(
alternating_sequence,
pelt_changepoints,
cost_func,
cost_init_func,
cost,
penalty=penalty,
),
decimal=10,
Expand Down Expand Up @@ -348,8 +338,7 @@ def test_pelt_min_segment_lengths(min_segment_length):
)
_, opt_part_changepoints = run_optimal_partitioning(
alternating_sequence,
cost_func,
cost_init_func,
cost,
penalty=penalty,
min_segment_length=min_segment_length,
)
Expand All @@ -372,8 +361,7 @@ def test_xfail_pelt_min_segment_lengths(min_segment_length):
)
_, opt_part_changepoints = run_optimal_partitioning(
alternating_sequence,
cost_func,
cost_init_func,
cost,
penalty=penalty,
min_segment_length=min_segment_length,
)
Expand Down Expand Up @@ -402,8 +390,7 @@ def test_xfail_pelt_on_tricky_data(min_segment_length):
)
opt_part_costs, _ = run_optimal_partitioning(
alternating_sequence,
cost_func,
cost_init_func,
cost,
penalty=penalty,
min_segment_length=min_segment_length,
)
Expand Down

0 comments on commit e475571

Please sign in to comment.