Skip to content

Commit

Permalink
feat: add span metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Dotan Simha <[email protected]>
  • Loading branch information
dotansimha committed Jan 21, 2024
1 parent e62aca4 commit cd27633
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 88 deletions.
31 changes: 22 additions & 9 deletions minitrace/src/collector/global_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use once_cell::sync::Lazy;
use parking_lot::Mutex;

use super::EventRecord;
use super::SpanMetadata;
use crate::collector::command::CollectCommand;
use crate::collector::command::CommitCollect;
use crate::collector::command::DropCollect;
Expand Down Expand Up @@ -169,11 +170,13 @@ enum SpanCollection {
spans: SpanSet,
trace_id: TraceId,
parent_id: SpanId,
metadata: Option<SpanMetadata>,
},
Shared {
spans: Arc<SpanSet>,
trace_id: TraceId,
parent_id: SpanId,
metadata: Option<SpanMetadata>,
},
}

Expand Down Expand Up @@ -205,14 +208,10 @@ impl GlobalCollector {

std::thread::Builder::new()
.name("minitrace-global-collector".to_string())
.spawn(move || {
loop {
let begin_instant = std::time::Instant::now();
GLOBAL_COLLECTOR.lock().handle_commands(false);
std::thread::sleep(
COLLECT_LOOP_INTERVAL.saturating_sub(begin_instant.elapsed()),
);
}
.spawn(move || loop {
let begin_instant = std::time::Instant::now();
GLOBAL_COLLECTOR.lock().handle_commands(false);
std::thread::sleep(COLLECT_LOOP_INTERVAL.saturating_sub(begin_instant.elapsed()));
})
.unwrap();

Expand Down Expand Up @@ -291,7 +290,7 @@ impl GlobalCollector {
debug_assert!(!collect_token.is_empty());

if collect_token.len() == 1 {
let item = collect_token[0];
let item = collect_token[0].clone();
if let Some((buf, span_count)) = self.active_collectors.get_mut(&item.collect_id) {
if *span_count < self.config.max_spans_per_trace.unwrap_or(usize::MAX)
|| item.is_root
Expand All @@ -301,6 +300,7 @@ impl GlobalCollector {
spans,
trace_id: item.trace_id,
parent_id: item.parent_id,
metadata: item.metadata,
});
}
}
Expand All @@ -318,6 +318,7 @@ impl GlobalCollector {
spans: spans.clone(),
trace_id: item.trace_id,
parent_id: item.parent_id,
metadata: item.metadata.clone(),
});
}
}
Expand All @@ -339,6 +340,7 @@ impl GlobalCollector {
spans,
trace_id,
parent_id,
metadata,
} => match spans {
SpanSet::Span(raw_span) => amend_span(
&raw_span,
Expand All @@ -347,6 +349,7 @@ impl GlobalCollector {
committed_records,
dangling_events,
&anchor,
metadata,
),
SpanSet::LocalSpansInner(local_spans) => amend_local_span(
&local_spans,
Expand All @@ -355,6 +358,7 @@ impl GlobalCollector {
committed_records,
dangling_events,
&anchor,
metadata,
),
SpanSet::SharedLocalSpans(local_spans) => amend_local_span(
&local_spans,
Expand All @@ -363,12 +367,14 @@ impl GlobalCollector {
committed_records,
dangling_events,
&anchor,
metadata,
),
},
SpanCollection::Shared {
spans,
trace_id,
parent_id,
metadata,
} => match &*spans {
SpanSet::Span(raw_span) => amend_span(
raw_span,
Expand All @@ -377,6 +383,7 @@ impl GlobalCollector {
committed_records,
dangling_events,
&anchor,
metadata,
),
SpanSet::LocalSpansInner(local_spans) => amend_local_span(
local_spans,
Expand All @@ -385,6 +392,7 @@ impl GlobalCollector {
committed_records,
dangling_events,
&anchor,
metadata,
),
SpanSet::SharedLocalSpans(local_spans) => amend_local_span(
local_spans,
Expand All @@ -393,6 +401,7 @@ impl GlobalCollector {
committed_records,
dangling_events,
&anchor,
metadata,
),
},
}
Expand Down Expand Up @@ -423,6 +432,7 @@ fn amend_local_span(
spans: &mut Vec<SpanRecord>,
events: &mut HashMap<SpanId, Vec<EventRecord>>,
anchor: &Anchor,
metadata: Option<SpanMetadata>,
) {
for span in local_spans.spans.iter() {
let begin_time_unix_ns = span.begin_instant.as_unix_nanos(anchor);
Expand Down Expand Up @@ -456,6 +466,7 @@ fn amend_local_span(
name: span.name.clone(),
properties: span.properties.clone(),
events: vec![],
metadata: metadata.clone(),
});
}
}
Expand All @@ -467,6 +478,7 @@ fn amend_span(
spans: &mut Vec<SpanRecord>,
events: &mut HashMap<SpanId, Vec<EventRecord>>,
anchor: &Anchor,
metadata: Option<SpanMetadata>,
) {
let begin_time_unix_ns = raw_span.begin_instant.as_unix_nanos(anchor);

Expand All @@ -490,6 +502,7 @@ fn amend_span(
name: raw_span.name.clone(),
properties: raw_span.properties.clone(),
events: vec![],
metadata,
});
}

Expand Down
138 changes: 133 additions & 5 deletions minitrace/src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub(crate) mod global_collector;
pub(crate) mod id;
mod test_reporter;

use std::any::Any;
use std::borrow::Cow;
use std::rc::Rc;
use std::sync::Arc;
Expand Down Expand Up @@ -41,6 +42,64 @@ pub enum SpanSet {
SharedLocalSpans(Arc<LocalSpansInner>),
}

/// A struct representing the metadata of a span. It can be used to store arbitrary data
/// during the processing and reporting of the span.
///
/// The metadata is propagated from the root span to all its children, so it can help to identify span traces and their association with a particular flow.
///
/// Metadata is not sent to the collector, and is only available to the reporter as part of the `SpanRecord`.
///
/// # Examples
///
/// ```
/// use minitrace::prelude::*;
///
/// let context = SpanContext::new_with_metadata(TraceId(12), SpanId::default(), SpanMetadata::create::<i32>(123));
/// ```
///
/// In your reporter implementation, you can access the metadata by passing the explicit structure type:
///
/// ```
/// struct MyReporter;
///
/// impl Reporter for MyReporter {
/// fn report(&mut self, spans: &[SpanRecord]) {
/// for span in spans {
/// let metadata: &i32 = span.metadata::<i32>().unwrap();
/// }
/// }
/// }
/// ```
///
/// use minitrace::prelude::*;
#[derive(Debug, Clone)]
pub struct SpanMetadata {
inner: Arc<dyn Any + Send + Sync>,
}

impl SpanMetadata {
pub fn create<T: Send + Sync + 'static>(metadata: T) -> SpanMetadata {
Self {
inner: Arc::new(metadata),
}
}

pub fn data<T>(&self) -> Option<&T>
where
T: 'static,
{
self.inner.downcast_ref::<T>()
}
}

impl PartialEq for SpanMetadata {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.inner, &other.inner)
}
}

