Skip to content

Commit

Permalink
merge: #3173
Browse files Browse the repository at this point in the history
3173: feat: augment NATS tracing & add INFO span support for subscriptions r=fnichol a=fnichol

This change is doing...a lot, but is primarily focused on enabling a
tracing span that can sit over an invoming NATS message that ultimately
send one or more reply messages at the end (in this way, a very RPC-like
interaction).

The low level span-creating machinery and OpenTelemetry header
injection/extraction is contained in the `lib/telemetry-nats-rs` Rust
crate. This machinery is wired in to our somewhat generic NATS
subscription streaming crate `lib/nats-subscriber` as Pinga, Veritech,
and parts of SDF already use this as their interface to a NATS
subscription.

As part of creating this "request span", a fair amount of span metadata
has been updated in the `lib/si-data-nats` crate to help with more
meaningful/contextual information.

At present in `lib/nats-subscriber`, the `Subscriber` stream yields a
generic `Request` type (as before) but now contains this "request span"
and as long as this field and/or the request type is kept in some scope
(that is, it isn't dropped), this will keep the "request span" open.
Some effort in this change (and more in future work) is being made to
make all associated work be captured in child spans or "follows from"
spans if it is related work but not bound to the lifetime of the
request/reponse lifetime.

At the moment, Pinga is most mature here, creating a `pinga-jobs
receive` span for each incoming job request. While not 100% of the
related work is fully parented/associated, almost all of it is ;)

<img src="https://media2.giphy.com/media/GqMCSzBqxEOF7lrWwF/giphy.gif"/>


Co-authored-by: Fletcher Nichol <[email protected]>
  • Loading branch information
