Skip to content

Commit

Permalink
Avoid missing conditional variable update and simplify atomic bool
Browse files Browse the repository at this point in the history
Addresses two issues -
1. Fix the use of a conditional variable where a wait on the variable might not be in flight when a notify is called. This is fixed by ensuring that an associated lock is aquired before calling the notify.
2. Instead of relying on a lock an a boolean, replace the use wit a single atomic boolean.
  • Loading branch information
arekay committed Feb 23, 2024
1 parent 563094b commit 2b357fb
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 11 deletions.
5 changes: 2 additions & 3 deletions sdk/include/opentelemetry/sdk/metrics/metric_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

#pragma once

#include <atomic>
#include <chrono>
#include <memory>

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/nostd/function_ref.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
#include "opentelemetry/sdk/metrics/instruments.h"
Expand Down Expand Up @@ -72,8 +72,7 @@ class MetricReader
protected:
private:
MetricProducer *metric_producer_;
mutable opentelemetry::common::SpinLockMutex lock_;
bool shutdown_;
std::atomic<bool> shutdown_;
};
} // namespace metrics
} // namespace sdk
Expand Down
8 changes: 7 additions & 1 deletion sdk/src/metrics/export/periodic_exporting_metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h"
#include "opentelemetry/sdk/common/global_log_handler.h"
#include "opentelemetry/sdk/metrics/push_metric_exporter.h"
#include "opentelemetry/common/spin_lock_mutex.h"

#include <chrono>
#if defined(_MSC_VER)
Expand Down Expand Up @@ -101,6 +102,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel);
if (notify_force_flush)
{
std::unique_lock<std::mutex> lk(force_flush_m_);
is_force_flush_notified_.store(true, std::memory_order_release);
force_flush_cv_.notify_one();
}
Expand Down Expand Up @@ -191,7 +193,11 @@ bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout
{
if (worker_thread_.joinable())
{
cv_.notify_one();
{
//ensure that `cv_` is awaiting, and the update doesn't get lost
std::unique_lock<std::mutex> lk(cv_m_);
cv_.notify_one();
}
worker_thread_.join();
}
return exporter_->Shutdown(timeout);
Expand Down
10 changes: 3 additions & 7 deletions sdk/src/metrics/metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept
OTEL_INTERNAL_LOG_WARN("MetricReader::Shutdown - Cannot invoke shutdown twice!");
}

{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
shutdown_ = true;
}
shutdown_.store(true, std::memory_order_release);

if (!OnShutDown(timeout))
{
Expand All @@ -65,7 +62,7 @@ bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept
bool MetricReader::ForceFlush(std::chrono::microseconds timeout) noexcept
{
bool status = true;
if (shutdown_)
if (IsShutdown())
{
OTEL_INTERNAL_LOG_WARN("MetricReader::Shutdown Cannot invoke Force flush on shutdown reader!");
}
Expand All @@ -79,8 +76,7 @@ bool MetricReader::ForceFlush(std::chrono::microseconds timeout) noexcept

bool MetricReader::IsShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return shutdown_;
return shutdown_.load(std::memory_order_acquire);
}

} // namespace metrics
Expand Down

0 comments on commit 2b357fb

Please sign in to comment.