From 5de13b095c64ed59897d9b66c1138e10933299a3 Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Tue, 19 Sep 2023 03:16:21 +0800 Subject: [PATCH] improve: use ringbuffer to send spans Signed-off-by: Andy Lok --- Cargo.toml | 2 +- minitrace/Cargo.toml | 1 + minitrace/benches/spsc.rs | 177 ++++++++++++------ minitrace/benches/trace.rs | 41 ++-- minitrace/src/collector/global_collector.rs | 14 +- minitrace/src/util/spsc.rs | 75 ++++---- .../Cargo.toml | 2 +- .../src/main.rs | 2 +- 8 files changed, 191 insertions(+), 123 deletions(-) rename {test-no-report => test-statically-disable}/Cargo.toml (85%) rename {test-no-report => test-statically-disable}/src/main.rs (98%) diff --git a/Cargo.toml b/Cargo.toml index 958fe469..b17ba21f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ members = [ "minitrace-jaeger", "minitrace-datadog", "minitrace-opentelemetry", - "test-no-report", + "test-statically-disable", ] [profile.bench] diff --git a/minitrace/Cargo.toml b/minitrace/Cargo.toml index 7bfb9c27..0eb2365c 100644 --- a/minitrace/Cargo.toml +++ b/minitrace/Cargo.toml @@ -24,6 +24,7 @@ pin-project = "1" # TODO: Remove once_cell once #![feature(once_cell)] is stabilized once_cell = "1" rand = "0.8" +rtrb = "0.2" [dev-dependencies] # The procedural macro `trace` only supports async-trait higher than 0.1.52 diff --git a/minitrace/benches/spsc.rs b/minitrace/benches/spsc.rs index e46f8f0f..dacabce5 100644 --- a/minitrace/benches/spsc.rs +++ b/minitrace/benches/spsc.rs @@ -1,65 +1,85 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. +use std::time::Duration; +use std::time::Instant; + use criterion::criterion_group; use criterion::criterion_main; use criterion::Criterion; -fn crossbeam(nmsg: usize) { - let (tx, rx) = crossbeam::channel::bounded(10240); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - for i in 0..nmsg { - tx.send(i).unwrap(); - } - }); - - for _ in 0..nmsg { - while rx.try_recv().is_ok() {} - } - }) - .unwrap(); -} - -fn crossbeam_send_only(nmsg: usize) { - let (tx, _rx) = crossbeam::channel::bounded(10240); - - for i in 0..nmsg { - tx.send(i).unwrap(); - } -} - -fn minitrace(nmsg: usize) { - let (tx, mut rx) = minitrace::util::spsc::bounded(10240); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - for i in 0..nmsg { - tx.send(i).unwrap(); - } - }); - - for _ in 0..nmsg { - while rx.try_recv().unwrap().is_some() {} - } - }) - .unwrap(); -} - -fn minitrace_send_only(nmsg: usize) { - let (tx, _rx) = minitrace::util::spsc::bounded(10240); - - for i in 0..nmsg { - tx.send(i).unwrap(); - } -} - fn spsc_comparison(c: &mut Criterion) { let mut bgroup = c.benchmark_group("spsc channel"); - for len in &[1, 10, 100, 1000, 10000] { - bgroup.bench_function(format!("crossbeam/{}", len), |b| b.iter(|| crossbeam(*len))); - bgroup.bench_function(format!("minitrace/{}", len), |b| b.iter(|| minitrace(*len))); + for &len in &[1, 10, 100, 1000, 10000] { + bgroup.bench_function(format!("crossbeam/{}", len), |b| { + b.iter_custom(|iters| { + let mut total_time = Duration::default(); + for _ in 0..iters { + let (tx, rx) = crossbeam::channel::bounded(10240); + + let start = Instant::now(); + + std::thread::spawn(move || { + for i in 0..len { + while tx.try_send(i).is_err() {} + } + }); + + for _ in 0..len { + while rx.try_recv().is_err() {} + } + + total_time += start.elapsed(); + } + total_time + }) + }); + bgroup.bench_function(format!("ringbuffer/{}", len), |b| { + b.iter_custom(|iters| { + let mut total_time = Duration::default(); + for _ in 0..iters { + let (mut tx, mut rx) = rtrb::RingBuffer::new(10240); + + let start = Instant::now(); + + std::thread::spawn(move || { + for i in 0..len { + while tx.push(i).is_err() {} + } + }); + + for _ in 0..len { + while rx.pop().is_err() {} + } + + total_time += start.elapsed(); + } + total_time + }) + }); + bgroup.bench_function(format!("minitrace/{}", len), |b| { + b.iter_custom(|iters| { + let mut total_time = Duration::default(); + for _ in 0..iters { + let (mut tx, mut rx) = minitrace::util::spsc::bounded(10240); + + let start = Instant::now(); + + std::thread::spawn(move || { + for i in 0..len { + while tx.send(i).is_err() {} + } + }); + + for _ in 0..len { + while rx.try_recv().is_err() {} + } + + total_time += start.elapsed(); + } + total_time + }) + }); } bgroup.finish(); @@ -68,12 +88,57 @@ fn spsc_comparison(c: &mut Criterion) { fn spsc_send_only_comparison(c: &mut Criterion) { let mut bgroup = c.benchmark_group("spsc channel send only"); - for len in &[1, 10, 100, 1000, 10000] { + for &len in &[1, 10, 100, 1000, 10000] { bgroup.bench_function(format!("crossbeam/{}", len), |b| { - b.iter(|| crossbeam_send_only(*len)) + b.iter_custom(|iters| { + let mut total_time = Duration::default(); + for _ in 0..iters { + let (tx, _rx) = crossbeam::channel::bounded(10240); + + let start = Instant::now(); + + for i in 0..len { + tx.send(i).unwrap(); + } + + total_time += start.elapsed(); + } + total_time + }) + }); + bgroup.bench_function(format!("ringbuffer/{}", len), |b| { + b.iter_custom(|iters| { + let mut total_time = Duration::default(); + for _ in 0..iters { + let (mut tx, _rx) = rtrb::RingBuffer::new(10240); + + let start = Instant::now(); + + for i in 0..len { + tx.push(i).unwrap(); + } + + total_time += start.elapsed(); + } + total_time + }) }); bgroup.bench_function(format!("minitrace/{}", len), |b| { - b.iter(|| minitrace_send_only(*len)) + b.iter_custom(|iters| { + let mut total_time = Duration::default(); + for _ in 0..iters { + let (mut tx, _rx) = minitrace::util::spsc::bounded(10240); + + let start = Instant::now(); + + for i in 0..len { + tx.send(i).unwrap(); + } + + total_time += start.elapsed(); + } + total_time + }) }); } diff --git a/minitrace/benches/trace.rs b/minitrace/benches/trace.rs index 22eaa803..a816dbb3 100644 --- a/minitrace/benches/trace.rs +++ b/minitrace/benches/trace.rs @@ -7,6 +7,16 @@ use criterion::Criterion; use minitrace::local::LocalCollector; use minitrace::prelude::*; +fn init_minitrace() { + struct DummyReporter; + + impl minitrace::collector::Reporter for DummyReporter { + fn report(&mut self, _spans: &[minitrace::prelude::SpanRecord]) {} + } + + minitrace::set_reporter(DummyReporter, minitrace::collector::Config::default()); +} + fn dummy_iter(i: usize) { #[trace] fn dummy() {} @@ -25,7 +35,6 @@ fn dummy_rec(i: usize) { fn bench_trace_wide_raw(c: &mut Criterion) { let mut group = c.benchmark_group("trace_wide_raw"); - for len in &[1, 10, 100, 1000, 10000] { group.bench_function(len.to_string(), |b| { b.iter(|| { @@ -37,30 +46,26 @@ fn bench_trace_wide_raw(c: &mut Criterion) { } group.finish(); + minitrace::flush(); } fn bench_trace_wide(c: &mut Criterion) { + init_minitrace(); + let mut group = c.benchmark_group("trace_wide"); for len in &[1, 10, 100, 1000, 10000] { - group.bench_function(format!("with-collect-{}", len), |b| { + group.bench_function(len.to_string(), |b| { b.iter(|| { let root = Span::root("root", SpanContext::new(TraceId(12), SpanId::default())); let _sg = root.set_local_parent(); dummy_iter(*len - 1); }) }); - group.bench_function(format!("without-collect-{}", len), |b| { - b.iter(|| { - let root: Span = - Span::root("root", SpanContext::new(TraceId(12), SpanId::default())); - let _sg = root.set_local_parent(); - dummy_iter(*len - 1); - }) - }); } group.finish(); + minitrace::flush() } fn bench_trace_deep_raw(c: &mut Criterion) { @@ -77,20 +82,16 @@ fn bench_trace_deep_raw(c: &mut Criterion) { } group.finish(); + minitrace::flush() } fn bench_trace_deep(c: &mut Criterion) { + init_minitrace(); + let mut group = c.benchmark_group("trace_deep"); for len in &[1, 10, 100, 1000] { - group.bench_function(format!("with-collect-{}", len), |b| { - b.iter(|| { - let root = Span::root("root", SpanContext::new(TraceId(12), SpanId::default())); - let _sg = root.set_local_parent(); - dummy_rec(*len - 1); - }) - }); - group.bench_function(format!("without-collect-{}", len), |b| { + group.bench_function(len.to_string(), |b| { b.iter(|| { let root = Span::root("root", SpanContext::new(TraceId(12), SpanId::default())); let _sg = root.set_local_parent(); @@ -100,9 +101,12 @@ fn bench_trace_deep(c: &mut Criterion) { } group.finish(); + minitrace::flush() } fn bench_trace_future(c: &mut Criterion) { + init_minitrace(); + async fn f(i: u32) { for _ in 0..i - 1 { async {}.enter_on_poll(black_box("")).await @@ -121,6 +125,7 @@ fn bench_trace_future(c: &mut Criterion) { } group.finish(); + minitrace::flush() } criterion_group!( diff --git a/minitrace/src/collector/global_collector.rs b/minitrace/src/collector/global_collector.rs index fdb34fb1..8d99fd7b 100644 --- a/minitrace/src/collector/global_collector.rs +++ b/minitrace/src/collector/global_collector.rs @@ -1,5 +1,6 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. +use std::cell::UnsafeCell; use std::collections::HashMap; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicUsize; @@ -38,10 +39,10 @@ static SPSC_RXS: Lazy>>> = Lazy::new(|| Mutex static REPORTER_READY: AtomicBool = AtomicBool::new(false); thread_local! { - static COMMAND_SENDER: Sender = { + static COMMAND_SENDER: UnsafeCell> = { let (tx, rx) = spsc::bounded(10240); register_receiver(rx); - tx + UnsafeCell::new(tx) }; } @@ -50,12 +51,14 @@ fn register_receiver(rx: Receiver) { } fn send_command(cmd: CollectCommand) { - COMMAND_SENDER.try_with(|sender| sender.send(cmd).ok()).ok(); + COMMAND_SENDER + .try_with(|sender| unsafe { (*sender.get()).send(cmd).ok() }) + .ok(); } fn force_send_command(cmd: CollectCommand) { COMMAND_SENDER - .try_with(|sender| sender.force_send(cmd)) + .try_with(|sender| unsafe { (*sender.get()).force_send(cmd) }) .ok(); } @@ -251,10 +254,11 @@ impl GlobalCollector { Ok(Some(CollectCommand::CommitCollect(cmd))) => commit_collects.push(cmd), Ok(Some(CollectCommand::SubmitSpans(cmd))) => submit_spans.push(cmd), Ok(None) => { + // Channel is empty. return true; } Err(_) => { - // Channel disconnected. It must be because the sender thread has stopped. + // Channel closed. Remove it from the channel list. return false; } } diff --git a/minitrace/src/util/spsc.rs b/minitrace/src/util/spsc.rs index d7b57c66..84ba82a8 100644 --- a/minitrace/src/util/spsc.rs +++ b/minitrace/src/util/spsc.rs @@ -1,31 +1,28 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::sync::Arc; - -use parking_lot::Mutex; +use rtrb::Consumer; +use rtrb::Producer; +use rtrb::PushError; +use rtrb::RingBuffer; pub fn bounded(capacity: usize) -> (Sender, Receiver) { - let page = Arc::new(Mutex::new(Vec::with_capacity(capacity))); + let (tx, rx) = RingBuffer::new(capacity); ( Sender { - page: page.clone(), - capacity, - }, - Receiver { - page, - received: Vec::with_capacity(capacity), + tx, + pending_messages: Vec::new(), }, + Receiver { rx }, ) } pub struct Sender { - page: Arc>>, - capacity: usize, + tx: Producer, + pending_messages: Vec, } pub struct Receiver { - page: Arc>>, - received: Vec, + rx: Consumer, } #[derive(Debug)] @@ -35,41 +32,37 @@ pub struct ChannelFull; pub struct ChannelClosed; impl Sender { - pub fn send(&self, value: T) -> Result<(), ChannelFull> { - let mut page = self.page.lock(); - if page.len() < self.capacity { - page.push(value); - Ok(()) - } else { - Err(ChannelFull) + pub fn send(&mut self, value: T) -> Result<(), ChannelFull> { + while let Some(value) = self.pending_messages.pop() { + if let Err(PushError::Full(value)) = self.tx.push(value) { + self.pending_messages.push(value); + return Err(ChannelFull); + } } + + self.tx.push(value).map_err(|_| ChannelFull) } - pub fn force_send(&self, value: T) { - let mut page = self.page.lock(); - page.push(value); + pub fn force_send(&mut self, value: T) { + while let Some(value) = self.pending_messages.pop() { + if let Err(PushError::Full(value)) = self.tx.push(value) { + self.pending_messages.push(value); + break; + } + } + + if let Err(PushError::Full(value)) = self.tx.push(value) { + self.pending_messages.push(value); + } } } impl Receiver { pub fn try_recv(&mut self) -> Result, ChannelClosed> { - match self.received.pop() { - Some(val) => Ok(Some(val)), - None => { - let mut page = self.page.lock(); - std::mem::swap(&mut *page, &mut self.received); - match self.received.pop() { - Some(val) => Ok(Some(val)), - None => { - let is_disconnected = Arc::strong_count(&self.page) < 2; - if is_disconnected { - Err(ChannelClosed) - } else { - Ok(None) - } - } - } - } + match self.rx.pop() { + Ok(val) => Ok(Some(val)), + Err(_) if self.rx.is_abandoned() => Err(ChannelClosed), + Err(_) => Ok(None), } } } diff --git a/test-no-report/Cargo.toml b/test-statically-disable/Cargo.toml similarity index 85% rename from test-no-report/Cargo.toml rename to test-statically-disable/Cargo.toml index 04ca22ff..44836797 100644 --- a/test-no-report/Cargo.toml +++ b/test-statically-disable/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "test-no-report" +name = "test-statically-disable" version = "0.0.1" authors = ["The TiKV Project Authors"] license = "Apache-2.0" diff --git a/test-no-report/src/main.rs b/test-statically-disable/src/main.rs similarity index 98% rename from test-no-report/src/main.rs rename to test-statically-disable/src/main.rs index c11fec7b..6a5dccc6 100644 --- a/test-no-report/src/main.rs +++ b/test-statically-disable/src/main.rs @@ -2,7 +2,7 @@ // The libraries may have tracing instrument embedded in the code for tracing purposes. However, // if the executable does not enable minitrace, it will be statically disabled. This results in -// zero overhead to the libraries, achieved through conditional compilation with the "report" feature. +// zero overhead to the libraries, achieved through conditional compilation with the "enable" feature. // // The following test is designed to confirm that minitrace compiles when it's statically disabled in the executable.