From e1946087b7d93776e06b4cfe2ac38367c7a87665 Mon Sep 17 00:00:00 2001 From: owent Date: Fri, 22 Mar 2024 19:54:37 +0800 Subject: [PATCH] Use `notify_all` to notify worker threads to avoid notification miss in critical section. --- sdk/src/logs/batch_log_record_processor.cc | 6 +++--- .../metrics/export/periodic_exporting_metric_reader.cc | 6 +++--- sdk/src/trace/batch_span_processor.cc | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sdk/src/logs/batch_log_record_processor.cc b/sdk/src/logs/batch_log_record_processor.cc index 8257a3b935..c29e27bdd3 100644 --- a/sdk/src/logs/batch_log_record_processor.cc +++ b/sdk/src/logs/batch_log_record_processor.cc @@ -65,7 +65,7 @@ void BatchLogRecordProcessor::OnEmit(std::unique_ptr &&record) noexc { // signal the worker thread synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); } } @@ -93,7 +93,7 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex if (synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) > synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire)) { - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); } return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >= @@ -285,7 +285,7 @@ bool BatchLogRecordProcessor::Shutdown(std::chrono::microseconds timeout) noexce if (worker_thread_.joinable()) { synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); worker_thread_.join(); } diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 6bc612592a..2d07011a24 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -122,12 +122,12 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo return true; } - // Wake up the worker thread once. + // Wake up the worker thread. if (force_flush_pending_sequence_.load(std::memory_order_acquire) > force_flush_notified_sequence_.load(std::memory_order_acquire)) { is_force_wakeup_background_worker_.store(true, std::memory_order_release); - cv_.notify_one(); + cv_.notify_all(); } return force_flush_notified_sequence_.load(std::memory_order_acquire) >= current_sequence; }; @@ -188,7 +188,7 @@ bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout { if (worker_thread_.joinable()) { - cv_.notify_one(); + cv_.notify_all(); worker_thread_.join(); } return exporter_->Shutdown(timeout); diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index 329f3d4181..3827fad495 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -62,7 +62,7 @@ void BatchSpanProcessor::OnEnd(std::unique_ptr &&span) noexcept if (buffer_size >= max_queue_size_ / 2 || buffer_size >= max_export_batch_size_) { // signal the worker thread - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); } } @@ -86,13 +86,13 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept return true; } - // Wake up the worker thread once. + // Wake up the worker thread. if (synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) > synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire)) { synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); } return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >= @@ -281,7 +281,7 @@ bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept if (worker_thread_.joinable()) { synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); worker_thread_.join(); }