Skip to content

Commit

Permalink
Init aggregation data utils
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt committed Jan 8, 2025
1 parent a1dda22 commit e5d3549
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 230 deletions.
61 changes: 60 additions & 1 deletion opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
marker,
mem::replace,
ops::DerefMut,
ops::{Deref, DerefMut},
sync::{Arc, Mutex},
time::SystemTime,
};
Expand Down Expand Up @@ -121,6 +121,65 @@ impl AttributeSetFilter {
}
}

pub(crate) trait InitAggregationData {
type Aggr;
fn create_new(&self, time: AggregateTime) -> Self::Aggr;
fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime);
}

pub(crate) enum AggregationData<'a, Aggr> {
Existing(&'a mut Aggr),
New(Aggr),
}

impl<'a, Aggr> AggregationData<'a, Aggr>
where
Aggr: Aggregation,
{
pub(crate) fn init(
init: &impl InitAggregationData<Aggr = Aggr>,
existing: Option<&'a mut dyn Aggregation>,
time: AggregateTime,
) -> Self {
match existing.and_then(|aggr| aggr.as_mut().downcast_mut::<Aggr>()) {
Some(existing) => {
init.reset_existing(existing, time);
AggregationData::Existing(existing)
}
None => AggregationData::New(init.create_new(time)),
}
}

pub(crate) fn into_new_boxed(self) -> Option<Box<dyn Aggregation>> {
match self {
AggregationData::Existing(_) => None,
AggregationData::New(aggregation) => {
Some(Box::new(aggregation) as Box<dyn Aggregation>)
}
}
}
}

impl<Aggr> Deref for AggregationData<'_, Aggr> {
type Target = Aggr;

fn deref(&self) -> &Self::Target {
match self {
AggregationData::Existing(existing) => existing,
AggregationData::New(new) => new,
}
}
}

impl<Aggr> DerefMut for AggregationData<'_, Aggr> {
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
AggregationData::Existing(existing) => existing,
AggregationData::New(new) => new,
}
}
}

/// Builds aggregate functions
pub(crate) struct AggregateBuilder<T> {
/// The temporality used for the returned aggregate functions.
Expand Down
73 changes: 34 additions & 39 deletions opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use crate::metrics::{
};

use super::{
aggregate::{AggregateTimeInitiator, AttributeSetFilter},
aggregate::{
AggregateTime, AggregateTimeInitiator, AggregationData, AttributeSetFilter,
InitAggregationData,
},
Aggregator, ComputeAggregation, Measure, Number, ValueMap,
};

Expand Down Expand Up @@ -384,26 +387,10 @@ impl<T: Number> ExpoHistogram<T> {
}

fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
let time = self.init_time.delta();

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::ExponentialHistogram {
data_points: vec![],
start_time: time.start,
time: time.current,
temporality: Temporality::Delta,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Delta;
h.start_time = time.start;
h.time = time.current;
let mut s_data = AggregationData::init(self, dest, self.init_time.delta());

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, attr| {
.collect_and_reset(&mut s_data.data_points, |attributes, attr| {
let b = attr.into_inner().unwrap_or_else(|err| err.into_inner());
data::ExponentialHistogramDataPoint {
attributes,
Expand Down Expand Up @@ -434,33 +421,17 @@ impl<T: Number> ExpoHistogram<T> {
}
});

(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
(s_data.data_points.len(), s_data.into_new_boxed())
}

fn cumulative(
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let time = self.init_time.cumulative();

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::ExponentialHistogram {
data_points: vec![],
start_time: time.start,
time: time.current,
temporality: Temporality::Cumulative,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Cumulative;
h.start_time = time.start;
h.time = time.current;
let mut s_data = AggregationData::init(self, dest, self.init_time.cumulative());

self.value_map
.collect_readonly(&mut h.data_points, |attributes, attr| {
.collect_readonly(&mut s_data.data_points, |attributes, attr| {
let b = attr.lock().unwrap_or_else(|err| err.into_inner());
data::ExponentialHistogramDataPoint {
attributes,
Expand Down Expand Up @@ -491,7 +462,7 @@ impl<T: Number> ExpoHistogram<T> {
}
});

(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
(s_data.data_points.len(), s_data.into_new_boxed())
}
}

Expand Down Expand Up @@ -524,6 +495,30 @@ where
}
}
}

impl<T> InitAggregationData for ExpoHistogram<T>
where
T: Number,
{
type Aggr = data::ExponentialHistogram<T>;

fn create_new(&self, time: AggregateTime) -> Self::Aggr {
data::ExponentialHistogram {
data_points: vec![],
start_time: time.start,
time: time.current,
temporality: self.temporality,
}
}

Check warning on line 512 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L505-L512

Added lines #L505 - L512 were not covered by tests

fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime) {
existing.data_points.clear();
existing.start_time = time.start;
existing.time = time.current;
existing.temporality = self.temporality;
}
}

#[cfg(test)]
mod tests {
use std::{ops::Neg, time::SystemTime};
Expand Down
70 changes: 32 additions & 38 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use crate::metrics::data::{self, Aggregation};
use crate::metrics::Temporality;
use opentelemetry::KeyValue;

use super::aggregate::AggregateTimeInitiator;
use super::aggregate::AttributeSetFilter;
use super::aggregate::{
AggregateTime, AggregateTimeInitiator, AggregationData, InitAggregationData,
};
use super::ComputeAggregation;
use super::Measure;
use super::ValueMap;
Expand Down Expand Up @@ -108,26 +110,10 @@ impl<T: Number> Histogram<T> {
}

fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
let time = self.init_time.delta();

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
data_points: vec![],
start_time: time.start,
time: time.current,
temporality: Temporality::Delta,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Delta;
h.start_time = time.start;
h.time = time.current;
let mut s_data = AggregationData::init(self, dest, self.init_time.delta());

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, aggr| {
.collect_and_reset(&mut s_data.data_points, |attributes, aggr| {
let b = aggr.into_inner().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
Expand All @@ -153,32 +139,17 @@ impl<T: Number> Histogram<T> {
}
});

(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
(s_data.data_points.len(), s_data.into_new_boxed())
}

fn cumulative(
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let time = self.init_time.cumulative();
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
data_points: vec![],
start_time: time.start,
time: time.current,
temporality: Temporality::Cumulative,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Cumulative;
h.start_time = time.start;
h.time = time.current;
let mut s_data = AggregationData::init(self, dest, self.init_time.cumulative());

self.value_map
.collect_readonly(&mut h.data_points, |attributes, aggr| {
.collect_readonly(&mut s_data.data_points, |attributes, aggr| {
let b = aggr.lock().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
Expand All @@ -204,7 +175,7 @@ impl<T: Number> Histogram<T> {
}
});

(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
(s_data.data_points.len(), s_data.into_new_boxed())
}
}

Expand Down Expand Up @@ -239,6 +210,29 @@ where
}
}

impl<T> InitAggregationData for Histogram<T>
where
T: Number,
{
type Aggr = data::Histogram<T>;

fn create_new(&self, time: AggregateTime) -> Self::Aggr {
data::Histogram {
data_points: vec![],
start_time: time.start,
time: time.current,
temporality: self.temporality,
}
}

fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime) {
existing.data_points.clear();
existing.start_time = time.start;
existing.time = time.current;
existing.temporality = self.temporality;
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit e5d3549

Please sign in to comment.