Skip to content

Commit

Permalink
Memory Mode: Adding first part support for synchronous instruments - …
Browse files Browse the repository at this point in the history
…storage (#5998)

Co-authored-by: jack-berg <[email protected]>
  • Loading branch information
asafm and jack-berg authored Dec 14, 2023
1 parent 6e53623 commit ffd53c7
Show file tree
Hide file tree
Showing 5 changed files with 532 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@ public enum MemoryMode {
*
* <p>In this mode, the SDK reuses objects to reduce allocations, at the expense of disallowing
* concurrent collections / exports.
*
* <p>Metric Signal: For DELTA aggregation temporality, the memory used for recording and
* aggregating metric values is kept between MetricReader collect operation, to avoid memory
* allocations. When the configured maximum cardinality of Attributes is reached, unused
* Attributes are cleared from memory during collect operation, at the cost of requiring new
* memory allocations the next time those attributes are used. Allocations can be minimized by
* increasing the configured max cardinality. For example, suppose instrumentation has recorded
* values for 1000 unique Attributes while the max cardinality configured was 2000. If after a
* collection only 100 unique Attributes values are recorded, the MetricReader's collect operation
* would return 100 points, while in memory the Attributes data structure keeps 1000 unique
* Attributes. If a user recorded values for 3000 unique attributes, the values for the first 1999
* Attributes would be recorded, and the rest of 1001 unique Attributes values would be recorded
* in the CARDINALITY_OVERFLOW Attributes. If after several collect operations, the user now
* records values to only 500 unique attributes, during collect operation, the unused 1500
* Attributes memory would be cleared from memory.
*/
REUSABLE_DATA,

Expand All @@ -25,6 +40,9 @@ public enum MemoryMode {
*
* <p>In this mode, the SDK passes immutable objects to exporters / readers, increasing
* allocations but ensuring safe concurrent exports.
*
* <p>Metric Signal: In DELTA aggregation temporality, the memory used for recording and
* aggregating Attributes values is cleared during a MetricReader collect operation.
*/
IMMUTABLE_DATA
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor.setIncludes;

import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.state.MetricStorage;
Expand Down Expand Up @@ -96,6 +97,7 @@ public ViewBuilder setAttributeFilter(Predicate<String> keyFilter) {
* <p>Note: not currently stable but additional attribute processors can be configured via {@link
* SdkMeterProviderUtil#appendAllBaggageAttributes(ViewBuilder)}.
*/
@SuppressWarnings("unused")
ViewBuilder addAttributesProcessor(AttributesProcessor attributesProcessor) {
this.processor = this.processor.then(attributesProcessor);
return this;
Expand All @@ -105,7 +107,10 @@ ViewBuilder addAttributesProcessor(AttributesProcessor attributesProcessor) {
* Set the cardinality limit.
*
* <p>Note: not currently stable but cardinality limit can be configured via
* SdkMeterProviderUtil#setCardinalityLimit(ViewBuilder, int)}.
* SdkMeterProviderUtil#setCardinalityLimit(ViewBuilder, int).
*
* <p>Read {@link MemoryMode} to understand the memory usage behavior of reaching cardinality
* limit.
*
* @param cardinalityLimit the maximum number of series for a metric
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public abstract class AggregatorHandle<T extends PointData, U extends ExemplarDa

// A reservoir of sampled exemplars for this time period.
private final ExemplarReservoir<U> exemplarReservoir;
private volatile boolean valuesRecorded = false;

protected AggregatorHandle(ExemplarReservoir<U> exemplarReservoir) {
this.exemplarReservoir = exemplarReservoir;
Expand All @@ -39,6 +40,10 @@ protected AggregatorHandle(ExemplarReservoir<U> exemplarReservoir) {
*/
public final T aggregateThenMaybeReset(
long startEpochNanos, long epochNanos, Attributes attributes, boolean reset) {
if (reset) {
valuesRecorded = false;
}

return doAggregateThenMaybeReset(
startEpochNanos,
epochNanos,
Expand Down Expand Up @@ -69,6 +74,7 @@ public final void recordLong(long value, Attributes attributes, Context context)
*/
public final void recordLong(long value) {
doRecordLong(value);
valuesRecorded = true;
}

/**
Expand All @@ -94,6 +100,7 @@ public final void recordDouble(double value, Attributes attributes, Context cont
*/
public final void recordDouble(double value) {
doRecordDouble(value);
valuesRecorded = true;
}

/**
Expand All @@ -104,4 +111,13 @@ protected void doRecordDouble(double value) {
throw new UnsupportedOperationException(
"This aggregator does not support recording double values.");
}

/**
* Checks whether this handle has values recorded.
*
* @return True if values has been recorded to it
*/
public boolean hasRecordedValues() {
return valuesRecorded;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@

package io.opentelemetry.sdk.metrics.internal.state;

import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA;
import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA;
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
Expand Down Expand Up @@ -50,6 +55,16 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
private volatile AggregatorHolder<T, U> aggregatorHolder = new AggregatorHolder<>();
private final AttributesProcessor attributesProcessor;

private final MemoryMode memoryMode;

// Only populated if memoryMode == REUSABLE_DATA
private final ArrayList<T> reusableResultList = new ArrayList<>();

// Only populated if memoryMode == REUSABLE_DATA and
// aggregationTemporality is DELTA
private volatile ConcurrentHashMap<Attributes, AggregatorHandle<T, U>>
previousCollectionAggregatorHandles = new ConcurrentHashMap<>();

/**
* This field is set to 1 less than the actual intended cardinality limit, allowing the last slot
* to be filled by the {@link MetricStorage#CARDINALITY_OVERFLOW} series.
Expand All @@ -74,6 +89,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
this.aggregator = aggregator;
this.attributesProcessor = attributesProcessor;
this.maxCardinality = maxCardinality - 1;
this.memoryMode = registeredReader.getReader().getMemoryMode();
}

// Visible for testing
Expand Down Expand Up @@ -139,7 +155,7 @@ private AggregatorHolder<T, U> getHolderForRecord() {

/**
* Called on the {@link AggregatorHolder} obtained from {@link #getHolderForRecord()} to indicate
* that recording is complete and it is safe to collect.
* that recording is complete, and it is safe to collect.
*/
private void releaseHolderForRecord(AggregatorHolder<T, U> aggregatorHolder) {
aggregatorHolder.activeRecordingThreads.addAndGet(-2);
Expand Down Expand Up @@ -185,16 +201,20 @@ public MetricData collect(
InstrumentationScopeInfo instrumentationScopeInfo,
long startEpochNanos,
long epochNanos) {
boolean reset = aggregationTemporality == AggregationTemporality.DELTA;
boolean reset = aggregationTemporality == DELTA;
long start =
aggregationTemporality == AggregationTemporality.DELTA
aggregationTemporality == DELTA
? registeredReader.getLastCollectEpochNanos()
: startEpochNanos;

ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles;
if (reset) {
AggregatorHolder<T, U> holder = this.aggregatorHolder;
this.aggregatorHolder = new AggregatorHolder<>();
this.aggregatorHolder =
(memoryMode == REUSABLE_DATA)
? new AggregatorHolder<>(previousCollectionAggregatorHandles)
: new AggregatorHolder<>();

// Increment recordsInProgress by 1, which produces an odd number acting as a signal that
// record operations should re-read the volatile this.aggregatorHolder.
// Repeatedly grab recordsInProgress until it is <= 1, which signals all active record
Expand All @@ -208,15 +228,56 @@ public MetricData collect(
aggregatorHandles = this.aggregatorHolder.aggregatorHandles;
}

List<T> points;
if (memoryMode == REUSABLE_DATA) {
reusableResultList.clear();
points = reusableResultList;
} else {
points = new ArrayList<>(aggregatorHandles.size());
}

// In DELTA aggregation temporality each Attributes is reset to 0
// every time we perform a collection (by definition of DELTA).
// In IMMUTABLE_DATA MemoryMode, this is accomplished by removing all aggregator handles
// (into which the values are recorded) effectively starting from 0
// for each recorded Attributes.
// In REUSABLE_DATA MemoryMode, we strive for zero allocations. Since even removing
// a key-value from a map and putting it again on next recording will cost an allocation,
// we are keeping the aggregator handles in their map, and only reset their value once
// we finish collecting the aggregated value from each one.
// The SDK must adhere to keeping no more than maxCardinality unique Attributes in memory,
// hence during collect(), when the map is at full capacity, we try to clear away unused
// aggregator handles, so on next recording cycle using this map, there will be room for newly
// recorded Attributes. This comes at the expanse of memory allocations. This can be avoided
// if the user chooses to increase the maxCardinality.
if (memoryMode == REUSABLE_DATA && reset) {
if (aggregatorHandles.size() >= maxCardinality) {
aggregatorHandles.forEach(
(attribute, handle) -> {
if (!handle.hasRecordedValues()) {
aggregatorHandles.remove(attribute);
}
});
}
}

// Grab aggregated points.
List<T> points = new ArrayList<>(aggregatorHandles.size());
aggregatorHandles.forEach(
(attributes, handle) -> {
if (!handle.hasRecordedValues()) {
return;
}
T point = handle.aggregateThenMaybeReset(start, epochNanos, attributes, reset);
if (reset) {

if (reset && memoryMode == IMMUTABLE_DATA) {
// Return the aggregator to the pool.
// The pool is only used in DELTA temporality (since in CUMULATIVE the handler is
// always used as it is the place accumulating the values and never resets)
// AND only in IMMUTABLE_DATA memory mode since in REUSABLE_DATA we avoid
// using the pool since it allocates memory internally on each put() or remove()
aggregatorHandlePool.offer(handle);
}

if (point != null) {
points.add(point);
}
Expand All @@ -229,6 +290,10 @@ public MetricData collect(
aggregatorHandlePool.poll();
}

if (reset && memoryMode == REUSABLE_DATA) {
previousCollectionAggregatorHandles = aggregatorHandles;
}

if (points.isEmpty()) {
return EmptyMetricData.getInstance();
}
Expand All @@ -243,8 +308,7 @@ public MetricDescriptor getMetricDescriptor() {
}

private static class AggregatorHolder<T extends PointData, U extends ExemplarData> {
private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles;
// Recording threads grab the current interval (AggregatorHolder) and atomically increment
// this by 2 before recording against it (and then decrement by two when done).
//
Expand All @@ -260,5 +324,14 @@ private static class AggregatorHolder<T extends PointData, U extends ExemplarDat
// all it needs to do is release the "read lock" it just obtained (decrementing by 2),
// and then grab and record against the new current interval (AggregatorHolder).
private final AtomicInteger activeRecordingThreads = new AtomicInteger(0);

private AggregatorHolder() {
aggregatorHandles = new ConcurrentHashMap<>();
}

private AggregatorHolder(
ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles) {
this.aggregatorHandles = aggregatorHandles;
}
}
}
Loading

0 comments on commit ffd53c7

Please sign in to comment.