Skip to content

Commit

Permalink
improve: use ringbuffer to send spans
Browse files Browse the repository at this point in the history
Signed-off-by: Andy Lok <[email protected]>
  • Loading branch information
andylokandy committed Sep 18, 2023
1 parent 0ca8cc8 commit 5de13b0
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 123 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ members = [
"minitrace-jaeger",
"minitrace-datadog",
"minitrace-opentelemetry",
"test-no-report",
"test-statically-disable",
]

[profile.bench]
Expand Down
1 change: 1 addition & 0 deletions minitrace/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pin-project = "1"
# TODO: Remove once_cell once #![feature(once_cell)] is stabilized
once_cell = "1"
rand = "0.8"
rtrb = "0.2"

[dev-dependencies]
# The procedural macro `trace` only supports async-trait higher than 0.1.52
Expand Down
177 changes: 121 additions & 56 deletions minitrace/benches/spsc.rs
Original file line number Diff line number Diff line change
@@ -1,65 +1,85 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::time::Duration;
use std::time::Instant;

use criterion::criterion_group;
use criterion::criterion_main;
use criterion::Criterion;

fn crossbeam(nmsg: usize) {
let (tx, rx) = crossbeam::channel::bounded(10240);

crossbeam::scope(|scope| {
scope.spawn(|_| {
for i in 0..nmsg {
tx.send(i).unwrap();
}
});

for _ in 0..nmsg {
while rx.try_recv().is_ok() {}
}
})
.unwrap();
}

fn crossbeam_send_only(nmsg: usize) {
let (tx, _rx) = crossbeam::channel::bounded(10240);

for i in 0..nmsg {
tx.send(i).unwrap();
}
}

fn minitrace(nmsg: usize) {
let (tx, mut rx) = minitrace::util::spsc::bounded(10240);

crossbeam::scope(|scope| {
scope.spawn(|_| {
for i in 0..nmsg {
tx.send(i).unwrap();
}
});

for _ in 0..nmsg {
while rx.try_recv().unwrap().is_some() {}
}
})
.unwrap();
}

fn minitrace_send_only(nmsg: usize) {
let (tx, _rx) = minitrace::util::spsc::bounded(10240);

for i in 0..nmsg {
tx.send(i).unwrap();
}
}

