Skip to content

Commit

Permalink
Update sum and grid to take residuals into account
Browse files Browse the repository at this point in the history
  • Loading branch information
skejserjensen committed May 21, 2023
1 parent 874c57a commit b79c26a
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 47 deletions.
13 changes: 3 additions & 10 deletions crates/modelardb_compression/src/models/gorilla.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,10 @@ impl Gorilla {
/// Gorilla's compression method for floating-point values. If `maybe_model_last_value` is provided,
/// it is assumed the first value in `values` is compressed against it instead of being stored in
/// full, i.e., uncompressed.
pub fn sum(
start_time: Timestamp,
end_time: Timestamp,
timestamps: &[u8],
values: &[u8],
maybe_model_last_value: Option<Value>,
) -> Value {
pub fn sum(length: usize, values: &[u8], maybe_model_last_value: Option<Value>) -> Value {
// This function replicates code from gorilla::grid() as it isn't necessary
// to store the univariate ids, timestamps, and values in arrays for a sum.
// So any changes to the decompression must be mirrored in gorilla::grid().
let length = models::len(start_time, end_time, timestamps);
let mut bits = BitReader::try_new(values).unwrap();
let mut leading_zeros = u8::MAX;
let mut trailing_zeros: u8 = 0;
Expand Down Expand Up @@ -369,7 +362,7 @@ mod tests {
fn test_sum(values in collection::vec(ProptestValue::ANY, 0..50)) {
prop_assume!(!values.is_empty());
let compressed_values = compress_values_using_gorilla(&values);
let sum = sum(1, values.len() as i64, &values.len().to_be_bytes(), &compressed_values, None);
let sum = sum(values.len(), &compressed_values, None);
let expected_sum = aggregate::sum(&ValueArray::from_iter_values(values)).unwrap();
prop_assert!(models::equal_or_nan(expected_sum as f64, sum as f64));
}
Expand Down Expand Up @@ -414,7 +407,7 @@ mod tests {
let error_bound = ErrorBound::try_new(0.0).unwrap();
let mut model_type = Gorilla::new(error_bound);
model_type.compress_values(values);
model_type.compressed_values()
model_type.compressed_values.finish()
}

fn slice_of_value_equal(values_one: &[Value], values_two: &[Value]) -> bool {
Expand Down
146 changes: 121 additions & 25 deletions crates/modelardb_compression/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub mod timestamps;
use std::mem;

use arrow::array::ArrayBuilder;
use modelardb_common::errors::ModelarDbError;
use modelardb_common::types::{
Timestamp, TimestampBuilder, UnivariateId, UnivariateIdBuilder, Value, ValueBuilder,
};
Expand Down Expand Up @@ -99,6 +98,7 @@ pub fn len(start_time: Timestamp, end_time: Timestamp, timestamps: &[u8]) -> usi
}

/// Compute the sum of the values for a time series segment whose values are represented by a model.
#[allow(clippy::too_many_arguments)]
pub fn sum(
model_type_id: u8,
start_time: Timestamp,
Expand All @@ -107,15 +107,52 @@ pub fn sum(
min_value: Value,
max_value: Value,
values: &[u8],
residuals: &[u8],
) -> Value {
match model_type_id {
PMC_MEAN_ID => pmc_mean::sum(start_time, end_time, timestamps, min_value),
SWING_ID => swing::sum(
start_time, end_time, timestamps, min_value, max_value, values,
// Extract the number of residuals stored.
let residuals_length = if residuals.is_empty() {
0
} else {
// The number of residuals is stored as the last byte.
residuals[residuals.len() - 1] as usize
};

let model_length = len(start_time, end_time, timestamps) - residuals_length;

// Computes the sum from the model.
let (model_last_value, model_sum) = match model_type_id {
PMC_MEAN_ID => {
let value =
CompressedSegmentBuilder::decode_values_for_pmc_mean(min_value, max_value, values);
(value, pmc_mean::sum(model_length, value))
}
SWING_ID => {
let (first_value, last_value) =
CompressedSegmentBuilder::decode_values_for_swing(min_value, max_value, values);
(
last_value,
swing::sum(
start_time,
end_time,
timestamps,
first_value,
last_value,
residuals_length,
),
)
}
GORILLA_ID => (
f32::NAN, // A segment with values compressed by Gorilla never has residuals.
gorilla::sum(model_length, values, None),
),
// TODO: take residuals stored as part of the segment into account when refactoring optimizer.
GORILLA_ID => gorilla::sum(start_time, end_time, timestamps, values, None),
_ => panic!("Unknown model type."),
};

// Compute the sum from the residuals.
if residuals.is_empty() {
model_sum
} else {
model_sum + gorilla::sum(residuals_length, residuals, Some(model_last_value))
}
}

Expand All @@ -137,21 +174,15 @@ pub fn grid(
timestamp_builder: &mut TimestampBuilder,
value_builder: &mut ValueBuilder,
) {
// Extract the number of residuals stored.
let residuals_length = if residuals.is_empty() {
0
} else {
// The number of residuals is stored as the last byte.
residuals[residuals.len() - 1]
};

// Decompress all of the timestamps and create a slice for the model's timestamps.
let model_timestamps_start_index = timestamp_builder.values_slice().len();
timestamps::decompress_all_timestamps(start_time, end_time, timestamps, timestamp_builder);
let model_timestamps_end_index =
timestamp_builder.values_slice().len() - residuals_length as usize;
let model_timestamps =
&timestamp_builder.values_slice()[model_timestamps_start_index..model_timestamps_end_index];
// Decompress the timestamps.
let (model_timestamps, residuals_timestamps) =
decompress_all_timestamps_and_split_into_models_and_residuals(
start_time,
end_time,
timestamps,
residuals,
timestamp_builder,
);

// Reconstruct the values from the model.
match model_type_id {
Expand Down Expand Up @@ -192,21 +223,49 @@ pub fn grid(
}

// Reconstruct the values from the residuals.
if residuals_length > 0 {
// The first value in residuals is compressed against the model's last value.
if !residuals.is_empty() {
let model_last_value = value_builder.values_slice()[value_builder.len() - 1];

gorilla::grid(
univariate_id,
&residuals[..residuals.len() - 1],
univariate_id_builder,
&timestamp_builder.values_slice()[model_timestamps_end_index..],
residuals_timestamps,
value_builder,
Some(model_last_value),
);
}
}

/// Decompresses the timestamps encoded by `start_time`, `end_time`, and `timestamps`, appends them
/// to `timestamp_builder`, and returns two slices into the builder: first the timestamps of the
/// data points represented by the model, then the timestamps of the data points stored as
/// residuals. The number of residuals is read from the last byte of `residuals` and is zero when
/// `residuals` is empty.
fn decompress_all_timestamps_and_split_into_models_and_residuals<'a>(
    start_time: Timestamp,
    end_time: Timestamp,
    timestamps: &'a [u8],
    residuals: &'a [u8],
    timestamp_builder: &'a mut TimestampBuilder,
) -> (&'a [Timestamp], &'a [Timestamp]) {
    // The final byte of a non-empty `residuals` holds the number of residuals stored.
    let residuals_count = residuals.last().map_or(0, |&count| count as usize);

    // Timestamps already in the builder precede this segment and are excluded from both slices.
    let segment_start = timestamp_builder.values_slice().len();
    timestamps::decompress_all_timestamps(start_time, end_time, timestamps, timestamp_builder);

    // The residuals' timestamps are the last `residuals_count` timestamps in the builder.
    let all_timestamps = timestamp_builder.values_slice();
    let residuals_start = all_timestamps.len() - residuals_count;

    (
        &all_timestamps[segment_start..residuals_start],
        &all_timestamps[residuals_start..],
    )
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -298,4 +357,41 @@ mod tests {
prop_assert!(!equal_or_nan(v1, v2));
}
}

// Tests for decompress_all_timestamps_and_split_into_models_and_residuals().
#[test]
fn test_decompress_all_timestamps_and_split_into_models_and_residuals_no_residuals() {
    // No residuals are passed, so every decompressed timestamp is expected in the model's slice.
    let no_residuals: &[u8] = &[];
    let mut builder = TimestampBuilder::new();

    let (from_model, from_residuals) =
        decompress_all_timestamps_and_split_into_models_and_residuals(
            100,
            500,
            &[5],
            no_residuals,
            &mut builder,
        );

    // The expected empty slice is bound to a variable with an explicit `&[Timestamp]` type
    // before it is compared.
    // NOTE(review): the original comment attributed this to type aliases not being usable
    // as constructors - confirm the intended reason.
    let expected_from_residuals: &[Timestamp] = &[];
    assert_eq!(from_model, &[100, 200, 300, 400, 500]);
    assert_eq!(from_residuals, expected_from_residuals);
}

#[test]
fn test_decompress_all_timestamps_and_split_into_models_and_residuals_with_residuals() {
    // The only byte in `residuals` is also its last byte, so it is read as a count of two.
    let residuals: &[u8] = &[2];
    let mut builder = TimestampBuilder::new();

    let (from_model, from_residuals) =
        decompress_all_timestamps_and_split_into_models_and_residuals(
            100,
            500,
            &[5],
            residuals,
            &mut builder,
        );

    // The last two of the five timestamps are expected in the residuals' slice.
    assert_eq!(from_model, &[100, 200, 300]);
    assert_eq!(from_residuals, &[400, 500]);
}
}
7 changes: 3 additions & 4 deletions crates/modelardb_compression/src/models/pmc_mean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ impl PMCMean {

/// Compute the sum of the values for a time series segment whose values are
/// represented by a model of type PMC-Mean.
pub fn sum(start_time: Timestamp, end_time: Timestamp, timestamps: &[u8], value: Value) -> Value {
models::len(start_time, end_time, timestamps) as Value * value
pub fn sum(model_length: usize, value: Value) -> Value {
model_length as Value * value
}

/// Reconstruct the values for the `timestamps` without matching values in
Expand Down Expand Up @@ -251,8 +251,7 @@ mod tests {
proptest! {
#[test]
fn test_sum(value in ProptestValue::ANY) {
let sum = sum(1657734000, 1657734540, &[10], value);
prop_assert!(models::equal_or_nan(sum as f64, (10.0 * value) as f64));
prop_assert!(models::equal_or_nan(sum(10, value) as f64, (10.0 * value) as f64));
}
}

Expand Down
26 changes: 20 additions & 6 deletions crates/modelardb_compression/src/models/swing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
//! [ModelarDB paper]: https://www.vldb.org/pvldb/vol11/p1688-jensen.pdf
use modelardb_common::schemas::COMPRESSED_METADATA_SIZE_IN_BYTES;
use modelardb_common::types::{Timestamp, UnivariateId, UnivariateIdBuilder, Value, ValueBuilder};
use modelardb_common::types::{
Timestamp, TimestampBuilder, UnivariateId, UnivariateIdBuilder, Value, ValueBuilder,
};

use crate::models::ErrorBound;
use crate::models::{self, timestamps};
use super::timestamps;
use crate::models::{self, ErrorBound};

/// The state the Swing model type needs while fitting a model to a time series
/// segment.
Expand Down Expand Up @@ -214,6 +216,7 @@ pub fn sum(
timestamps: &[u8],
first_value: Value,
last_value: Value,
residuals_length: usize,
) -> Value {
let (slope, intercept) =
compute_slope_and_intercept(start_time, first_value as f64, end_time, last_value as f64);
Expand All @@ -225,9 +228,20 @@ pub fn sum(
let length = models::len(start_time, end_time, timestamps);
(average * length as f64) as Value
} else {
// TODO: decompress timestamps instead of just casting them when refactoring the optimizer.
let mut timestamp_builder = TimestampBuilder::new();

timestamps::decompress_all_timestamps(
start_time,
end_time,
timestamps,
&mut timestamp_builder,
);

let timestamps = timestamp_builder.finish();
let model_timestamps_end_index = timestamps.len() - residuals_length;

let mut sum: f64 = 0.0;
for timestamp in timestamps {
for timestamp in &timestamps.values()[0..model_timestamps_end_index] {
sum += slope * (*timestamp as f64) + intercept;
}
sum as Value
Expand Down Expand Up @@ -461,7 +475,7 @@ mod tests {
first_value in num::i32::ANY.prop_map(i32_to_value),
last_value in num::i32::ANY.prop_map(i32_to_value),
) {
let sum = sum(START_TIME, END_TIME, &[], first_value, last_value);
let sum = sum(START_TIME, END_TIME, &[], first_value, last_value, 0);
prop_assert_eq!(sum, first_value + last_value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ impl PhysicalExpr for ModelSumPhysicalExpr {
min_values,
max_values,
values,
_residuals,
residuals,
_error_array
);

Expand All @@ -720,6 +720,7 @@ impl PhysicalExpr for ModelSumPhysicalExpr {
min_value,
max_value,
values,
residuals.values(),
);
}

Expand Down Expand Up @@ -840,7 +841,7 @@ impl PhysicalExpr for ModelAvgPhysicalExpr {
min_values,
max_values,
values,
_residuals,
residuals,
_error_array
);

Expand All @@ -863,6 +864,7 @@ impl PhysicalExpr for ModelAvgPhysicalExpr {
min_value,
max_value,
values,
residuals.values(),
);

count += modelardb_compression::len(start_time, end_time, timestamps);
Expand Down

0 comments on commit b79c26a

Please sign in to comment.