Skip to content

Commit

Permalink
add more tests, and cardinality limit configurable at Sync storage
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed Aug 7, 2023
1 parent 3b6666d commit 8df2f81
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
const AttributesProcessor *attributes_processor,
nostd::shared_ptr<ExemplarReservoir> &&exemplar_reservoir
OPENTELEMETRY_MAYBE_UNUSED,
const AggregationConfig *aggregation_config)
const AggregationConfig *aggregation_config,
size_t attributes_limit = kAggregationCardinalityLimit)
: instrument_descriptor_(instrument_descriptor),
attributes_hashmap_(new AttributesHashMap()),
attributes_hashmap_(new AttributesHashMap(attributes_limit)),
attributes_processor_(attributes_processor),
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_(exemplar_reservoir),
Expand Down
73 changes: 67 additions & 6 deletions sdk/test/metrics/cardinality_limit_test.cc
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#include "common.h"
#include "opentelemetry/common/key_value_iterable_view.h"
#include "opentelemetry/sdk/metrics/aggregation/sum_aggregation.h"
#include "opentelemetry/sdk/metrics/instruments.h"
#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
#include "opentelemetry/sdk/metrics/state/sync_metric_storage.h"

#include <gtest/gtest.h>
#include <functional>
Expand All @@ -26,23 +28,82 @@ TEST(CardinalityLimit, AttributesHashMapBasicTests)
{
OrderedAttributeMap attributes = {{"key", std::to_string(i)}};
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes);
hash_map.GetOrSetDefault(attributes, aggregation_callback, hash)->Aggregate(record_value);
static_cast<LongSumAggregation *>(
hash_map.GetOrSetDefault(attributes, aggregation_callback, hash))
->Aggregate(record_value);
}
EXPECT_EQ(hash_map.Size(), 10);
// add 5 unique metric points above limit
// add 5 unique metric points above limit, they should get consolidated as single metric point.
for (auto i = 10; i < 15; i++)
{
OrderedAttributeMap attributes = {{"key", std::to_string(i)}};
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes);
hash_map.GetOrSetDefault(attributes, aggregation_callback, hash)->Aggregate(record_value);
static_cast<LongSumAggregation *>(
hash_map.GetOrSetDefault(attributes, aggregation_callback, hash))
->Aggregate(record_value);
}
EXPECT_EQ(hash_map.Size(), 11); // only one more metric point should be added as overflow.
// get the overflow metric point
auto agg = hash_map.GetOrSetDefault(
OrderedAttributeMap({{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}),
aggregation_callback, kOverflowAttributesHash);
EXPECT_NE(agg, nullptr);
auto sum_agg = reinterpret_cast<LongSumAggregation *>(agg);
EXPECT_EQ(nostd::get<long>(nostd::get<SumPointData>(sum_agg->ToPoint()).value_),
auto sum_agg = static_cast<LongSumAggregation *>(agg);
EXPECT_EQ(nostd::get<int64_t>(nostd::get<SumPointData>(sum_agg->ToPoint()).value_),
record_value * 5);
}
}

class WritableMetricStorageCardinalityLimitTestFixture
: public ::testing::TestWithParam<AggregationTemporality>
{};

TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregation)
{
auto sdk_start_ts = std::chrono::system_clock::now();
const size_t attributes_limit = 10;
InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter,
InstrumentValueType::kLong};
std::unique_ptr<DefaultAttributesProcessor> default_attributes_processor{
new DefaultAttributesProcessor{}};
SyncMetricStorage storage(instr_desc, AggregationType::kSum, default_attributes_processor.get(),
ExemplarReservoir::GetNoExemplarReservoir(), nullptr, attributes_limit);

long record_value = 100;
// add 10 unique metric points, and 5 more above limit.
for (auto i = 0; i < 15; i++)
{
std::map<std::string, std::string> attributes = {{"key", std::to_string(i)}};
storage.RecordLong(record_value,
KeyValueIterableView<std::map<std::string, std::string>>(attributes),
opentelemetry::context::Context{});
}
AggregationTemporality temporality = GetParam();
std::shared_ptr<CollectorHandle> collector(new MockCollectorHandle(temporality));
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);
//... Some computation here
auto collection_ts = std::chrono::system_clock::now();
size_t count_attributes = 0;
bool overflow_present = false;
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
for (const auto &data_attr : metric_data.point_data_attr_)
{
const auto &data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
count_attributes++;
if (opentelemetry::nostd::get<std::string>(data_attr.attributes.begin()->second) ==
kAttributesLimitOverflowValue)
{
EXPECT_EQ(nostd::get<int64_t>(data.value_), record_value * 5);
overflow_present = true;
}
}
return true;
});
EXPECT_EQ(count_attributes, attributes_limit + 1); // +1 for overflow metric point.
EXPECT_EQ(overflow_present, true);
}
INSTANTIATE_TEST_SUITE_P(All,
WritableMetricStorageCardinalityLimitTestFixture,
::testing::Values(AggregationTemporality::kCumulative,
AggregationTemporality::kDelta));

0 comments on commit 8df2f81

Please sign in to comment.