Skip to content

Commit

Permalink
Use notify_all to notify worker threads to avoid notification miss …
Browse files Browse the repository at this point in the history
…in critical section.
  • Loading branch information
owent committed Mar 22, 2024
1 parent e7031c2 commit e194608
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 10 deletions.
6 changes: 3 additions & 3 deletions sdk/src/logs/batch_log_record_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void BatchLogRecordProcessor::OnEmit(std::unique_ptr<Recordable> &&record) noexc
{
// signal the worker thread
synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release);
synchronization_data_->cv.notify_one();
synchronization_data_->cv.notify_all();
}
}

Expand Down Expand Up @@ -93,7 +93,7 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex
if (synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) >
synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire))
{
synchronization_data_->cv.notify_one();
synchronization_data_->cv.notify_all();
}

return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >=
Expand Down Expand Up @@ -285,7 +285,7 @@ bool BatchLogRecordProcessor::Shutdown(std::chrono::microseconds timeout) noexce
if (worker_thread_.joinable())
{
synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release);
synchronization_data_->cv.notify_one();
synchronization_data_->cv.notify_all();
worker_thread_.join();
}

Expand Down
6 changes: 3 additions & 3 deletions sdk/src/metrics/export/periodic_exporting_metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo
return true;
}

// Wake up the worker thread once.
// Wake up the worker thread.
if (force_flush_pending_sequence_.load(std::memory_order_acquire) >
force_flush_notified_sequence_.load(std::memory_order_acquire))
{
is_force_wakeup_background_worker_.store(true, std::memory_order_release);
cv_.notify_one();
cv_.notify_all();
}
return force_flush_notified_sequence_.load(std::memory_order_acquire) >= current_sequence;
};
Expand Down Expand Up @@ -188,7 +188,7 @@ bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout
{
if (worker_thread_.joinable())
{
cv_.notify_one();
cv_.notify_all();
worker_thread_.join();
}
return exporter_->Shutdown(timeout);
Expand Down
8 changes: 4 additions & 4 deletions sdk/src/trace/batch_span_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void BatchSpanProcessor::OnEnd(std::unique_ptr<Recordable> &&span) noexcept
if (buffer_size >= max_queue_size_ / 2 || buffer_size >= max_export_batch_size_)
{
// signal the worker thread
synchronization_data_->cv.notify_one();
synchronization_data_->cv.notify_all();
}
}

Expand All @@ -86,13 +86,13 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
return true;
}

// Wake up the worker thread once.
// Wake up the worker thread.
if (synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) >
synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire))
{
synchronization_data_->is_force_wakeup_background_worker.store(true,
std::memory_order_release);
synchronization_data_->cv.notify_one();
synchronization_data_->cv.notify_all();
}

return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >=
Expand Down Expand Up @@ -281,7 +281,7 @@ bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
if (worker_thread_.joinable())
{
synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release);
synchronization_data_->cv.notify_one();
synchronization_data_->cv.notify_all();
worker_thread_.join();
}

Expand Down

0 comments on commit e194608

Please sign in to comment.