si-bors-ng[bot] and fnichol authored Jan 18, 2024
2 parents 204fa81 + a10f2fb commit b52a048
Show file tree
Hide file tree
Showing 31 changed files with 1,853 additions and 717 deletions.
254 changes: 177 additions & 77 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ members = [
"lib/buck2-resources",
"lib/bytes-lines-codec",
"lib/config-file",
"lib/council-server",
"lib/cyclone-client",
"lib/cyclone-core",
"lib/cyclone-server",
"lib/council-server",
"lib/dal",
"lib/dal-test",
"lib/deadpool-cyclone",
Expand All @@ -33,12 +33,13 @@ members = [
"lib/si-data-pg",
"lib/si-hash",
"lib/si-pkg",
"lib/si-posthog-rs",
"lib/si-settings",
"lib/si-std",
"lib/si-test-macros",
"lib/si-posthog-rs",
"lib/telemetry-application-rs",
"lib/telemetry-http-rs",
"lib/telemetry-nats-rs",
"lib/telemetry-rs",
"lib/veritech-client",
"lib/veritech-core",
Expand Down Expand Up @@ -112,7 +113,7 @@ regex = "1.8.1"
remain = "0.2.8"
reqwest = { version = "0.11.17", default-features = false, features = ["rustls-tls", "json", "multipart"] }
ring = "=0.17.5" # Upgrading this is possible, but a pain, so we don't want to pick up every new minor version (see: https://github.com/facebook/buck2/commit/91af40b66960d003067c3d241595fb53d1e636c8)
rustls = { version = "0.21.1" } # pinned because ring above depends on it
rustls = { version = "0.22.2" }
rustls-pemfile = { version = "2.0.0" }
rust-s3 = { version = "0.34.0-rc4", default-features = false, features = ["tokio-rustls-tls"] }
sea-orm = { version = "0.12.0", features = ["sqlx-postgres", "runtime-tokio-rustls", "macros", "with-chrono", "debug-print"] }
Expand All @@ -133,7 +134,7 @@ test-log = { version = "0.2.11", default-features = false, features = ["trace"]
thiserror = "1.0.40"
tokio = { version = "1.28.0", features = ["full"] }
tokio-postgres = { version = "0.7.8", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] }
tokio-postgres-rustls = { version = "0.10.0" }
tokio-postgres-rustls = { version = "0.11.0" }
tokio-serde = { version = "0.8.0", features = ["json"] }
tokio-stream = "0.1.14"
tokio-test = "0.4.2"
Expand Down Expand Up @@ -168,5 +169,5 @@ docker-api = { git = "https://github.com/vv9k/docker-api-rs.git", branch = "mast
# Note that this helps us to narrow down the number of `ring`/`rustls` versions to 1 each
rust-s3 = { git = "https://github.com/ScuffleTV/rust-s3.git", branch = "troy/rustls" }
# pending a potential merge and release of
# https://github.com/jbg/tokio-postgres-rustls/pull/17
tokio-postgres-rustls = { git = "https://github.com/fnichol/tokio-postgres-rustls.git", branch = "ring-0.17" }
# https://github.com/jbg/tokio-postgres-rustls/pull/18
tokio-postgres-rustls = { git = "https://github.com/jbg/tokio-postgres-rustls.git", branch = "master" }
10 changes: 5 additions & 5 deletions lib/council-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{Graph, Id, Request, Response};
use std::time::Duration;

use futures::StreamExt;
use si_data_nats::NatsClient;
use si_data_nats::{NatsClient, Subject};
use telemetry::prelude::*;
use tokio::{signal, sync::watch};

Expand Down Expand Up @@ -213,7 +213,7 @@ pub enum Error {
#[instrument(level = "info")]
pub async fn register_graph_from_job(
complete_graph: &mut ChangeSetGraph,
reply_channel: String,
reply_channel: Subject,
change_set_id: Id,
new_dependency_data: Graph,
) -> Result<(), Error> {
Expand All @@ -225,7 +225,7 @@ pub async fn register_graph_from_job(
pub async fn job_processed_a_value(
nats: &NatsClient,
complete_graph: &mut ChangeSetGraph,
reply_channel: String,
reply_channel: Subject,
change_set_id: Id,
node_id: Id,
) -> Result<(), Error> {
Expand All @@ -251,7 +251,7 @@ pub async fn job_processed_a_value(
pub async fn job_failed_processing_a_value(
nats: &NatsClient,
complete_graph: &mut ChangeSetGraph,
reply_channel: String,
reply_channel: Subject,
change_set_id: Id,
node_id: Id,
) -> Result<(), Error> {
Expand All @@ -278,7 +278,7 @@ pub async fn job_failed_processing_a_value(
#[instrument(level = "info")]
pub async fn job_is_going_away(
complete_graph: &mut ChangeSetGraph,
reply_channel: String,
reply_channel: Subject,
change_set_id: Id,
) -> Result<(), Error> {
debug!(%reply_channel, %change_set_id, ?complete_graph, "Job is going away");
Expand Down
16 changes: 10 additions & 6 deletions lib/council-server/src/server/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
mod node_metadata;

use node_metadata::NodeMetadata;
use si_data_nats::Subject;

#[derive(Default, Debug)]
pub struct ChangeSetGraph {
Expand All @@ -20,7 +21,10 @@ impl ChangeSetGraph {
for graph in self.dependency_data.values_mut() {
for (id, metadata) in graph.iter_mut() {
if let Some(reply_channel) = metadata.next_to_process() {
result.entry(reply_channel.clone()).or_default().push(*id);
result
.entry(reply_channel.to_string())
.or_default()
.push(*id);
}
}
}
Expand All @@ -29,7 +33,7 @@ impl ChangeSetGraph {

pub fn merge_dependency_graph(
&mut self,
reply_channel: String,
reply_channel: Subject,
new_dependency_data: Graph,
change_set_id: Id,
) -> Result<(), Error> {
Expand Down Expand Up @@ -68,7 +72,7 @@ impl ChangeSetGraph {

pub fn mark_node_as_processed(
&mut self,
reply_channel: &str,
reply_channel: &Subject,
change_set_id: Id,
node_id: Id,
) -> Result<HashSet<String>, Error> {
Expand Down Expand Up @@ -96,7 +100,7 @@ impl ChangeSetGraph {
Ok(wanted_by_reply_channels)
}

pub fn remove_channel(&mut self, change_set_id: Id, reply_channel: &str) {
pub fn remove_channel(&mut self, change_set_id: Id, reply_channel: &Subject) {
if let Some(graph) = self.dependency_data.get_mut(&change_set_id) {
let mut to_remove = Vec::new();
for (id, metadata) in graph.iter_mut() {
Expand All @@ -118,10 +122,10 @@ impl ChangeSetGraph {
/// for the nodes that are being removed.
pub fn remove_node_and_dependents(
&mut self,
reply_channel: String,
reply_channel: Subject,
change_set_id: Id,
node_id: Id,
) -> Result<Vec<(String, Id)>, Error> {
) -> Result<Vec<(Subject, Id)>, Error> {
let mut failure_notifications = Vec::new();
let change_set_graph_data = self.dependency_data.get_mut(&change_set_id).unwrap();

Expand Down
24 changes: 13 additions & 11 deletions lib/council-server/src/server/graph/node_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ use std::{
time::Instant,
};

use si_data_nats::Subject;

use crate::{server::Error, Id};

#[derive(Debug)]
pub struct NodeMetadata {
// This should really be an ordered set, to remove duplicates, but we'll deal with
// that later.
wanted_by_reply_channels: VecDeque<String>,
processing_reply_channel: Option<String>,
wanted_by_reply_channels: VecDeque<Subject>,
processing_reply_channel: Option<Subject>,
depends_on_node_ids: HashSet<Id>,
processing_started_at: Option<Instant>,
last_updated_at: Instant,
Expand All @@ -29,7 +31,7 @@ impl Default for NodeMetadata {
}

impl NodeMetadata {
pub fn add_wanted_by_reply_channel(&mut self, reply_channel: &str) {
pub fn add_wanted_by_reply_channel(&mut self, reply_channel: &Subject) {
self.wanted_by_reply_channels
.push_back(reply_channel.to_owned());
self.last_updated_at = Instant::now();
Expand Down Expand Up @@ -62,7 +64,7 @@ impl NodeMetadata {

pub fn mark_as_processed(
&mut self,
reply_channel: &str,
reply_channel: &Subject,
) -> Result<(bool, HashSet<String>), Error> {
if self.processing_reply_channel().map(|p| &**p) != Some(reply_channel) {
return Err(Error::ShouldNotBeProcessingByJob);
Expand All @@ -79,7 +81,7 @@ impl NodeMetadata {
if self.dependencies_satisfied() {
let mut wanted_by_reply_channels = self.wanted_by_reply_channels();
if let Some(processed_by_reply_channel) = processing_reply_channel {
wanted_by_reply_channels.insert(processed_by_reply_channel);
wanted_by_reply_channels.insert(processed_by_reply_channel.to_string());
}

Ok((true, wanted_by_reply_channels))
Expand All @@ -88,7 +90,7 @@ impl NodeMetadata {
}
}

pub fn merge_metadata(&mut self, reply_channel: String, dependencies: &Vec<Id>) {
pub fn merge_metadata(&mut self, reply_channel: Subject, dependencies: &Vec<Id>) {
self.last_updated_at = Instant::now();

if !self.wanted_by_reply_channels.contains(&reply_channel) {
Expand All @@ -97,7 +99,7 @@ impl NodeMetadata {
self.depends_on_node_ids.extend(dependencies);
}

pub fn next_to_process(&mut self) -> Option<String> {
pub fn next_to_process(&mut self) -> Option<Subject> {
if self.depends_on_node_ids.is_empty() && self.processing_reply_channel.is_none() {
self.last_updated_at = Instant::now();

Expand All @@ -112,11 +114,11 @@ impl NodeMetadata {
None
}

pub fn processing_reply_channel(&self) -> Option<&String> {
pub fn processing_reply_channel(&self) -> Option<&Subject> {
self.processing_reply_channel.as_ref()
}

pub fn remove_channel(&mut self, reply_channel: &str) {
pub fn remove_channel(&mut self, reply_channel: &Subject) {
self.last_updated_at = Instant::now();

self.wanted_by_reply_channels
Expand All @@ -134,10 +136,10 @@ impl NodeMetadata {
}

pub fn wanted_by_reply_channels(&self) -> HashSet<String> {
HashSet::from_iter(self.wanted_by_reply_channels.iter().cloned())
HashSet::from_iter(self.wanted_by_reply_channels.iter().map(|s| s.to_string()))
}

pub fn wanted_by_reply_channels_iter(&self) -> Iter<'_, String> {
pub fn wanted_by_reply_channels_iter(&self) -> Iter<'_, Subject> {
self.wanted_by_reply_channels.iter()
}
}
3 changes: 3 additions & 0 deletions lib/dal/src/job/definition/dependent_values_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ impl DependentValuesUpdate {
task_ctx,
attribute_value,
pub_council.clone(),
Span::current(),
));
}
}
Expand Down Expand Up @@ -317,6 +318,7 @@ impl DependentValuesUpdate {
/// play more nicely with being spawned into a `JoinSet`.
#[instrument(
name = "dependent_values_update.update_value",
parent = &parent_span,
skip_all,
level = "info",
fields(
Expand All @@ -327,6 +329,7 @@ async fn update_value(
ctx: DalContext,
mut attribute_value: AttributeValue,
council: council_server::PubClient,
parent_span: Span,
) -> JobConsumerResult<()> {
let update_result = attribute_value.update_from_prototype_function(&ctx).await;
// We don't propagate the error up, because we want the rest of the nodes in the graph to make progress
Expand Down
26 changes: 25 additions & 1 deletion lib/dal/src/job/definition/fix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ impl JobConsumerMetadata for FixesJob {

#[async_trait]
impl JobConsumer for FixesJob {
#[instrument(
name = "fixes_job.run",
skip_all,
level = "info",
fields(
// TODO(fnichol): add some?
)
)]
async fn run(&self, ctx: &mut DalContext) -> JobConsumerResult<()> {
let mut fixes = self.fixes.clone();

Expand Down Expand Up @@ -150,7 +158,13 @@ impl JobConsumer for FixesJob {
.await?;
handles.push(async move {
let id = fix_item.id;
let res = tokio::task::spawn(fix_task(task_ctx, self.batch_id, fix_item)).await;
let res = tokio::task::spawn(fix_task(
task_ctx,
self.batch_id,
fix_item,
Span::current(),
))
.await;
(id, res)
});
}
Expand Down Expand Up @@ -309,10 +323,20 @@ async fn finish_batch(ctx: &DalContext, id: FixBatchId) -> JobConsumerResult<()>
Ok(())
}

#[instrument(
name = "fixes_job.fix_task",
parent = &parent_span,
skip_all,
level = "info",
fields(
// TODO(fnichol): add some?
)
)]
async fn fix_task(
ctx: DalContext,
batch_id: FixBatchId,
fix_item: FixItem,
parent_span: Span,
) -> JobConsumerResult<(Fix, Vec<String>)> {
let deleted_ctx = &ctx.clone_with_delete_visibility();
// Get the workflow for the action we need to run.
Expand Down
1 change: 1 addition & 0 deletions lib/nats-subscriber/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ rust_library(
deps = [
"//lib/si-data-nats:si-data-nats",
"//lib/telemetry-rs:telemetry",
"//lib/telemetry-nats-rs:telemetry-nats",
"//third-party/rust:futures",
"//third-party/rust:futures-lite",
"//third-party/rust:pin-project-lite",
Expand Down
1 change: 1 addition & 0 deletions lib/nats-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ serde = { workspace = true }
serde_json = { workspace = true }
si-data-nats = { path = "../../lib/si-data-nats" }
telemetry = { path = "../../lib/telemetry-rs" }
telemetry-nats = { path = "../../lib/telemetry-nats-rs" }
thiserror = { workspace = true }
tokio = { workspace = true }
Loading

0 comments on commit b52a048

Please sign in to comment.