impl Eq for SpanMetadata {}

/// A record of a span that includes all the information about the span,
/// such as its identifiers, timing information, name, and associated properties.
#[derive(Clone, Debug, Default)]
Expand All @@ -53,6 +112,16 @@ pub struct SpanRecord {
pub name: Cow<'static, str>,
pub properties: Vec<(Cow<'static, str>, Cow<'static, str>)>,
pub events: Vec<EventRecord>,
metadata: Option<SpanMetadata>,
}

impl SpanRecord {
pub fn metadata<T>(&self) -> Option<&T>
where
T: 'static,
{
self.metadata.as_ref()?.data::<T>()
}
}

/// A record of an event that occurred during the execution of a span.
Expand All @@ -64,26 +133,37 @@ pub struct EventRecord {
}

#[doc(hidden)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CollectTokenItem {
pub trace_id: TraceId,
pub parent_id: SpanId,
pub collect_id: usize,
pub is_root: bool,
pub metadata: Option<SpanMetadata>,
}

impl CollectTokenItem {
pub fn metadata<T>(&self) -> Option<&T>
where
T: 'static,
{
self.metadata.as_ref()?.data::<T>()
}
}

/// A struct representing the context of a span, including its [`TraceId`] and [`SpanId`].
///
/// [`TraceId`]: crate::collector::TraceId
/// [`SpanId`]: crate::collector::SpanId
#[derive(Clone, Copy, Debug, Default)]
#[derive(Clone, Debug, Default)]
pub struct SpanContext {
pub trace_id: TraceId,
pub span_id: SpanId,
pub metadata: Option<SpanMetadata>,
}

