Skip to content

Commit

Permalink
add Config::report_before_root_finish (#204)
Browse files Browse the repository at this point in the history
* add Config::report_before_root_finish

Signed-off-by: Andy Lok <[email protected]>

* fix

Signed-off-by: Andy Lok <[email protected]>

---------

Signed-off-by: Andy Lok <[email protected]>
  • Loading branch information
andylokandy authored Feb 7, 2024
1 parent 34fccc7 commit 8a52751
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 97 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

- Add `LocalSpan::add_property` and `LocalSpan::add_properties`.
- Add `Config::report_before_root_finish`.

## v0.6.3

Expand Down
223 changes: 128 additions & 95 deletions minitrace/src/collector/global_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,18 @@ enum SpanCollection {
},
}

#[derive(Default)]
struct ActiveCollector {
span_collections: Vec<SpanCollection>,
span_count: usize,
dangling_events: HashMap<SpanId, Vec<EventRecord>>,
}

pub(crate) struct GlobalCollector {
config: Config,
reporter: Option<Box<dyn Reporter>>,

active_collectors: HashMap<usize, (Vec<SpanCollection>, usize)>,
active_collectors: HashMap<usize, ActiveCollector>,
committed_records: Vec<SpanRecord>,
last_report: Instant,

Expand All @@ -201,7 +208,6 @@ pub(crate) struct GlobalCollector {
drop_collects: Vec<DropCollect>,
commit_collects: Vec<CommitCollect>,
submit_spans: Vec<SubmitSpans>,
dangling_events: HashMap<SpanId, Vec<EventRecord>>,
}

impl GlobalCollector {
Expand Down Expand Up @@ -242,7 +248,6 @@ impl GlobalCollector {
drop_collects: Vec::new(),
commit_collects: Vec::new(),
submit_spans: Vec::new(),
dangling_events: HashMap::new(),
}
}

Expand All @@ -251,7 +256,6 @@ impl GlobalCollector {
debug_assert!(self.drop_collects.is_empty());
debug_assert!(self.commit_collects.is_empty());
debug_assert!(self.submit_spans.is_empty());
debug_assert!(self.dangling_events.is_empty());

let start_collects = &mut self.start_collects;
let drop_collects = &mut self.drop_collects;
Expand Down Expand Up @@ -290,7 +294,8 @@ impl GlobalCollector {
}

for StartCollect { collect_id } in self.start_collects.drain(..) {
self.active_collectors.insert(collect_id, (Vec::new(), 0));
self.active_collectors
.insert(collect_id, ActiveCollector::default());
}

for DropCollect { collect_id } in self.drop_collects.drain(..) {
Expand All @@ -306,114 +311,66 @@ impl GlobalCollector {

if collect_token.len() == 1 {
let item = collect_token[0];
if let Some((buf, span_count)) = self.active_collectors.get_mut(&item.collect_id) {
if *span_count < self.config.max_spans_per_trace.unwrap_or(usize::MAX)
if let Some(active_collector) = self.active_collectors.get_mut(&item.collect_id) {
if active_collector.span_count
< self.config.max_spans_per_trace.unwrap_or(usize::MAX)
|| item.is_root
{
*span_count += spans.len();
buf.push(SpanCollection::Owned {
spans,
trace_id: item.trace_id,
parent_id: item.parent_id,
});
active_collector.span_count += spans.len();
active_collector
.span_collections
.push(SpanCollection::Owned {
spans,
trace_id: item.trace_id,
parent_id: item.parent_id,
});
}
}
} else {
let spans = Arc::new(spans);
for item in collect_token.iter() {
if let Some((buf, span_count)) =
self.active_collectors.get_mut(&item.collect_id)
if let Some(active_collector) = self.active_collectors.get_mut(&item.collect_id)
{
// Multiple items in a collect token are built from `Span::enter_from_parents`,
// so relative span cannot be a root span.
if *span_count < self.config.max_spans_per_trace.unwrap_or(usize::MAX) {
*span_count += spans.len();
buf.push(SpanCollection::Shared {
spans: spans.clone(),
trace_id: item.trace_id,
parent_id: item.parent_id,
});
if active_collector.span_count
< self.config.max_spans_per_trace.unwrap_or(usize::MAX)
{
active_collector.span_count += spans.len();
active_collector
.span_collections
.push(SpanCollection::Shared {
spans: spans.clone(),
trace_id: item.trace_id,
parent_id: item.parent_id,
});
}
}
}
}
}

let anchor = Anchor::new();

for CommitCollect { collect_id } in commit_collects.drain(..) {
if let Some((span_collections, _)) = self.active_collectors.remove(&collect_id) {
debug_assert!(self.dangling_events.is_empty());
let dangling_events = &mut self.dangling_events;

let anchor: Anchor = Anchor::new();
let committed_len = committed_records.len();

for span_collection in span_collections {
match span_collection {
SpanCollection::Owned {
spans,
trace_id,
parent_id,
} => match spans {
SpanSet::Span(raw_span) => amend_span(
&raw_span,
trace_id,
parent_id,
committed_records,
dangling_events,
&anchor,
),
SpanSet::LocalSpansInner(local_spans) => amend_local_span(
&local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
&anchor,
),
SpanSet::SharedLocalSpans(local_spans) => amend_local_span(
&local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
&anchor,
),
},
SpanCollection::Shared {
spans,
trace_id,
parent_id,
} => match &*spans {
SpanSet::Span(raw_span) => amend_span(
raw_span,
trace_id,
parent_id,
committed_records,
dangling_events,
&anchor,
),
SpanSet::LocalSpansInner(local_spans) => amend_local_span(
local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
&anchor,
),
SpanSet::SharedLocalSpans(local_spans) => amend_local_span(
local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
&anchor,
),
},
}
}
if let Some(mut active_collector) = self.active_collectors.remove(&collect_id) {
postprocess_span_collection(
active_collector.span_collections,
&anchor,
committed_records,
&mut active_collector.dangling_events,
);
}
}

mount_events(&mut committed_records[committed_len..], dangling_events);
dangling_events.clear();
if self.config.report_before_root_finish {
for active_collector in self.active_collectors.values_mut() {
postprocess_span_collection(
active_collector.span_collections.drain(..),
&anchor,
committed_records,
&mut active_collector.dangling_events,
);
}
}

Expand Down Expand Up @@ -448,6 +405,82 @@ impl LocalSpansInner {
}
}

fn postprocess_span_collection(
span_collections: impl IntoIterator<Item = SpanCollection>,
anchor: &Anchor,
committed_records: &mut Vec<SpanRecord>,
dangling_events: &mut HashMap<SpanId, Vec<EventRecord>>,
) {
let committed_len = committed_records.len();

for span_collection in span_collections {
match span_collection {
SpanCollection::Owned {
spans,
trace_id,
parent_id,
} => match spans {
SpanSet::Span(raw_span) => amend_span(
&raw_span,
trace_id,
parent_id,
committed_records,
dangling_events,
anchor,
),
SpanSet::LocalSpansInner(local_spans) => amend_local_span(
&local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
anchor,
),
SpanSet::SharedLocalSpans(local_spans) => amend_local_span(
&local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
anchor,
),
},
SpanCollection::Shared {
spans,
trace_id,
parent_id,
} => match &*spans {
SpanSet::Span(raw_span) => amend_span(
raw_span,
trace_id,
parent_id,
committed_records,
dangling_events,
anchor,
),
SpanSet::LocalSpansInner(local_spans) => amend_local_span(
local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
anchor,
),
SpanSet::SharedLocalSpans(local_spans) => amend_local_span(
local_spans,
trace_id,
parent_id,
committed_records,
dangling_events,
anchor,
),
},
}
}

mount_events(&mut committed_records[committed_len..], dangling_events);
}

fn amend_local_span(
local_spans: &LocalSpansInner,
trace_id: TraceId,
Expand Down
21 changes: 21 additions & 0 deletions minitrace/src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ pub struct Config {
pub(crate) max_spans_per_trace: Option<usize>,
pub(crate) batch_report_interval: Duration,
pub(crate) batch_report_max_spans: Option<usize>,
pub(crate) report_before_root_finish: bool,
}

impl Config {
Expand Down Expand Up @@ -346,6 +347,25 @@ impl Config {
..self
}
}

/// Whether to report the spans before the root span finishes.
///
/// The default value is `false`.
///
/// # Examples
///
/// ```
/// use minitrace::collector::Config;
///
/// let config = Config::default().report_before_root_finish(true);
/// minitrace::set_reporter(minitrace::collector::ConsoleReporter, config);
/// ```
pub fn report_before_root_finish(self, report_before_root_finish: bool) -> Self {
Self {
report_before_root_finish,
..self
}
}
}

impl Default for Config {
Expand All @@ -354,6 +374,7 @@ impl Default for Config {
max_spans_per_trace: None,
batch_report_interval: Duration::from_millis(500),
batch_report_max_spans: None,
report_before_root_finish: false,
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion minitrace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
//!
//! When the root span is dropped, all of its children spans and itself will be reported at once.
//! Since that, it's recommended to create root spans for short tasks, such as handling a request,
//! just like the example below. Otherwise, an endingless trace will never be reported.
//! just like the example below. Otherwise, an endingless trace will never be reported. To override
//! this behavior, set the `report_before_root_finish` option to `true` in the [`Config`].
//!
//! ```
//! use minitrace::collector::Config;
Expand Down
3 changes: 2 additions & 1 deletion test-statically-disable/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ fn main() {
Config::default()
.batch_report_interval(Duration::from_secs(1))
.max_spans_per_trace(Some(100))
.batch_report_max_spans(Some(200)),
.batch_report_max_spans(Some(200))
.report_before_root_finish(true),
);

let mut root = Span::root("root", SpanContext::new(TraceId(0), SpanId(0)))
Expand Down

0 comments on commit 8a52751

Please sign in to comment.