From 8a527511631694a54e3664aceee1dc19af7ab423 Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Wed, 7 Feb 2024 16:38:21 +0800 Subject: [PATCH] add Config::report_before_root_finish (#204) * add Config::report_before_root_finish Signed-off-by: Andy Lok * fix Signed-off-by: Andy Lok --------- Signed-off-by: Andy Lok --- CHANGELOG.md | 1 + minitrace/src/collector/global_collector.rs | 223 +++++++++++--------- minitrace/src/collector/mod.rs | 21 ++ minitrace/src/lib.rs | 3 +- test-statically-disable/src/main.rs | 3 +- 5 files changed, 154 insertions(+), 97 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e020ae31..771ffeed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - Add `LocalSpan::add_property` and `LocalSpan::add_properties`. +- Add `Config::report_before_root_finish`. ## v0.6.3 diff --git a/minitrace/src/collector/global_collector.rs b/minitrace/src/collector/global_collector.rs index 06b61e3d..69717935 100644 --- a/minitrace/src/collector/global_collector.rs +++ b/minitrace/src/collector/global_collector.rs @@ -188,11 +188,18 @@ enum SpanCollection { }, } +#[derive(Default)] +struct ActiveCollector { + span_collections: Vec, + span_count: usize, + dangling_events: HashMap>, +} + pub(crate) struct GlobalCollector { config: Config, reporter: Option>, - active_collectors: HashMap, usize)>, + active_collectors: HashMap, committed_records: Vec, last_report: Instant, @@ -201,7 +208,6 @@ pub(crate) struct GlobalCollector { drop_collects: Vec, commit_collects: Vec, submit_spans: Vec, - dangling_events: HashMap>, } impl GlobalCollector { @@ -242,7 +248,6 @@ impl GlobalCollector { drop_collects: Vec::new(), commit_collects: Vec::new(), submit_spans: Vec::new(), - dangling_events: HashMap::new(), } } @@ -251,7 +256,6 @@ impl GlobalCollector { debug_assert!(self.drop_collects.is_empty()); debug_assert!(self.commit_collects.is_empty()); debug_assert!(self.submit_spans.is_empty()); - debug_assert!(self.dangling_events.is_empty()); let start_collects = &mut self.start_collects; let drop_collects = &mut self.drop_collects; @@ -290,7 +294,8 @@ impl GlobalCollector { } for StartCollect { collect_id } in self.start_collects.drain(..) { - self.active_collectors.insert(collect_id, (Vec::new(), 0)); + self.active_collectors + .insert(collect_id, ActiveCollector::default()); } for DropCollect { collect_id } in self.drop_collects.drain(..) { @@ -306,114 +311,66 @@ impl GlobalCollector { if collect_token.len() == 1 { let item = collect_token[0]; - if let Some((buf, span_count)) = self.active_collectors.get_mut(&item.collect_id) { - if *span_count < self.config.max_spans_per_trace.unwrap_or(usize::MAX) + if let Some(active_collector) = self.active_collectors.get_mut(&item.collect_id) { + if active_collector.span_count + < self.config.max_spans_per_trace.unwrap_or(usize::MAX) || item.is_root { - *span_count += spans.len(); - buf.push(SpanCollection::Owned { - spans, - trace_id: item.trace_id, - parent_id: item.parent_id, - }); + active_collector.span_count += spans.len(); + active_collector + .span_collections + .push(SpanCollection::Owned { + spans, + trace_id: item.trace_id, + parent_id: item.parent_id, + }); } } } else { let spans = Arc::new(spans); for item in collect_token.iter() { - if let Some((buf, span_count)) = - self.active_collectors.get_mut(&item.collect_id) + if let Some(active_collector) = self.active_collectors.get_mut(&item.collect_id) { // Multiple items in a collect token are built from `Span::enter_from_parents`, // so relative span cannot be a root span. - if *span_count < self.config.max_spans_per_trace.unwrap_or(usize::MAX) { - *span_count += spans.len(); - buf.push(SpanCollection::Shared { - spans: spans.clone(), - trace_id: item.trace_id, - parent_id: item.parent_id, - }); + if active_collector.span_count + < self.config.max_spans_per_trace.unwrap_or(usize::MAX) + { + active_collector.span_count += spans.len(); + active_collector + .span_collections + .push(SpanCollection::Shared { + spans: spans.clone(), + trace_id: item.trace_id, + parent_id: item.parent_id, + }); } } } } } + let anchor = Anchor::new(); + for CommitCollect { collect_id } in commit_collects.drain(..) { - if let Some((span_collections, _)) = self.active_collectors.remove(&collect_id) { - debug_assert!(self.dangling_events.is_empty()); - let dangling_events = &mut self.dangling_events; - - let anchor: Anchor = Anchor::new(); - let committed_len = committed_records.len(); - - for span_collection in span_collections { - match span_collection { - SpanCollection::Owned { - spans, - trace_id, - parent_id, - } => match spans { - SpanSet::Span(raw_span) => amend_span( - &raw_span, - trace_id, - parent_id, - committed_records, - dangling_events, - &anchor, - ), - SpanSet::LocalSpansInner(local_spans) => amend_local_span( - &local_spans, - trace_id, - parent_id, - committed_records, - dangling_events, - &anchor, - ), - SpanSet::SharedLocalSpans(local_spans) => amend_local_span( - &local_spans, - trace_id, - parent_id, - committed_records, - dangling_events, - &anchor, - ), - }, - SpanCollection::Shared { - spans, - trace_id, - parent_id, - } => match &*spans { - SpanSet::Span(raw_span) => amend_span( - raw_span, - trace_id, - parent_id, - committed_records, - dangling_events, - &anchor, - ), - SpanSet::LocalSpansInner(local_spans) => amend_local_span( - local_spans, - trace_id, - parent_id, - committed_records, - dangling_events, - &anchor, - ), - SpanSet::SharedLocalSpans(local_spans) => amend_local_span( - local_spans, - trace_id, - parent_id, - committed_records, - dangling_events, - &anchor, - ), - }, - } - } + if let Some(mut active_collector) = self.active_collectors.remove(&collect_id) { + postprocess_span_collection( + active_collector.span_collections, + &anchor, + committed_records, + &mut active_collector.dangling_events, + ); + } + } - mount_events(&mut committed_records[committed_len..], dangling_events); - dangling_events.clear(); + if self.config.report_before_root_finish { + for active_collector in self.active_collectors.values_mut() { + postprocess_span_collection( + active_collector.span_collections.drain(..), + &anchor, + committed_records, + &mut active_collector.dangling_events, + ); } } @@ -448,6 +405,82 @@ impl LocalSpansInner { } } +fn postprocess_span_collection( + span_collections: impl IntoIterator, + anchor: &Anchor, + committed_records: &mut Vec, + dangling_events: &mut HashMap>, +) { + let committed_len = committed_records.len(); + + for span_collection in span_collections { + match span_collection { + SpanCollection::Owned { + spans, + trace_id, + parent_id, + } => match spans { + SpanSet::Span(raw_span) => amend_span( + &raw_span, + trace_id, + parent_id, + committed_records, + dangling_events, + anchor, + ), + SpanSet::LocalSpansInner(local_spans) => amend_local_span( + &local_spans, + trace_id, + parent_id, + committed_records, + dangling_events, + anchor, + ), + SpanSet::SharedLocalSpans(local_spans) => amend_local_span( + &local_spans, + trace_id, + parent_id, + committed_records, + dangling_events, + anchor, + ), + }, + SpanCollection::Shared { + spans, + trace_id, + parent_id, + } => match &*spans { + SpanSet::Span(raw_span) => amend_span( + raw_span, + trace_id, + parent_id, + committed_records, + dangling_events, + anchor, + ), + SpanSet::LocalSpansInner(local_spans) => amend_local_span( + local_spans, + trace_id, + parent_id, + committed_records, + dangling_events, + anchor, + ), + SpanSet::SharedLocalSpans(local_spans) => amend_local_span( + local_spans, + trace_id, + parent_id, + committed_records, + dangling_events, + anchor, + ), + }, + } + } + + mount_events(&mut committed_records[committed_len..], dangling_events); +} + fn amend_local_span( local_spans: &LocalSpansInner, trace_id: TraceId, diff --git a/minitrace/src/collector/mod.rs b/minitrace/src/collector/mod.rs index ae577add..76a01e52 100644 --- a/minitrace/src/collector/mod.rs +++ b/minitrace/src/collector/mod.rs @@ -268,6 +268,7 @@ pub struct Config { pub(crate) max_spans_per_trace: Option, pub(crate) batch_report_interval: Duration, pub(crate) batch_report_max_spans: Option, + pub(crate) report_before_root_finish: bool, } impl Config { @@ -346,6 +347,25 @@ impl Config { ..self } } + + /// Whether to report the spans before the root span finishes. + /// + /// The default value is `false`. + /// + /// # Examples + /// + /// ``` + /// use minitrace::collector::Config; + /// + /// let config = Config::default().report_before_root_finish(true); + /// minitrace::set_reporter(minitrace::collector::ConsoleReporter, config); + /// ``` + pub fn report_before_root_finish(self, report_before_root_finish: bool) -> Self { + Self { + report_before_root_finish, + ..self + } + } } impl Default for Config { @@ -354,6 +374,7 @@ impl Default for Config { max_spans_per_trace: None, batch_report_interval: Duration::from_millis(500), batch_report_max_spans: None, + report_before_root_finish: false, } } } diff --git a/minitrace/src/lib.rs b/minitrace/src/lib.rs index 7daa1766..95adfe5f 100644 --- a/minitrace/src/lib.rs +++ b/minitrace/src/lib.rs @@ -71,7 +71,8 @@ //! //! When the root span is dropped, all of its children spans and itself will be reported at once. //! Since that, it's recommended to create root spans for short tasks, such as handling a request, -//! just like the example below. Otherwise, an endingless trace will never be reported. +//! just like the example below. Otherwise, an endingless trace will never be reported. To override +//! this behavior, set the `report_before_root_finish` option to `true` in the [`Config`]. //! //! ``` //! use minitrace::collector::Config; diff --git a/test-statically-disable/src/main.rs b/test-statically-disable/src/main.rs index 6cf42ce8..f09af1eb 100644 --- a/test-statically-disable/src/main.rs +++ b/test-statically-disable/src/main.rs @@ -20,7 +20,8 @@ fn main() { Config::default() .batch_report_interval(Duration::from_secs(1)) .max_spans_per_trace(Some(100)) - .batch_report_max_spans(Some(200)), + .batch_report_max_spans(Some(200)) + .report_before_root_finish(true), ); let mut root = Span::root("root", SpanContext::new(TraceId(0), SpanId(0)))