diff --git a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h index a4b53a37c5..85009a9fcb 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h @@ -23,6 +23,15 @@ namespace metrics using opentelemetry::sdk::common::OrderedAttributeMap; +constexpr size_t kAggregationCardinalityLimit = 2000; +const std::string kAggregationCardinalityLimitOverflowError = + "Maximum data points for metric stream exceeded. Entry added to overflow"; +const std::string kAttributesLimitOverflowKey = "otel.metrics.overflow"; +const bool kAttributesLimitOverflowValue = true; +const size_t kOverflowAttributesHash = common::GetHashForAttributeMap( + {{kAttributesLimitOverflowKey, + kAttributesLimitOverflowValue}}); // precalculated for optimization + class AttributeHashGenerator { public: @@ -35,6 +44,9 @@ class AttributeHashGenerator class AttributesHashMap { public: + AttributesHashMap(size_t attributes_limit = kAggregationCardinalityLimit) + : attributes_limit_(attributes_limit) + {} Aggregation *Get(size_t hash) const { auto it = hash_map_.find(hash); @@ -66,6 +78,11 @@ class AttributesHashMap return it->second.second.get(); } + if (IsOverflowAttributes()) + { + return GetOrSetOveflowAttributes(aggregation_callback); + } + MetricAttributes attr{attributes}; hash_map_[hash] = {attr, aggregation_callback()}; @@ -80,6 +97,12 @@ class AttributesHashMap { return it->second.second.get(); } + + if (IsOverflowAttributes()) + { + return GetOrSetOveflowAttributes(aggregation_callback); + } + MetricAttributes attr{}; hash_map_[hash] = {attr, aggregation_callback()}; return hash_map_[hash].second.get(); @@ -95,6 +118,11 @@ class AttributesHashMap return it->second.second.get(); } + if (IsOverflowAttributes()) + { + return GetOrSetOveflowAttributes(aggregation_callback); + } + MetricAttributes attr{attributes}; hash_map_[hash] = {attr, aggregation_callback()}; @@ -113,6 +141,12 @@ class AttributesHashMap { it->second.second = std::move(aggr); } + else if (IsOverflowAttributes()) + { + hash_map_[kOverflowAttributesHash] = { + MetricAttributes{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}, + std::move(aggr)}; + } else { MetricAttributes attr{attributes}; @@ -127,6 +161,12 @@ class AttributesHashMap { it->second.second = std::move(aggr); } + else if (IsOverflowAttributes()) + { + hash_map_[kOverflowAttributesHash] = { + MetricAttributes{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}, + std::move(aggr)}; + } else { MetricAttributes attr{attributes}; @@ -157,6 +197,29 @@ class AttributesHashMap private: std::unordered_map>> hash_map_; + size_t attributes_limit_; + + Aggregation *GetOrSetOveflowAttributes( + std::function()> aggregation_callback) + { + auto agg = aggregation_callback(); + return GetOrSetOveflowAttributes(std::move(agg)); + } + + Aggregation *GetOrSetOveflowAttributes(std::unique_ptr agg) + { + auto it = hash_map_.find(kOverflowAttributesHash); + if (it != hash_map_.end()) + { + return it->second.second.get(); + } + + MetricAttributes attr{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}; + hash_map_[kOverflowAttributesHash] = {attr, std::move(agg)}; + return hash_map_[kOverflowAttributesHash].second.get(); + } + + bool IsOverflowAttributes() const { return (hash_map_.size() + 1 >= attributes_limit_); } }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index b450b92179..cf249c39f2 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -40,9 +40,10 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage const AttributesProcessor *attributes_processor, nostd::shared_ptr &&exemplar_reservoir OPENTELEMETRY_MAYBE_UNUSED, - const AggregationConfig *aggregation_config) + const AggregationConfig *aggregation_config, + size_t attributes_limit = kAggregationCardinalityLimit) : instrument_descriptor_(instrument_descriptor), - attributes_hashmap_(new AttributesHashMap()), + attributes_hashmap_(new AttributesHashMap(attributes_limit)), attributes_processor_(attributes_processor), #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW exemplar_reservoir_(exemplar_reservoir), diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index b30d71ae22..ccc31de689 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -16,6 +16,7 @@ foreach( attributes_hashmap_test base2_exponential_histogram_indexer_test circular_buffer_counter_test + cardinality_limit_test histogram_test sync_metric_storage_counter_test sync_metric_storage_histogram_test diff --git a/sdk/test/metrics/cardinality_limit_test.cc b/sdk/test/metrics/cardinality_limit_test.cc new file mode 100644 index 0000000000..d290f889b5 --- /dev/null +++ b/sdk/test/metrics/cardinality_limit_test.cc @@ -0,0 +1,108 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "common.h" +#include "opentelemetry/common/key_value_iterable_view.h" +#include "opentelemetry/sdk/metrics/aggregation/sum_aggregation.h" +#include "opentelemetry/sdk/metrics/instruments.h" +#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" +#include "opentelemetry/sdk/metrics/state/sync_metric_storage.h" + +#include +#include + +using namespace opentelemetry::sdk::metrics; +using namespace opentelemetry::common; +namespace nostd = opentelemetry::nostd; + +TEST(CardinalityLimit, AttributesHashMapBasicTests) +{ + AttributesHashMap hash_map(10); + std::function()> aggregation_callback = + []() -> std::unique_ptr { + return std::unique_ptr(new LongSumAggregation(true)); + }; + // add 10 unique metric points. 9 should be added to hashmap, 10th should be overflow. + long record_value = 100; + for (auto i = 0; i < 10; i++) + { + OrderedAttributeMap attributes = {{"key", std::to_string(i)}}; + auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes); + static_cast( + hash_map.GetOrSetDefault(attributes, aggregation_callback, hash)) + ->Aggregate(record_value); + } + EXPECT_EQ(hash_map.Size(), 10); + // add 5 unique metric points above limit, they all should get consolidated as single + // overflowmetric point. + for (auto i = 10; i < 15; i++) + { + OrderedAttributeMap attributes = {{"key", std::to_string(i)}}; + auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes); + static_cast( + hash_map.GetOrSetDefault(attributes, aggregation_callback, hash)) + ->Aggregate(record_value); + } + EXPECT_EQ(hash_map.Size(), 10); // only one more metric point should be added as overflow. + // get the overflow metric point + auto agg = hash_map.GetOrSetDefault( + OrderedAttributeMap({{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}), + aggregation_callback, kOverflowAttributesHash); + EXPECT_NE(agg, nullptr); + auto sum_agg = static_cast(agg); + EXPECT_EQ(nostd::get(nostd::get(sum_agg->ToPoint()).value_), + record_value * 6); // 1 from previous 10, 5 from current 5. +} + +class WritableMetricStorageCardinalityLimitTestFixture + : public ::testing::TestWithParam +{}; + +TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregation) +{ + auto sdk_start_ts = std::chrono::system_clock::now(); + const size_t attributes_limit = 10; + InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, + InstrumentValueType::kLong}; + std::unique_ptr default_attributes_processor{ + new DefaultAttributesProcessor{}}; + SyncMetricStorage storage(instr_desc, AggregationType::kSum, default_attributes_processor.get(), + ExemplarReservoir::GetNoExemplarReservoir(), nullptr, attributes_limit); + + long record_value = 100; + // add 9 unique metric points, and 6 more above limit. + for (auto i = 0; i < 15; i++) + { + std::map attributes = {{"key", std::to_string(i)}}; + storage.RecordLong(record_value, + KeyValueIterableView>(attributes), + opentelemetry::context::Context{}); + } + AggregationTemporality temporality = GetParam(); + std::shared_ptr collector(new MockCollectorHandle(temporality)); + std::vector> collectors; + collectors.push_back(collector); + //... Some computation here + auto collection_ts = std::chrono::system_clock::now(); + size_t count_attributes = 0; + bool overflow_present = false; + storage.Collect( + collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) { + for (const auto &data_attr : metric_data.point_data_attr_) + { + const auto &data = opentelemetry::nostd::get(data_attr.point_data); + count_attributes++; + if (data_attr.attributes.begin()->first == kAttributesLimitOverflowKey) + { + EXPECT_EQ(nostd::get(data.value_), record_value * 6); + overflow_present = true; + } + } + return true; + }); + EXPECT_EQ(count_attributes, attributes_limit); + EXPECT_EQ(overflow_present, true); +} +INSTANTIATE_TEST_SUITE_P(All, + WritableMetricStorageCardinalityLimitTestFixture, + ::testing::Values(AggregationTemporality::kDelta)); diff --git a/tools/vcpkg b/tools/vcpkg index acc3bcf76b..8eb57355a4 160000 --- a/tools/vcpkg +++ b/tools/vcpkg @@ -1 +1 @@ -Subproject commit acc3bcf76b84ae5041c86ab55fe138ae7b8255c7 +Subproject commit 8eb57355a4ffb410a2e94c07b4dca2dffbee8e50