Skip to content

Commit

Permalink
Memory mode: Adding support for synchronous instruments - explicit hi…
Browse files Browse the repository at this point in the history
…stogram (#6153)

Co-authored-by: jack-berg <[email protected]>
  • Loading branch information
asafm and jack-berg authored Jan 25, 2024
1 parent 737dfef commit 8d1cad2
Show file tree
Hide file tree
Showing 13 changed files with 659 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public static DynamicPrimitiveLongList of(long... values) {
return list;
}

public static DynamicPrimitiveLongList ofSubArrayCapacity(int subarrayCapacity) {
return new DynamicPrimitiveLongList(subarrayCapacity);
}

public static DynamicPrimitiveLongList empty() {
return new DynamicPrimitiveLongList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ public enum HistogramAggregationParam {
new DoubleExplicitBucketHistogramAggregator(
ExplicitBucketHistogramUtils.createBoundaryArray(
ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES),
ExemplarReservoir::doubleNoSamples)),
ExemplarReservoir::doubleNoSamples,
IMMUTABLE_DATA)),
EXPLICIT_SINGLE_BUCKET(
new DoubleExplicitBucketHistogramAggregator(
ExplicitBucketHistogramUtils.createBoundaryArray(Collections.emptyList()),
ExemplarReservoir::doubleNoSamples)),
ExemplarReservoir::doubleNoSamples,
IMMUTABLE_DATA)),
EXPONENTIAL_SMALL_CIRCULAR_BUFFER(
new DoubleBase2ExponentialHistogramAggregator(
ExemplarReservoir::doubleNoSamples, 20, 0, IMMUTABLE_DATA)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static void main(String[] args) {
// Parameters
AggregationTemporality aggregationTemporality = AggregationTemporality.DELTA;
MemoryMode memoryMode = MemoryMode.REUSABLE_DATA;
TestInstrumentType testInstrumentType = TestInstrumentType.EXPONENTIAL_HISTOGRAM;
TestInstrumentType testInstrumentType = TestInstrumentType.EXPLICIT_BUCKET;

InstrumentGarbageCollectionBenchmark.ThreadState benchmarkSetup =
new InstrumentGarbageCollectionBenchmark.ThreadState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.tester.AsyncCounterTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.ExplicitBucketHistogramTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.ExponentialHistogramTester;
import java.util.List;
import java.util.Random;
Expand All @@ -25,6 +26,12 @@ InstrumentTester createInstrumentTester() {
InstrumentTester createInstrumentTester() {
return new ExponentialHistogramTester();
}
},
EXPLICIT_BUCKET() {
@Override
InstrumentTester createInstrumentTester() {
return new ExplicitBucketHistogramTester();
}
};

abstract InstrumentTester createInstrumentTester();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state.tester;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.aggregator.ExplicitBucketHistogramUtils;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType.InstrumentTester;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType.TestInstrumentsState;
import java.util.List;
import java.util.Random;

public class ExplicitBucketHistogramTester implements InstrumentTester {

static class ExplicitHistogramState implements TestInstrumentsState {
public double maxBucketValue;
DoubleHistogram doubleHistogram;
}

private static final int measurementsPerAttributeSet = 1_000;

@Override
public Aggregation testedAggregation() {
return Aggregation.explicitBucketHistogram();
}

@Override
public TestInstrumentsState buildInstruments(
double instrumentCount,
SdkMeterProvider sdkMeterProvider,
List<Attributes> attributesList,
Random random) {
ExplicitHistogramState state = new ExplicitHistogramState();
state.doubleHistogram =
sdkMeterProvider.get("meter").histogramBuilder("test.explicit.histogram").build();
state.maxBucketValue =
ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES.get(
ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES.size() - 1);
return state;
}

@SuppressWarnings("ForLoopReplaceableByForEach") // This is for GC sensitivity testing: no streams
@Override
public void recordValuesInInstruments(
TestInstrumentsState testInstrumentsState, List<Attributes> attributesList, Random random) {

ExplicitHistogramState state = (ExplicitHistogramState) testInstrumentsState;

for (int j = 0; j < attributesList.size(); j++) {
Attributes attributes = attributesList.get(j);
for (int i = 0; i < measurementsPerAttributeSet; i++) {
state.doubleHistogram.record(
random.nextInt(Double.valueOf(state.maxBucketValue * 1.1).intValue()), attributes);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.internal.GuardedBy;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.internal.PrimitiveLongList;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
Expand All @@ -16,6 +17,7 @@
import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.MutableHistogramPointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.resources.Resource;
Expand All @@ -26,6 +28,7 @@
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* Aggregator that generates explicit bucket histograms.
Expand All @@ -36,6 +39,7 @@
public final class DoubleExplicitBucketHistogramAggregator
implements Aggregator<HistogramPointData, DoubleExemplarData> {
private final double[] boundaries;
private final MemoryMode memoryMode;

// a cache for converting to MetricData
private final List<Double> boundaryList;
Expand All @@ -47,10 +51,14 @@ public final class DoubleExplicitBucketHistogramAggregator
*
* @param boundaries Bucket boundaries, in-order.
* @param reservoirSupplier Supplier of exemplar reservoirs per-stream.
* @param memoryMode The {@link MemoryMode} to use in this aggregator.
*/
public DoubleExplicitBucketHistogramAggregator(
double[] boundaries, Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier) {
double[] boundaries,
Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier,
MemoryMode memoryMode) {
this.boundaries = boundaries;
this.memoryMode = memoryMode;

List<Double> boundaryList = new ArrayList<>(this.boundaries.length);
for (double v : this.boundaries) {
Expand All @@ -62,7 +70,7 @@ public DoubleExplicitBucketHistogramAggregator(

@Override
public AggregatorHandle<HistogramPointData, DoubleExemplarData> createHandle() {
return new Handle(this.boundaryList, this.boundaries, reservoirSupplier.get());
return new Handle(this.boundaryList, this.boundaries, reservoirSupplier.get(), memoryMode);
}

@Override
Expand Down Expand Up @@ -104,10 +112,14 @@ static final class Handle extends AggregatorHandle<HistogramPointData, DoubleExe

private final ReentrantLock lock = new ReentrantLock();

// Used only when MemoryMode = REUSABLE_DATA
@Nullable private MutableHistogramPointData reusablePoint;

Handle(
List<Double> boundaryList,
double[] boundaries,
ExemplarReservoir<DoubleExemplarData> reservoir) {
ExemplarReservoir<DoubleExemplarData> reservoir,
MemoryMode memoryMode) {
super(reservoir);
this.boundaryList = boundaryList;
this.boundaries = boundaries;
Expand All @@ -116,6 +128,9 @@ static final class Handle extends AggregatorHandle<HistogramPointData, DoubleExe
this.min = Double.MAX_VALUE;
this.max = -1;
this.count = 0;
if (memoryMode == MemoryMode.REUSABLE_DATA) {
this.reusablePoint = new MutableHistogramPointData(counts.length);
}
}

@Override
Expand All @@ -127,19 +142,36 @@ protected HistogramPointData doAggregateThenMaybeReset(
boolean reset) {
lock.lock();
try {
HistogramPointData pointData =
ImmutableHistogramPointData.create(
startEpochNanos,
epochNanos,
attributes,
sum,
this.count > 0,
this.min,
this.count > 0,
this.max,
boundaryList,
PrimitiveLongList.wrap(Arrays.copyOf(counts, counts.length)),
exemplars);
HistogramPointData pointData;
if (reusablePoint == null) {
pointData =
ImmutableHistogramPointData.create(
startEpochNanos,
epochNanos,
attributes,
sum,
this.count > 0,
this.min,
this.count > 0,
this.max,
boundaryList,
PrimitiveLongList.wrap(Arrays.copyOf(counts, counts.length)),
exemplars);
} else /* REUSABLE_DATA */ {
pointData =
reusablePoint.set(
startEpochNanos,
epochNanos,
attributes,
sum,
this.count > 0,
this.min,
this.count > 0,
this.max,
boundaryList,
counts,
exemplars);
}
if (reset) {
this.sum = 0;
this.min = Double.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.data;

import io.opentelemetry.sdk.metrics.data.HistogramPointData;
import java.util.List;

/**
* Validations for {@link HistogramPointData}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
final class HistogramPointDataValidations {

private HistogramPointDataValidations() {}

static void validateIsStrictlyIncreasing(List<Double> xs) {
for (int i = 0; i < xs.size() - 1; i++) {
if (xs.get(i).compareTo(xs.get(i + 1)) >= 0) {
throw new IllegalArgumentException("invalid boundaries: " + xs);
}
}
}

static void validateFiniteBoundaries(List<Double> boundaries) {
if (!boundaries.isEmpty()
&& (boundaries.get(0).isInfinite() || boundaries.get(boundaries.size() - 1).isInfinite())) {
throw new IllegalArgumentException("invalid boundaries: contains explicit +/-Inf");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package io.opentelemetry.sdk.metrics.internal.data;

import static io.opentelemetry.sdk.metrics.internal.data.HistogramPointDataValidations.validateFiniteBoundaries;
import static io.opentelemetry.sdk.metrics.internal.data.HistogramPointDataValidations.validateIsStrictlyIncreasing;

import com.google.auto.value.AutoValue;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.internal.PrimitiveLongList;
Expand Down Expand Up @@ -85,13 +88,8 @@ public static ImmutableHistogramPointData create(
+ " instead of "
+ counts.size());
}
if (!isStrictlyIncreasing(boundaries)) {
throw new IllegalArgumentException("invalid boundaries: " + boundaries);
}
if (!boundaries.isEmpty()
&& (boundaries.get(0).isInfinite() || boundaries.get(boundaries.size() - 1).isInfinite())) {
throw new IllegalArgumentException("invalid boundaries: contains explicit +/-Inf");
}
validateIsStrictlyIncreasing(boundaries);
validateFiniteBoundaries(boundaries);

long totalCount = 0;
for (long c : PrimitiveLongList.toArray(counts)) {
Expand All @@ -113,13 +111,4 @@ public static ImmutableHistogramPointData create(
}

ImmutableHistogramPointData() {}

private static boolean isStrictlyIncreasing(List<Double> xs) {
for (int i = 0; i < xs.size() - 1; i++) {
if (xs.get(i).compareTo(xs.get(i + 1)) >= 0) {
return false;
}
}
return true;
}
}
Loading

0 comments on commit 8d1cad2

Please sign in to comment.