Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Cardinality limits for metrics streams (Sync Instruments + Delta Temporality) #2255

Merged
merged 11 commits into from
Nov 21, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ namespace metrics

using opentelemetry::sdk::common::OrderedAttributeMap;

constexpr size_t kAggregationCardinalityLimit = 2000;
const std::string kAggregationCardinalityLimitOverflowError =
"Maximum data points for metric stream exceeded. Entry added to overflow";
const std::string kAttributesLimitOverflowKey = "otel.metrics.overflow";
const bool kAttributesLimitOverflowValue = true;
const size_t kOverflowAttributesHash = common::GetHashForAttributeMap(
{{kAttributesLimitOverflowKey,
kAttributesLimitOverflowValue}}); // precalculated for optimization

class AttributeHashGenerator
{
public:
Expand All @@ -35,6 +44,9 @@ class AttributeHashGenerator
class AttributesHashMap
{
public:
AttributesHashMap(size_t attributes_limit = kAggregationCardinalityLimit)
: attributes_limit_(attributes_limit)
{}
Aggregation *Get(size_t hash) const
{
auto it = hash_map_.find(hash);
Expand Down Expand Up @@ -66,6 +78,11 @@ class AttributesHashMap
return it->second.second.get();
}

if (IsOverflowAttributes())
{
return GetOrSetOveflowAttributes(aggregation_callback);
}

MetricAttributes attr{attributes};

hash_map_[hash] = {attr, aggregation_callback()};
Expand All @@ -80,6 +97,12 @@ class AttributesHashMap
{
return it->second.second.get();
}

if (IsOverflowAttributes())
{
return GetOrSetOveflowAttributes(aggregation_callback);
}

MetricAttributes attr{};
hash_map_[hash] = {attr, aggregation_callback()};
return hash_map_[hash].second.get();
Expand All @@ -95,6 +118,11 @@ class AttributesHashMap
return it->second.second.get();
}

if (IsOverflowAttributes())
{
return GetOrSetOveflowAttributes(aggregation_callback);
}

MetricAttributes attr{attributes};

hash_map_[hash] = {attr, aggregation_callback()};
Expand All @@ -113,6 +141,12 @@ class AttributesHashMap
{
it->second.second = std::move(aggr);
}
else if (IsOverflowAttributes())
{
hash_map_[kOverflowAttributesHash] = {
MetricAttributes{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}},
std::move(aggr)};
}
else
{
MetricAttributes attr{attributes};
Expand All @@ -127,6 +161,12 @@ class AttributesHashMap
{
it->second.second = std::move(aggr);
}
else if (IsOverflowAttributes())
{
hash_map_[kOverflowAttributesHash] = {
MetricAttributes{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}},
std::move(aggr)};
}
else
{
MetricAttributes attr{attributes};
Expand Down Expand Up @@ -157,6 +197,29 @@ class AttributesHashMap

private:
std::unordered_map<size_t, std::pair<MetricAttributes, std::unique_ptr<Aggregation>>> hash_map_;
size_t attributes_limit_;

Aggregation *GetOrSetOveflowAttributes(
std::function<std::unique_ptr<Aggregation>()> aggregation_callback)
{
auto agg = aggregation_callback();
return GetOrSetOveflowAttributes(std::move(agg));
}

Aggregation *GetOrSetOveflowAttributes(std::unique_ptr<Aggregation> agg)
{
auto it = hash_map_.find(kOverflowAttributesHash);
if (it != hash_map_.end())
{
return it->second.second.get();
}

MetricAttributes attr{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}};
hash_map_[kOverflowAttributesHash] = {attr, std::move(agg)};
return hash_map_[kOverflowAttributesHash].second.get();
}

