Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Add exemplar reservoir to async metric storage #2319

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/sdk/common/attributemap_hash.h"
#include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h"
#include "opentelemetry/sdk/metrics/exemplar/reservoir.h"
#include "opentelemetry/sdk/metrics/instruments.h"
#include "opentelemetry/sdk/metrics/observer_result.h"
#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
Expand All @@ -29,11 +30,16 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
public:
AsyncMetricStorage(InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
nostd::shared_ptr<ExemplarReservoir> &&exemplar_reservoir
OPENTELEMETRY_MAYBE_UNUSED,
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
cumulative_hash_map_(new AttributesHashMap()),
delta_hash_map_(new AttributesHashMap()),
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_(exemplar_reservoir),
#endif
temporal_metric_storage_(instrument_descriptor, aggregation_type, aggregation_config)
{}

Expand All @@ -47,6 +53,11 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(hashmap_lock_);
for (auto &measurement : measurements)
{
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_->OfferMeasurement(measurement.second, {}, {},
std::chrono::system_clock::now());
#endif
Comment on lines +56 to +59
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the async code at par with the sync metric storage, so approved.

In the long term, I think we will need to refactor the exemplar reservoir interface, to reduce overhead.

This calls:

  • creates dummy objects, for each measurements
  • calls now(), for each measurement
  • invoke a virtual function
  • only to land on a noop implementation in common cases

Very wasteful compared to representing a noop reservoir with a nullptr, and testing a pointer.

Ok as long as we have a feature flag.


auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_);
aggr->Aggregate(measurement.second);
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(measurement.first);
Expand Down Expand Up @@ -119,6 +130,9 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
std::unique_ptr<AttributesHashMap> cumulative_hash_map_;
std::unique_ptr<AttributesHashMap> delta_hash_map_;
opentelemetry::common::SpinLockMutex hashmap_lock_;
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
nostd::shared_ptr<ExemplarReservoir> exemplar_reservoir_;
#endif
TemporalMetricStorage temporal_metric_storage_;
};

Expand Down
3 changes: 2 additions & 1 deletion sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
view_instr_desc.description_ = view.GetDescription();
}
auto storage = std::shared_ptr<AsyncMetricStorage>(new AsyncMetricStorage(
view_instr_desc, view.GetAggregationType(), view.GetAggregationConfig()));
view_instr_desc, view.GetAggregationType(), ExemplarReservoir::GetNoExemplarReservoir(),
view.GetAggregationConfig()));
storage_registry_[instrument_descriptor.name_] = storage;
static_cast<AsyncMultiMetricStorage *>(storages.get())->AddStorage(storage);
return true;
Expand Down
14 changes: 8 additions & 6 deletions sdk/test/metrics/async_metric_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "opentelemetry/common/key_value_iterable_view.h"
#include "opentelemetry/sdk/metrics/async_instruments.h"
#include "opentelemetry/sdk/metrics/exemplar/reservoir.h"
#include "opentelemetry/sdk/metrics/instruments.h"
#include "opentelemetry/sdk/metrics/meter_context.h"
#include "opentelemetry/sdk/metrics/metric_reader.h"
Expand Down Expand Up @@ -53,8 +54,8 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation)
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);

opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kSum,
nullptr);
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kSum, ExemplarReservoir::GetNoExemplarReservoir(), nullptr);
int64_t get_count1 = 20;
int64_t put_count1 = 10;
std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
Expand Down Expand Up @@ -144,8 +145,8 @@ TEST_P(WritableMetricStorageTestUpDownFixture, TestAggregation)
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);

opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kDefault,
nullptr);
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kDefault, ExemplarReservoir::GetNoExemplarReservoir(), nullptr);
int64_t get_count1 = 20;
int64_t put_count1 = 10;
std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
Expand Down Expand Up @@ -234,8 +235,9 @@ TEST_P(WritableMetricStorageTestObservableGaugeFixture, TestAggregation)
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);

opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kLastValue,
nullptr);
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kLastValue, ExemplarReservoir::GetNoExemplarReservoir(),
nullptr);
int64_t freq_cpu0 = 3;
int64_t freq_cpu1 = 5;
std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
Expand Down