Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Config::report_before_root_finish #204

Merged
merged 2 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

- Add `LocalSpan::add_property` and `LocalSpan::add_properties`.
- Add `Config::report_before_root_finish`.

## v0.6.3

Expand Down
223 changes: 128 additions & 95 deletions minitrace/src/collector/global_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,18 @@ enum SpanCollection {
},
}

#[derive(Default)]
struct ActiveCollector {
span_collections: Vec<SpanCollection>,
span_count: usize,
dangling_events: HashMap<SpanId, Vec<EventRecord>>,
}

pub(crate) struct GlobalCollector {
config: Config,
reporter: Option<Box<dyn Reporter>>,

active_collectors: HashMap<usize, (Vec<SpanCollection>, usize)>,
active_collectors: HashMap<usize, ActiveCollector>,
committed_records: Vec<SpanRecord>,
last_report: Instant,

Expand All @@ -201,7 +208,6 @@ pub(crate) struct GlobalCollector {
drop_collects: Vec<DropCollect>,
commit_collects: Vec<CommitCollect>,
submit_spans: Vec<SubmitSpans>,
dangling_events: HashMap<SpanId, Vec<EventRecord>>,
}

impl GlobalCollector {
Expand Down Expand Up @@ -242,7 +248,6 @@ impl GlobalCollector {
drop_collects: Vec::new(),
commit_collects: Vec::new(),
submit_spans: Vec::new(),
dangling_events: HashMap::new(),
}
}

Expand All @@ -251,7 +256,6 @@ impl GlobalCollector {
debug_assert!(self.drop_collects.is_empty());
debug_assert!(self.commit_collects.is_empty());
debug_assert!(self.submit_spans.is_empty());
debug_assert!(self.dangling_events.is_empty());

let start_collects = &mut self.start_collects;
let drop_collects = &mut self.drop_collects;
Expand Down Expand Up @@ -290,7 +294,8 @@ impl GlobalCollector {
}

for StartCollect { collect_id } in self.start_collects.drain(..) {
self.active_collectors.insert(collect_id, (Vec::new(), 0));
self.active_collectors
.insert(collect_id, ActiveCollector::default());
}

for DropCollect { collect_id } in self.drop_collects.drain(..) {
Expand All @@ -306,114 +311,66 @@ impl GlobalCollector {

if collect_token.len() == 1 {
let item = collect_token[0];
if let Some((buf, span_count)) = self.active_collectors.get_mut(&item.collect_id) {
if *span_count < self.config.max_spans_per_trace.unwrap_or(usize::MAX)
if let Some(active_collector) = self.active_collectors.get_mut(&item.collect_id) {
if active_collector.span_count
< self.config.max_spans_per_trace.unwrap_or(usize::MAX)
|| item.is_root
{
*span_count += spans.len();
buf.push(SpanCollection::Owned {
spans,
trace_id: item.trace_id,
parent_id: item.parent_id,
});
active_collector.span_count += spans.len();
active_collector
.span_collections
.push(SpanCollection::Owned {
spans,
trace_id: item.trace_id,
parent_id: item.parent_id,
});
}
}
} else {
let spans = Arc::new(spans);
for item in collect_token.iter() {
if let Some((buf, span_count)) =
self.active_collectors.get_mut(&item.collect_id)
if let Some(active_collector) = self.active_collectors.get_mut(&item.collect_id)
{
// Multiple items in a collect token are built from `Span::enter_from_parents`,
// so relative span cannot be a root span.
if *span_count < self.config.max_spans_per_trace.unwrap_or(usize::MAX) {
*span_count += spans.len();
buf.push(SpanCollection::Shared {
spans: spans.clone(),
trace_id: item.trace_id,
parent_id: item.parent_id,
});
if active_collector.span_count
< self.config.max_spans_per_trace.unwrap_or(usize::MAX)
{
active_collector.span_count += spans.len();
active_collector
.span_collections
.push(SpanCollection::Shared {
spans: spans.clone(),
trace_id: item.trace_id,
parent_id: item.parent_id,
});
}
}
}
}
}

let anchor = Anchor::new();

for CommitCollect { collect_id } in commit_collects.drain(..) {
if let Some((span_collections, _)) = self.active_collectors.remove(&collect_id) {
debug_assert!(self.dangling_events.is_empty());
let dangling_events = &mut self.dangling_events;

let anchor: Anchor = Anchor::new();
let committed_len = committed_records.len();

for span_collection in span_collections {
match span_collection {
SpanCollection::Owned {
spans,
trace_id,
parent_id,
} => match spans {
SpanSet::Span(raw_span) => amend_span(
&raw_span,
trace_id,
parent_id,
committed_records,
dangling_events,
&anchor,
),
SpanSet::LocalSpansInner(local_spans) => amend_local_span(
&local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
&anchor,
),
SpanSet::SharedLocalSpans(local_spans) => amend_local_span(
&local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
&anchor,
),
},
SpanCollection::Shared {
spans,
trace_id,
parent_id,
} => match &*spans {
SpanSet::Span(raw_span) => amend_span(
raw_span,
trace_id,
parent_id,
committed_records,
dangling_events,
&anchor,
),
SpanSet::LocalSpansInner(local_spans) => amend_local_span(
local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
&anchor,
),
SpanSet::SharedLocalSpans(local_spans) => amend_local_span(
local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
&anchor,
),
},
}
}
if let Some(mut active_collector) = self.active_collectors.remove(&collect_id) {
postprocess_span_collection(
active_collector.span_collections,
&anchor,
committed_records,
&mut active_collector.dangling_events,
);
}
}

mount_events(&mut committed_records[committed_len..], dangling_events);
dangling_events.clear();
if self.config.report_before_root_finish {
for active_collector in self.active_collectors.values_mut() {
postprocess_span_collection(
active_collector.span_collections.drain(..),
&anchor,
committed_records,
&mut active_collector.dangling_events,
);
}
}

Expand Down Expand Up @@ -448,6 +405,82 @@ impl LocalSpansInner {
}
}

fn postprocess_span_collection(
span_collections: impl IntoIterator<Item = SpanCollection>,
anchor: &Anchor,
committed_records: &mut Vec<SpanRecord>,
dangling_events: &mut HashMap<SpanId, Vec<EventRecord>>,
) {
let committed_len = committed_records.len();

for span_collection in span_collections {
match span_collection {
SpanCollection::Owned {
spans,
trace_id,
parent_id,
} => match spans {
SpanSet::Span(raw_span) => amend_span(
&raw_span,
trace_id,
parent_id,
committed_records,
dangling_events,
anchor,
),
SpanSet::LocalSpansInner(local_spans) => amend_local_span(
&local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
anchor,
),
SpanSet::SharedLocalSpans(local_spans) => amend_local_span(
&local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
anchor,
),
},
SpanCollection::Shared {
spans,
trace_id,
parent_id,
} => match &*spans {
SpanSet::Span(raw_span) => amend_span(
raw_span,
trace_id,
parent_id,
committed_records,
dangling_events,
anchor,
),
SpanSet::LocalSpansInner(local_spans) => amend_local_span(
local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
anchor,
),
SpanSet::SharedLocalSpans(local_spans) => amend_local_span(
local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
anchor,
),
},
}
}

mount_events(&mut committed_records[committed_len..], dangling_events);
}

fn amend_local_span(
local_spans: &LocalSpansInner,
trace_id: TraceId,
Expand Down
21 changes: 21 additions & 0 deletions minitrace/src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ pub struct Config {
pub(crate) max_spans_per_trace: Option<usize>,
pub(crate) batch_report_interval: Duration,
pub(crate) batch_report_max_spans: Option<usize>,
pub(crate) report_before_root_finish: bool,
}

impl Config {
Expand Down Expand Up @@ -346,6 +347,25 @@ impl Config {
..self
}
}

/// Whether to report the spans before the root span finishes.
///
/// The default value is `false`.
///
/// # Examples
///
/// ```
/// use minitrace::collector::Config;
///
/// let config = Config::default().report_before_root_finish(true);
/// minitrace::set_reporter(minitrace::collector::ConsoleReporter, config);
/// ```
pub fn report_before_root_finish(self, report_before_root_finish: bool) -> Self {
Self {
report_before_root_finish,
..self
}
}
}

impl Default for Config {
Expand All @@ -354,6 +374,7 @@ impl Default for Config {
max_spans_per_trace: None,
batch_report_interval: Duration::from_millis(500),
batch_report_max_spans: None,
report_before_root_finish: false,
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion minitrace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
//!
//! When the root span is dropped, all of its children spans and itself will be reported at once.
//! Since that, it's recommended to create root spans for short tasks, such as handling a request,
//! just like the example below. Otherwise, an endingless trace will never be reported.
//! just like the example below. Otherwise, an endingless trace will never be reported. To override
//! this behavior, set the `report_before_root_finish` option to `true` in the [`Config`].
//!
//! ```
//! use minitrace::collector::Config;
Expand Down
3 changes: 2 additions & 1 deletion test-statically-disable/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ fn main() {
Config::default()
.batch_report_interval(Duration::from_secs(1))
.max_spans_per_trace(Some(100))
.batch_report_max_spans(Some(200)),
.batch_report_max_spans(Some(200))
.report_before_root_finish(true),
);

let mut root = Span::root("root", SpanContext::new(TraceId(0), SpanId(0)))
Expand Down
Loading