bool IsOverflowAttributes() const { return (hash_map_.size() + 1 >= attributes_limit_); }
};
} // namespace metrics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
const AttributesProcessor *attributes_processor,
nostd::shared_ptr<ExemplarReservoir> &&exemplar_reservoir
OPENTELEMETRY_MAYBE_UNUSED,
const AggregationConfig *aggregation_config)
const AggregationConfig *aggregation_config,
size_t attributes_limit = kAggregationCardinalityLimit)
: instrument_descriptor_(instrument_descriptor),
attributes_hashmap_(new AttributesHashMap()),
attributes_hashmap_(new AttributesHashMap(attributes_limit)),
attributes_processor_(attributes_processor),
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_(exemplar_reservoir),
Expand Down
1 change: 1 addition & 0 deletions sdk/test/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ foreach(
attributes_hashmap_test
base2_exponential_histogram_indexer_test
circular_buffer_counter_test
cardinality_limit_test
histogram_test
sync_metric_storage_counter_test
sync_metric_storage_histogram_test
Expand Down
108 changes: 108 additions & 0 deletions sdk/test/metrics/cardinality_limit_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#include "common.h"
#include "opentelemetry/common/key_value_iterable_view.h"
#include "opentelemetry/sdk/metrics/aggregation/sum_aggregation.h"
#include "opentelemetry/sdk/metrics/instruments.h"
#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
#include "opentelemetry/sdk/metrics/state/sync_metric_storage.h"

#include <gtest/gtest.h>
#include <functional>

using namespace opentelemetry::sdk::metrics;
using namespace opentelemetry::common;
namespace nostd = opentelemetry::nostd;

TEST(CardinalityLimit, AttributesHashMapBasicTests)
{
AttributesHashMap hash_map(10);
std::function<std::unique_ptr<Aggregation>()> aggregation_callback =
[]() -> std::unique_ptr<Aggregation> {
return std::unique_ptr<Aggregation>(new LongSumAggregation(true));
};
// add 10 unique metric points. 9 should be added to hashmap, 10th should be overflow.
long record_value = 100;
for (auto i = 0; i < 10; i++)
{
OrderedAttributeMap attributes = {{"key", std::to_string(i)}};
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes);
static_cast<LongSumAggregation *>(
hash_map.GetOrSetDefault(attributes, aggregation_callback, hash))
->Aggregate(record_value);
}
EXPECT_EQ(hash_map.Size(), 10);
// add 5 unique metric points above limit, they all should get consolidated as single
// overflowmetric point.
for (auto i = 10; i < 15; i++)
{
OrderedAttributeMap attributes = {{"key", std::to_string(i)}};
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes);
static_cast<LongSumAggregation *>(
hash_map.GetOrSetDefault(attributes, aggregation_callback, hash))
->Aggregate(record_value);
}
EXPECT_EQ(hash_map.Size(), 10); // only one more metric point should be added as overflow.
// get the overflow metric point
auto agg = hash_map.GetOrSetDefault(
OrderedAttributeMap({{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}),
aggregation_callback, kOverflowAttributesHash);
EXPECT_NE(agg, nullptr);
auto sum_agg = static_cast<LongSumAggregation *>(agg);
EXPECT_EQ(nostd::get<int64_t>(nostd::get<SumPointData>(sum_agg->ToPoint()).value_),
record_value * 6); // 1 from previous 10, 5 from current 5.
}

class WritableMetricStorageCardinalityLimitTestFixture
: public ::testing::TestWithParam<AggregationTemporality>
{};

TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregation)
{
auto sdk_start_ts = std::chrono::system_clock::now();
const size_t attributes_limit = 10;
InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter,
InstrumentValueType::kLong};
std::unique_ptr<DefaultAttributesProcessor> default_attributes_processor{
new DefaultAttributesProcessor{}};
SyncMetricStorage storage(instr_desc, AggregationType::kSum, default_attributes_processor.get(),
ExemplarReservoir::GetNoExemplarReservoir(), nullptr, attributes_limit);

long record_value = 100;
// add 9 unique metric points, and 6 more above limit.
for (auto i = 0; i < 15; i++)
{
std::map<std::string, std::string> attributes = {{"key", std::to_string(i)}};
storage.RecordLong(record_value,
KeyValueIterableView<std::map<std::string, std::string>>(attributes),
opentelemetry::context::Context{});
}
AggregationTemporality temporality = GetParam();
std::shared_ptr<CollectorHandle> collector(new MockCollectorHandle(temporality));
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);
//... Some computation here
auto collection_ts = std::chrono::system_clock::now();
size_t count_attributes = 0;
bool overflow_present = false;
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
for (const auto &data_attr : metric_data.point_data_attr_)
{
const auto &data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
count_attributes++;
if (data_attr.attributes.begin()->first == kAttributesLimitOverflowKey)
{
EXPECT_EQ(nostd::get<int64_t>(data.value_), record_value * 6);
overflow_present = true;
}
}
return true;
});
EXPECT_EQ(count_attributes, attributes_limit);
EXPECT_EQ(overflow_present, true);
}
INSTANTIATE_TEST_SUITE_P(All,
WritableMetricStorageCardinalityLimitTestFixture,
::testing::Values(AggregationTemporality::kDelta));