impl SpanContext {
/// Creates a new `SpanContext` with the given [`TraceId`] and [`SpanId`].
/// Creates a new `SpanContext` with the given [`TraceId`] and [`SpanId`] and [`SpanMetadata`].
///
/// # Examples
///
Expand All @@ -96,7 +176,35 @@ impl SpanContext {
/// [`TraceId`]: crate::collector::TraceId
/// [`SpanId`]: crate::collector::SpanId
pub fn new(trace_id: TraceId, span_id: SpanId) -> Self {
Self { trace_id, span_id }
Self {
trace_id,
span_id,
metadata: None,
}
}

/// Creates a new `SpanContext` with the given [`TraceId`] and [`SpanId`] and optional [`SpanMetadata`].
///
/// # Examples
///
/// ```
/// use minitrace::prelude::*;
///
/// let context = SpanContext::new(TraceId(12), SpanId::default());
/// ```
///
/// [`TraceId`]: crate::collector::TraceId
/// [`SpanId`]: crate::collector::SpanId
pub fn new_with_metadata(
trace_id: TraceId,
span_id: SpanId,
metadata: Option<SpanMetadata>,
) -> Self {
Self {
trace_id,
span_id,
metadata,
}
}

/// Create a new `SpanContext` with a random trace id.
Expand All @@ -112,6 +220,24 @@ impl SpanContext {
Self {
trace_id: TraceId(rand::random()),
span_id: SpanId::default(),
metadata: None,
}
}

/// Create a new `SpanContext` with a random trace id.
///
/// # Examples
///
/// ```
/// use minitrace::prelude::*;
///
/// let root = Span::root("root", SpanContext::random());
/// ```
pub fn random_with_metadata(metadata: SpanMetadata) -> Self {
Self {
trace_id: TraceId(rand::random()),
span_id: SpanId::default(),
metadata: Some(metadata),
}
}

Expand Down Expand Up @@ -142,6 +268,7 @@ impl SpanContext {
Some(Self {
trace_id: collect_token.trace_id,
span_id: collect_token.parent_id,
metadata: collect_token.metadata.clone(),
})
}
}
Expand Down Expand Up @@ -170,11 +297,12 @@ impl SpanContext {
let stack = LOCAL_SPAN_STACK.try_with(Rc::clone).ok()?;

let mut stack = stack.borrow_mut();
let collect_token = stack.current_collect_token()?[0];
let collect_token = stack.current_collect_token()?[0].clone();

Some(Self {
trace_id: collect_token.trace_id,
span_id: collect_token.parent_id,
metadata: collect_token.metadata.clone(),
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion minitrace/src/local/local_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,9 @@ mod tests {
parent_id: SpanId::default(),
collect_id: 42,
is_root: false,
metadata: None,
};
let collector2 = LocalCollector::new(Some(token2.into()), stack.clone());
let collector2 = LocalCollector::new(Some(token2.clone().into()), stack.clone());
let span2 = stack.borrow_mut().enter_span("span2").unwrap();
let span3 = stack.borrow_mut().enter_span("span3").unwrap();
stack.borrow_mut().exit_span(span3);
Expand Down Expand Up @@ -242,6 +243,7 @@ span1 []
parent_id: SpanId::default(),
collect_id: 42,
is_root: false,
metadata: None,
};
let collector2 = LocalCollector::new(Some(token2.into()), stack.clone());
let span2 = stack.borrow_mut().enter_span("span2").unwrap();
Expand Down
Loading

0 comments on commit cd27633

Please sign in to comment.