fn spsc_comparison(c: &mut Criterion) {
let mut bgroup = c.benchmark_group("spsc channel");

for len in &[1, 10, 100, 1000, 10000] {
bgroup.bench_function(format!("crossbeam/{}", len), |b| b.iter(|| crossbeam(*len)));
bgroup.bench_function(format!("minitrace/{}", len), |b| b.iter(|| minitrace(*len)));
for &len in &[1, 10, 100, 1000, 10000] {
bgroup.bench_function(format!("crossbeam/{}", len), |b| {
b.iter_custom(|iters| {
let mut total_time = Duration::default();
for _ in 0..iters {
let (tx, rx) = crossbeam::channel::bounded(10240);

let start = Instant::now();

std::thread::spawn(move || {
for i in 0..len {
while tx.try_send(i).is_err() {}
}
});

for _ in 0..len {
while rx.try_recv().is_err() {}
}

total_time += start.elapsed();
}
total_time
})
});
bgroup.bench_function(format!("ringbuffer/{}", len), |b| {
b.iter_custom(|iters| {
let mut total_time = Duration::default();
for _ in 0..iters {
let (mut tx, mut rx) = rtrb::RingBuffer::new(10240);

let start = Instant::now();

std::thread::spawn(move || {
for i in 0..len {
while tx.push(i).is_err() {}
}
});

for _ in 0..len {
while rx.pop().is_err() {}
}

total_time += start.elapsed();
}
total_time
})
});
bgroup.bench_function(format!("minitrace/{}", len), |b| {
b.iter_custom(|iters| {
let mut total_time = Duration::default();
for _ in 0..iters {
let (mut tx, mut rx) = minitrace::util::spsc::bounded(10240);

let start = Instant::now();

std::thread::spawn(move || {
for i in 0..len {
while tx.send(i).is_err() {}
}
});

for _ in 0..len {
while rx.try_recv().is_err() {}
}

total_time += start.elapsed();
}
total_time
})
});
}

bgroup.finish();
Expand All @@ -68,12 +88,57 @@ fn spsc_comparison(c: &mut Criterion) {
fn spsc_send_only_comparison(c: &mut Criterion) {
let mut bgroup = c.benchmark_group("spsc channel send only");

for len in &[1, 10, 100, 1000, 10000] {
for &len in &[1, 10, 100, 1000, 10000] {
bgroup.bench_function(format!("crossbeam/{}", len), |b| {
b.iter(|| crossbeam_send_only(*len))
b.iter_custom(|iters| {
let mut total_time = Duration::default();
for _ in 0..iters {
let (tx, _rx) = crossbeam::channel::bounded(10240);

let start = Instant::now();

for i in 0..len {
tx.send(i).unwrap();
}

total_time += start.elapsed();
}
total_time
})
});
bgroup.bench_function(format!("ringbuffer/{}", len), |b| {
b.iter_custom(|iters| {
let mut total_time = Duration::default();
for _ in 0..iters {
let (mut tx, _rx) = rtrb::RingBuffer::new(10240);

let start = Instant::now();

for i in 0..len {
tx.push(i).unwrap();
}

total_time += start.elapsed();
}
total_time
})
});
bgroup.bench_function(format!("minitrace/{}", len), |b| {
b.iter(|| minitrace_send_only(*len))
b.iter_custom(|iters| {
let mut total_time = Duration::default();
for _ in 0..iters {
let (mut tx, _rx) = minitrace::util::spsc::bounded(10240);

let start = Instant::now();

for i in 0..len {
tx.send(i).unwrap();
}

total_time += start.elapsed();
}
total_time
})
});
}

Expand Down
41 changes: 23 additions & 18 deletions minitrace/benches/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ use criterion::Criterion;
use minitrace::local::LocalCollector;
use minitrace::prelude::*;

fn init_minitrace() {
struct DummyReporter;

impl minitrace::collector::Reporter for DummyReporter {
fn report(&mut self, _spans: &[minitrace::prelude::SpanRecord]) {}
}

minitrace::set_reporter(DummyReporter, minitrace::collector::Config::default());
}

fn dummy_iter(i: usize) {
#[trace]
fn dummy() {}
Expand All @@ -25,7 +35,6 @@ fn dummy_rec(i: usize) {

fn bench_trace_wide_raw(c: &mut Criterion) {
let mut group = c.benchmark_group("trace_wide_raw");

for len in &[1, 10, 100, 1000, 10000] {
group.bench_function(len.to_string(), |b| {
b.iter(|| {
Expand All @@ -37,30 +46,26 @@ fn bench_trace_wide_raw(c: &mut Criterion) {
}

group.finish();
minitrace::flush();
}

fn bench_trace_wide(c: &mut Criterion) {
init_minitrace();

let mut group = c.benchmark_group("trace_wide");

for len in &[1, 10, 100, 1000, 10000] {
group.bench_function(format!("with-collect-{}", len), |b| {
group.bench_function(len.to_string(), |b| {
b.iter(|| {
let root = Span::root("root", SpanContext::new(TraceId(12), SpanId::default()));
let _sg = root.set_local_parent();
dummy_iter(*len - 1);
})
});
group.bench_function(format!("without-collect-{}", len), |b| {
b.iter(|| {
let root: Span =
Span::root("root", SpanContext::new(TraceId(12), SpanId::default()));
let _sg = root.set_local_parent();
dummy_iter(*len - 1);
})
});
}

group.finish();
minitrace::flush()
}

fn bench_trace_deep_raw(c: &mut Criterion) {
Expand All @@ -77,20 +82,16 @@ fn bench_trace_deep_raw(c: &mut Criterion) {
}

group.finish();
minitrace::flush()
}

fn bench_trace_deep(c: &mut Criterion) {
init_minitrace();

let mut group = c.benchmark_group("trace_deep");

for len in &[1, 10, 100, 1000] {
group.bench_function(format!("with-collect-{}", len), |b| {
b.iter(|| {
let root = Span::root("root", SpanContext::new(TraceId(12), SpanId::default()));
let _sg = root.set_local_parent();
dummy_rec(*len - 1);
})
});
group.bench_function(format!("without-collect-{}", len), |b| {
group.bench_function(len.to_string(), |b| {
b.iter(|| {
let root = Span::root("root", SpanContext::new(TraceId(12), SpanId::default()));
let _sg = root.set_local_parent();
Expand All @@ -100,9 +101,12 @@ fn bench_trace_deep(c: &mut Criterion) {
}

group.finish();
minitrace::flush()
}

fn bench_trace_future(c: &mut Criterion) {
init_minitrace();

async fn f(i: u32) {
for _ in 0..i - 1 {
async {}.enter_on_poll(black_box("")).await
Expand All @@ -121,6 +125,7 @@ fn bench_trace_future(c: &mut Criterion) {
}

group.finish();
minitrace::flush()
}

criterion_group!(
Expand Down
14 changes: 9 additions & 5 deletions minitrace/src/collector/global_collector.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
Expand Down Expand Up @@ -38,10 +39,10 @@ static SPSC_RXS: Lazy<Mutex<Vec<Receiver<CollectCommand>>>> = Lazy::new(|| Mutex
static REPORTER_READY: AtomicBool = AtomicBool::new(false);

thread_local! {
static COMMAND_SENDER: Sender<CollectCommand> = {
static COMMAND_SENDER: UnsafeCell<Sender<CollectCommand>> = {
let (tx, rx) = spsc::bounded(10240);
register_receiver(rx);
tx
UnsafeCell::new(tx)
};
}

Expand All @@ -50,12 +51,14 @@ fn register_receiver(rx: Receiver<CollectCommand>) {
}

fn send_command(cmd: CollectCommand) {
COMMAND_SENDER.try_with(|sender| sender.send(cmd).ok()).ok();
COMMAND_SENDER
.try_with(|sender| unsafe { (*sender.get()).send(cmd).ok() })
.ok();
}

fn force_send_command(cmd: CollectCommand) {
COMMAND_SENDER
.try_with(|sender| sender.force_send(cmd))
.try_with(|sender| unsafe { (*sender.get()).force_send(cmd) })
.ok();
}

Expand Down Expand Up @@ -251,10 +254,11 @@ impl GlobalCollector {
Ok(Some(CollectCommand::CommitCollect(cmd))) => commit_collects.push(cmd),
Ok(Some(CollectCommand::SubmitSpans(cmd))) => submit_spans.push(cmd),
Ok(None) => {
// Channel is empty.
return true;
}
Err(_) => {
// Channel disconnected. It must be because the sender thread has stopped.
// Channel closed. Remove it from the channel list.
return false;
}
}
Expand Down
Loading

0 comments on commit 5de13b0

Please sign in to comment.