Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory mode: Adding support for synchronous instruments - Last Value aggregation #6196

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static void main(String[] args) {
// Parameters
AggregationTemporality aggregationTemporality = AggregationTemporality.DELTA;
MemoryMode memoryMode = MemoryMode.REUSABLE_DATA;
TestInstrumentType testInstrumentType = TestInstrumentType.EXPLICIT_BUCKET;
TestInstrumentType testInstrumentType = TestInstrumentType.DOUBLE_LAST_VALUE;

InstrumentGarbageCollectionBenchmark.ThreadState benchmarkSetup =
new InstrumentGarbageCollectionBenchmark.ThreadState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.tester.AsyncCounterTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.DoubleLastValueTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.DoubleSumTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.ExplicitBucketHistogramTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.ExponentialHistogramTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.LongLastValueTester;
import io.opentelemetry.sdk.metrics.internal.state.tester.LongSumTester;
import java.util.List;
import java.util.Random;
Expand All @@ -23,7 +25,9 @@ public enum TestInstrumentType {
EXPONENTIAL_HISTOGRAM(ExponentialHistogramTester::new),
EXPLICIT_BUCKET(ExplicitBucketHistogramTester::new),
LONG_SUM(LongSumTester::new, /* dataAllocRateReductionPercentage= */ 97.3f),
DOUBLE_SUM(DoubleSumTester::new, /* dataAllocRateReductionPercentage= */ 97.3f);
DOUBLE_SUM(DoubleSumTester::new, /* dataAllocRateReductionPercentage= */ 97.3f),
LONG_LAST_VALUE(LongLastValueTester::new, /* dataAllocRateReductionPercentage= */ 97.3f),
DOUBLE_LAST_VALUE(DoubleLastValueTester::new, /* dataAllocRateReductionPercentage= */ 97.3f);

private final Supplier<? extends InstrumentTester> instrumentTesterInitializer;
private final float dataAllocRateReductionPercentage;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state.tester;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType;
import java.util.List;
import java.util.Random;

public class DoubleLastValueTester implements TestInstrumentType.InstrumentTester {
private static final int measurementsPerAttributeSet = 1_000;

static class DoubleLastValueState implements TestInstrumentType.TestInstrumentsState {
DoubleCounter doubleCounter;
}

@Override
public Aggregation testedAggregation() {
return Aggregation.lastValue();
}

@Override
public TestInstrumentType.TestInstrumentsState buildInstruments(
double instrumentCount,
SdkMeterProvider sdkMeterProvider,
List<Attributes> attributesList,
Random random) {
DoubleLastValueState doubleLastValueState = new DoubleLastValueState();

Meter meter = sdkMeterProvider.meterBuilder("meter").build();
doubleLastValueState.doubleCounter =
meter.counterBuilder("test.double.last.value").ofDoubles().build();
asafm marked this conversation as resolved.
Show resolved Hide resolved

return doubleLastValueState;
}

@SuppressWarnings("ForLoopReplaceableByForEach") // This is for GC sensitivity testing: no streams
@Override
public void recordValuesInInstruments(
TestInstrumentType.TestInstrumentsState testInstrumentsState,
List<Attributes> attributesList,
Random random) {
DoubleLastValueState state = (DoubleLastValueState) testInstrumentsState;

for (int j = 0; j < attributesList.size(); j++) {
Attributes attributes = attributesList.get(j);
for (int i = 0; i < measurementsPerAttributeSet; i++) {
state.doubleCounter.add(1.2f, attributes);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state.tester;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.internal.state.TestInstrumentType;
import java.util.List;
import java.util.Random;

public class LongLastValueTester implements TestInstrumentType.InstrumentTester {
private static final int measurementsPerAttributeSet = 1_000;

static class LongLastValueState implements TestInstrumentType.TestInstrumentsState {
LongCounter longCounter;
}

@Override
public Aggregation testedAggregation() {
return Aggregation.lastValue();
}

@Override
public TestInstrumentType.TestInstrumentsState buildInstruments(
double instrumentCount,
SdkMeterProvider sdkMeterProvider,
List<Attributes> attributesList,
Random random) {
LongLastValueState longLastValueState = new LongLastValueState();

Meter meter = sdkMeterProvider.meterBuilder("meter").build();
longLastValueState.longCounter = meter.counterBuilder("test.long.last.value").build();

return longLastValueState;
}

@SuppressWarnings("ForLoopReplaceableByForEach") // This is for GC sensitivity testing: no streams
@Override
public void recordValuesInInstruments(
TestInstrumentType.TestInstrumentsState testInstrumentsState,
List<Attributes> attributesList,
Random random) {
LongLastValueState state = (LongLastValueState) testInstrumentsState;

for (int j = 0; j < attributesList.size(); j++) {
Attributes attributes = attributesList.get(j);
for (int i = 0; i < measurementsPerAttributeSet; i++) {
state.longCounter.add(1, attributes);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
Expand Down Expand Up @@ -42,15 +43,17 @@
public final class DoubleLastValueAggregator
implements Aggregator<DoublePointData, DoubleExemplarData> {
private final Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier;
private final MemoryMode memoryMode;

public DoubleLastValueAggregator(
Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier) {
Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier, MemoryMode memoryMode) {
this.reservoirSupplier = reservoirSupplier;
this.memoryMode = memoryMode;
}

@Override
public AggregatorHandle<DoublePointData, DoubleExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
return new Handle(reservoirSupplier.get(), memoryMode);
}

@Override
Expand Down Expand Up @@ -114,8 +117,16 @@ static final class Handle extends AggregatorHandle<DoublePointData, DoubleExempl
@Nullable private static final Double DEFAULT_VALUE = null;
private final AtomicReference<Double> current = new AtomicReference<>(DEFAULT_VALUE);

private Handle(ExemplarReservoir<DoubleExemplarData> reservoir) {
// Only used when memoryMode is REUSABLE_DATA
@Nullable private final MutableDoublePointData reusablePoint;

private Handle(ExemplarReservoir<DoubleExemplarData> reservoir, MemoryMode memoryMode) {
super(reservoir);
if (memoryMode == MemoryMode.REUSABLE_DATA) {
reusablePoint = new MutableDoublePointData();
} else {
reusablePoint = null;
}
}

@Override
Expand All @@ -126,8 +137,14 @@ protected DoublePointData doAggregateThenMaybeReset(
List<DoubleExemplarData> exemplars,
boolean reset) {
Double value = reset ? this.current.getAndSet(DEFAULT_VALUE) : this.current.get();
return ImmutableDoublePointData.create(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);
if (reusablePoint != null) {
reusablePoint.set(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);
return reusablePoint;
} else {
return ImmutableDoublePointData.create(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
Expand Down Expand Up @@ -39,14 +40,17 @@
*/
public final class LongLastValueAggregator implements Aggregator<LongPointData, LongExemplarData> {
private final Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier;
private final MemoryMode memoryMode;

public LongLastValueAggregator(Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier) {
public LongLastValueAggregator(
Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier, MemoryMode memoryMode) {
this.reservoirSupplier = reservoirSupplier;
this.memoryMode = memoryMode;
}

@Override
public AggregatorHandle<LongPointData, LongExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
return new Handle(reservoirSupplier.get(), memoryMode);
}

@Override
Expand Down Expand Up @@ -109,8 +113,16 @@ static final class Handle extends AggregatorHandle<LongPointData, LongExemplarDa
@Nullable private static final Long DEFAULT_VALUE = null;
private final AtomicReference<Long> current = new AtomicReference<>(DEFAULT_VALUE);

Handle(ExemplarReservoir<LongExemplarData> exemplarReservoir) {
// Only used when memoryMode is REUSABLE_DATA
@Nullable private final MutableLongPointData reusablePoint;

Handle(ExemplarReservoir<LongExemplarData> exemplarReservoir, MemoryMode memoryMode) {
super(exemplarReservoir);
if (memoryMode == MemoryMode.REUSABLE_DATA) {
reusablePoint = new MutableLongPointData();
} else {
reusablePoint = null;
}
}

@Override
Expand All @@ -121,8 +133,15 @@ protected LongPointData doAggregateThenMaybeReset(
List<LongExemplarData> exemplars,
boolean reset) {
Long value = reset ? this.current.getAndSet(DEFAULT_VALUE) : this.current.get();
return ImmutableLongPointData.create(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);

if (reusablePoint != null) {
reusablePoint.set(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);
return reusablePoint;
} else {
return ImmutableLongPointData.create(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ public <T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggr
// For the initial version we do not sample exemplars on gauges.
switch (instrumentDescriptor.getValueType()) {
case LONG:
return (Aggregator<T, U>) new LongLastValueAggregator(ExemplarReservoir::longNoSamples);
return (Aggregator<T, U>)
new LongLastValueAggregator(ExemplarReservoir::longNoSamples, memoryMode);
case DOUBLE:
return (Aggregator<T, U>) new DoubleLastValueAggregator(ExemplarReservoir::doubleNoSamples);
return (Aggregator<T, U>)
new DoubleLastValueAggregator(ExemplarReservoir::doubleNoSamples, memoryMode);
}
throw new IllegalArgumentException("Invalid instrument value type");
}
Expand Down
Loading
Loading