From a9f0def8fb59279bf6eea2bd0d0c6e71f885d5ce Mon Sep 17 00:00:00 2001 From: Lalit Date: Fri, 4 Aug 2023 14:49:23 -0700 Subject: [PATCH 1/5] attribtues limit --- .../sdk/metrics/state/attributes_hashmap.h | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h index a4b53a37c5..a2af5a5a54 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h @@ -23,6 +23,15 @@ namespace metrics using opentelemetry::sdk::common::OrderedAttributeMap; +constexpr uint16_t kAggregationCardinalityLimit = 2000; +const std::string kAggregationCardinalityLimitOverflowError = + "Maximum data points for metric stream exceeded. Entry added to overflow"; +const std::string kAttributesLimitOverflowKey = "otel.metrics.overflow"; +const std::string kAttributesLimitOverflowValue = "true"; +const size_t kOverflowAttributesHash = common::GetHashForAttributeMap( + {{kAttributesLimitOverflowKey, + kAttributesLimitOverflowValue}}); // precalculated for optimization + class AttributeHashGenerator { public: @@ -35,6 +44,9 @@ class AttributeHashGenerator class AttributesHashMap { public: + AttributesHashMap(size_t attributes_limit = kAggregationCardinalityLimit) + : attributes_limit_(attributes_limit) + {} Aggregation *Get(size_t hash) const { auto it = hash_map_.find(hash); @@ -66,6 +78,11 @@ class AttributesHashMap return it->second.second.get(); } + if (hash_map_.size() >= attributes_limit_) + { + return GetOrSetOveflowAttributes(aggregation_callback); + } + MetricAttributes attr{attributes}; hash_map_[hash] = {attr, aggregation_callback()}; @@ -80,6 +97,12 @@ class AttributesHashMap { return it->second.second.get(); } + + if (hash_map_.size() >= attributes_limit_) + { + return GetOrSetOveflowAttributes(aggregation_callback); + } + MetricAttributes attr{}; hash_map_[hash] = {attr, aggregation_callback()}; return hash_map_[hash].second.get(); @@ -95,6 +118,11 @@ class AttributesHashMap return it->second.second.get(); } + if (hash_map_.size() >= attributes_limit_) + { + return GetOrSetOveflowAttributes(aggregation_callback); + } + MetricAttributes attr{attributes}; hash_map_[hash] = {attr, aggregation_callback()}; @@ -113,6 +141,12 @@ class AttributesHashMap { it->second.second = std::move(aggr); } + else if (hash_map_.size() >= attributes_limit_) + { + hash_map_[kOverflowAttributesHash] = { + MetricAttributes{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}, + std::move(aggr)}; + } else { MetricAttributes attr{attributes}; @@ -127,6 +161,12 @@ class AttributesHashMap { it->second.second = std::move(aggr); } + else if (hash_map_.size() >= attributes_limit_) + { + hash_map_[kOverflowAttributesHash] = { + MetricAttributes{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}, + std::move(aggr)}; + } else { MetricAttributes attr{attributes}; @@ -157,6 +197,27 @@ class AttributesHashMap private: std::unordered_map>> hash_map_; + size_t attributes_limit_; + + Aggregation *GetOrSetOveflowAttributes( + std::function()> aggregation_callback) + { + auto agg = aggregation_callback(); + return GetOrSetOveflowAttributes(std::move(agg)); + } + + Aggregation *GetOrSetOveflowAttributes(std::unique_ptr agg) + { + auto it = hash_map_.find(kOverflowAttributesHash); + if (it != hash_map_.end()) + { + return it->second.second.get(); + } + + MetricAttributes attr{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}; + hash_map_[kOverflowAttributesHash] = {attr, std::move(agg)}; + return hash_map_[kOverflowAttributesHash].second.get(); + } }; } // namespace metrics From 3b6666dc167136a10cae6adc1a9317bb7d6c2d60 Mon Sep 17 00:00:00 2001 From: Lalit Date: Sun, 6 Aug 2023 00:54:46 -0700 Subject: [PATCH 2/5] add unit test --- sdk/test/metrics/CMakeLists.txt | 1 + sdk/test/metrics/cardinality_limit_test.cc | 48 ++++++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 sdk/test/metrics/cardinality_limit_test.cc diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index b30d71ae22..ccc31de689 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -16,6 +16,7 @@ foreach( attributes_hashmap_test base2_exponential_histogram_indexer_test circular_buffer_counter_test + cardinality_limit_test histogram_test sync_metric_storage_counter_test sync_metric_storage_histogram_test diff --git a/sdk/test/metrics/cardinality_limit_test.cc b/sdk/test/metrics/cardinality_limit_test.cc new file mode 100644 index 0000000000..18dbcc8771 --- /dev/null +++ b/sdk/test/metrics/cardinality_limit_test.cc @@ -0,0 +1,48 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/common/key_value_iterable_view.h" +#include "opentelemetry/sdk/metrics/aggregation/sum_aggregation.h" +#include "opentelemetry/sdk/metrics/instruments.h" +#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" + +#include +#include + +using namespace opentelemetry::sdk::metrics; +using namespace opentelemetry::common; +namespace nostd = opentelemetry::nostd; + +TEST(CardinalityLimit, AttributesHashMapBasicTests) +{ + AttributesHashMap hash_map(10); + std::function()> aggregation_callback = + []() -> std::unique_ptr { + return std::unique_ptr(new LongSumAggregation(true)); + }; + // add 10 unique metric points. + long record_value = 100; + for (auto i = 0; i < 10; i++) + { + OrderedAttributeMap attributes = {{"key", std::to_string(i)}}; + auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes); + hash_map.GetOrSetDefault(attributes, aggregation_callback, hash)->Aggregate(record_value); + } + EXPECT_EQ(hash_map.Size(), 10); + // add 5 unique metric points above limit + for (auto i = 10; i < 15; i++) + { + OrderedAttributeMap attributes = {{"key", std::to_string(i)}}; + auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes); + hash_map.GetOrSetDefault(attributes, aggregation_callback, hash)->Aggregate(record_value); + } + EXPECT_EQ(hash_map.Size(), 11); // only one more metric point should be added as overflow. + // get the overflow metric point + auto agg = hash_map.GetOrSetDefault( + OrderedAttributeMap({{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}), + aggregation_callback, kOverflowAttributesHash); + EXPECT_NE(agg, nullptr); + auto sum_agg = reinterpret_cast(agg); + EXPECT_EQ(nostd::get(nostd::get(sum_agg->ToPoint()).value_), + record_value * 5); +} \ No newline at end of file From 8df2f81ec3094103bb879674aae111ccbaeab51a Mon Sep 17 00:00:00 2001 From: Lalit Date: Sun, 6 Aug 2023 23:30:05 -0700 Subject: [PATCH 3/5] add more tests, and cardinality limit configurable at Sync storage --- .../sdk/metrics/state/sync_metric_storage.h | 5 +- sdk/test/metrics/cardinality_limit_test.cc | 73 +++++++++++++++++-- 2 files changed, 70 insertions(+), 8 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index 37b0e06707..91465acd38 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -40,9 +40,10 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage const AttributesProcessor *attributes_processor, nostd::shared_ptr &&exemplar_reservoir OPENTELEMETRY_MAYBE_UNUSED, - const AggregationConfig *aggregation_config) + const AggregationConfig *aggregation_config, + size_t attributes_limit = kAggregationCardinalityLimit) : instrument_descriptor_(instrument_descriptor), - attributes_hashmap_(new AttributesHashMap()), + attributes_hashmap_(new AttributesHashMap(attributes_limit)), attributes_processor_(attributes_processor), #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW exemplar_reservoir_(exemplar_reservoir), diff --git a/sdk/test/metrics/cardinality_limit_test.cc b/sdk/test/metrics/cardinality_limit_test.cc index 18dbcc8771..a55ea0c36a 100644 --- a/sdk/test/metrics/cardinality_limit_test.cc +++ b/sdk/test/metrics/cardinality_limit_test.cc @@ -1,10 +1,12 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +#include "common.h" #include "opentelemetry/common/key_value_iterable_view.h" #include "opentelemetry/sdk/metrics/aggregation/sum_aggregation.h" #include "opentelemetry/sdk/metrics/instruments.h" #include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" +#include "opentelemetry/sdk/metrics/state/sync_metric_storage.h" #include #include @@ -26,15 +28,19 @@ TEST(CardinalityLimit, AttributesHashMapBasicTests) { OrderedAttributeMap attributes = {{"key", std::to_string(i)}}; auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes); - hash_map.GetOrSetDefault(attributes, aggregation_callback, hash)->Aggregate(record_value); + static_cast( + hash_map.GetOrSetDefault(attributes, aggregation_callback, hash)) + ->Aggregate(record_value); } EXPECT_EQ(hash_map.Size(), 10); - // add 5 unique metric points above limit + // add 5 unique metric points above limit, they should get consolidated as single metric point. for (auto i = 10; i < 15; i++) { OrderedAttributeMap attributes = {{"key", std::to_string(i)}}; auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes); - hash_map.GetOrSetDefault(attributes, aggregation_callback, hash)->Aggregate(record_value); + static_cast( + hash_map.GetOrSetDefault(attributes, aggregation_callback, hash)) + ->Aggregate(record_value); } EXPECT_EQ(hash_map.Size(), 11); // only one more metric point should be added as overflow. // get the overflow metric point @@ -42,7 +48,62 @@ TEST(CardinalityLimit, AttributesHashMapBasicTests) OrderedAttributeMap({{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}), aggregation_callback, kOverflowAttributesHash); EXPECT_NE(agg, nullptr); - auto sum_agg = reinterpret_cast(agg); - EXPECT_EQ(nostd::get(nostd::get(sum_agg->ToPoint()).value_), + auto sum_agg = static_cast(agg); + EXPECT_EQ(nostd::get(nostd::get(sum_agg->ToPoint()).value_), record_value * 5); -} \ No newline at end of file +} + +class WritableMetricStorageCardinalityLimitTestFixture + : public ::testing::TestWithParam +{}; + +TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregation) +{ + auto sdk_start_ts = std::chrono::system_clock::now(); + const size_t attributes_limit = 10; + InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, + InstrumentValueType::kLong}; + std::unique_ptr default_attributes_processor{ + new DefaultAttributesProcessor{}}; + SyncMetricStorage storage(instr_desc, AggregationType::kSum, default_attributes_processor.get(), + ExemplarReservoir::GetNoExemplarReservoir(), nullptr, attributes_limit); + + long record_value = 100; + // add 10 unique metric points, and 5 more above limit. + for (auto i = 0; i < 15; i++) + { + std::map attributes = {{"key", std::to_string(i)}}; + storage.RecordLong(record_value, + KeyValueIterableView>(attributes), + opentelemetry::context::Context{}); + } + AggregationTemporality temporality = GetParam(); + std::shared_ptr collector(new MockCollectorHandle(temporality)); + std::vector> collectors; + collectors.push_back(collector); + //... Some computation here + auto collection_ts = std::chrono::system_clock::now(); + size_t count_attributes = 0; + bool overflow_present = false; + storage.Collect( + collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) { + for (const auto &data_attr : metric_data.point_data_attr_) + { + const auto &data = opentelemetry::nostd::get(data_attr.point_data); + count_attributes++; + if (opentelemetry::nostd::get(data_attr.attributes.begin()->second) == + kAttributesLimitOverflowValue) + { + EXPECT_EQ(nostd::get(data.value_), record_value * 5); + overflow_present = true; + } + } + return true; + }); + EXPECT_EQ(count_attributes, attributes_limit + 1); // +1 for overflow metric point. + EXPECT_EQ(overflow_present, true); +} +INSTANTIATE_TEST_SUITE_P(All, + WritableMetricStorageCardinalityLimitTestFixture, + ::testing::Values(AggregationTemporality::kCumulative, + AggregationTemporality::kDelta)); From 097721e000892c1dbc94f74170d9d5a437de833e Mon Sep 17 00:00:00 2001 From: Lalit Date: Sun, 8 Oct 2023 22:51:43 -0700 Subject: [PATCH 4/5] Fix tests --- sdk/test/metrics/cardinality_limit_test.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/test/metrics/cardinality_limit_test.cc b/sdk/test/metrics/cardinality_limit_test.cc index a55ea0c36a..834e87ef56 100644 --- a/sdk/test/metrics/cardinality_limit_test.cc +++ b/sdk/test/metrics/cardinality_limit_test.cc @@ -105,5 +105,4 @@ TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregati } INSTANTIATE_TEST_SUITE_P(All, WritableMetricStorageCardinalityLimitTestFixture, - ::testing::Values(AggregationTemporality::kCumulative, - AggregationTemporality::kDelta)); + ::testing::Values(AggregationTemporality::kDelta)); From d8ed5c3a7dd655abf4620287cff8036164e818e9 Mon Sep 17 00:00:00 2001 From: Lalit Date: Thu, 9 Nov 2023 09:36:34 -0800 Subject: [PATCH 5/5] fix hasmap overflow size --- .../sdk/metrics/state/attributes_hashmap.h | 20 ++++++++++--------- sdk/test/metrics/cardinality_limit_test.cc | 18 ++++++++--------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h index a2af5a5a54..85009a9fcb 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h @@ -23,12 +23,12 @@ namespace metrics using opentelemetry::sdk::common::OrderedAttributeMap; -constexpr uint16_t kAggregationCardinalityLimit = 2000; +constexpr size_t kAggregationCardinalityLimit = 2000; const std::string kAggregationCardinalityLimitOverflowError = "Maximum data points for metric stream exceeded. Entry added to overflow"; -const std::string kAttributesLimitOverflowKey = "otel.metrics.overflow"; -const std::string kAttributesLimitOverflowValue = "true"; -const size_t kOverflowAttributesHash = common::GetHashForAttributeMap( +const std::string kAttributesLimitOverflowKey = "otel.metrics.overflow"; +const bool kAttributesLimitOverflowValue = true; +const size_t kOverflowAttributesHash = common::GetHashForAttributeMap( {{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}); // precalculated for optimization @@ -78,7 +78,7 @@ class AttributesHashMap return it->second.second.get(); } - if (hash_map_.size() >= attributes_limit_) + if (IsOverflowAttributes()) { return GetOrSetOveflowAttributes(aggregation_callback); } @@ -98,7 +98,7 @@ class AttributesHashMap return it->second.second.get(); } - if (hash_map_.size() >= attributes_limit_) + if (IsOverflowAttributes()) { return GetOrSetOveflowAttributes(aggregation_callback); } @@ -118,7 +118,7 @@ class AttributesHashMap return it->second.second.get(); } - if (hash_map_.size() >= attributes_limit_) + if (IsOverflowAttributes()) { return GetOrSetOveflowAttributes(aggregation_callback); } @@ -141,7 +141,7 @@ class AttributesHashMap { it->second.second = std::move(aggr); } - else if (hash_map_.size() >= attributes_limit_) + else if (IsOverflowAttributes()) { hash_map_[kOverflowAttributesHash] = { MetricAttributes{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}, @@ -161,7 +161,7 @@ class AttributesHashMap { it->second.second = std::move(aggr); } - else if (hash_map_.size() >= attributes_limit_) + else if (IsOverflowAttributes()) { hash_map_[kOverflowAttributesHash] = { MetricAttributes{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}, @@ -218,6 +218,8 @@ class AttributesHashMap hash_map_[kOverflowAttributesHash] = {attr, std::move(agg)}; return hash_map_[kOverflowAttributesHash].second.get(); } + + bool IsOverflowAttributes() const { return (hash_map_.size() + 1 >= attributes_limit_); } }; } // namespace metrics diff --git a/sdk/test/metrics/cardinality_limit_test.cc b/sdk/test/metrics/cardinality_limit_test.cc index 834e87ef56..d290f889b5 100644 --- a/sdk/test/metrics/cardinality_limit_test.cc +++ b/sdk/test/metrics/cardinality_limit_test.cc @@ -22,7 +22,7 @@ TEST(CardinalityLimit, AttributesHashMapBasicTests) []() -> std::unique_ptr { return std::unique_ptr(new LongSumAggregation(true)); }; - // add 10 unique metric points. + // add 10 unique metric points. 9 should be added to hashmap, 10th should be overflow. long record_value = 100; for (auto i = 0; i < 10; i++) { @@ -33,7 +33,8 @@ TEST(CardinalityLimit, AttributesHashMapBasicTests) ->Aggregate(record_value); } EXPECT_EQ(hash_map.Size(), 10); - // add 5 unique metric points above limit, they should get consolidated as single metric point. + // add 5 unique metric points above limit, they all should get consolidated as single + // overflowmetric point. for (auto i = 10; i < 15; i++) { OrderedAttributeMap attributes = {{"key", std::to_string(i)}}; @@ -42,7 +43,7 @@ TEST(CardinalityLimit, AttributesHashMapBasicTests) hash_map.GetOrSetDefault(attributes, aggregation_callback, hash)) ->Aggregate(record_value); } - EXPECT_EQ(hash_map.Size(), 11); // only one more metric point should be added as overflow. + EXPECT_EQ(hash_map.Size(), 10); // only one more metric point should be added as overflow. // get the overflow metric point auto agg = hash_map.GetOrSetDefault( OrderedAttributeMap({{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}), @@ -50,7 +51,7 @@ TEST(CardinalityLimit, AttributesHashMapBasicTests) EXPECT_NE(agg, nullptr); auto sum_agg = static_cast(agg); EXPECT_EQ(nostd::get(nostd::get(sum_agg->ToPoint()).value_), - record_value * 5); + record_value * 6); // 1 from previous 10, 5 from current 5. } class WritableMetricStorageCardinalityLimitTestFixture @@ -69,7 +70,7 @@ TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregati ExemplarReservoir::GetNoExemplarReservoir(), nullptr, attributes_limit); long record_value = 100; - // add 10 unique metric points, and 5 more above limit. + // add 9 unique metric points, and 6 more above limit. for (auto i = 0; i < 15; i++) { std::map attributes = {{"key", std::to_string(i)}}; @@ -91,16 +92,15 @@ TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregati { const auto &data = opentelemetry::nostd::get(data_attr.point_data); count_attributes++; - if (opentelemetry::nostd::get(data_attr.attributes.begin()->second) == - kAttributesLimitOverflowValue) + if (data_attr.attributes.begin()->first == kAttributesLimitOverflowKey) { - EXPECT_EQ(nostd::get(data.value_), record_value * 5); + EXPECT_EQ(nostd::get(data.value_), record_value * 6); overflow_present = true; } } return true; }); - EXPECT_EQ(count_attributes, attributes_limit + 1); // +1 for overflow metric point. + EXPECT_EQ(count_attributes, attributes_limit); EXPECT_EQ(overflow_present, true); } INSTANTIATE_TEST_SUITE_P(All,