From 0ed10738258dd6e84e2f4a548001b72d5ab759e4 Mon Sep 17 00:00:00 2001
From: Jonathan Lebon
Date: Fri, 1 Nov 2024 11:50:19 -0400
Subject: [PATCH] graph-builder: scrape only once per stream

Currently, we're starting a total of N x M scrapers, where N is the
number of streams and M is the number of arches. So right now, this
would be 3 x 4 = 12 scrapers. This is very wasteful because each
scraper individually downloads the release index and update metadata
every 30 seconds, even though that metadata is not different per
architecture.

I think the reason it was set up this way is in case we wanted to host
e.g. separate release index or update files _per_ architecture in S3
instead of all together. This can be seen in the fact that the code
supports templating those URLs with `basearch`. However, it's unlikely
we'll be changing that design decision, so let's just do the saner
thing and rework the scraping to be stream-based.

This is done by changing the scraper to host not one single `Graph`
object, but instead a `HashMap` which maps architectures to graphs.
Then, when a request for a graph comes in, we look it up in our cache,
keying off of the requested architecture.

This is prep for adding another dimension to the matrix, which is
whether the OCI version of the graph was requested. If we didn't do
this cleanup first, it would have blown up the number of scrapers
to 24.

Part of https://github.com/coreos/fedora-coreos-tracker/issues/1823.
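
To illustrate the resulting cache shape, here is a minimal,
self-contained sketch (`StreamCache` is a hypothetical stand-in used
for illustration only, not a type introduced by this patch):

    use std::collections::HashMap;

    // One cache per stream; it holds one serialized graph per basearch.
    struct StreamCache {
        stream: String,
        graphs: HashMap<String, Vec<u8>>, // basearch -> serialized graph
    }

    impl StreamCache {
        // Serving a request keys off of the requested basearch.
        fn get(&self, basearch: &str) -> Option<&[u8]> {
            self.graphs.get(basearch).map(|v| v.as_slice())
        }
    }

    fn main() {
        let mut cache = StreamCache {
            stream: "stable".to_string(),
            graphs: HashMap::new(),
        };
        // A single scrape per refresh populates every arch at once.
        cache.graphs.insert("x86_64".to_string(), b"{}".to_vec());
        assert!(cache.get("x86_64").is_some());
        assert!(cache.get("riscv64").is_none());
        println!("stream={} arches={}", cache.stream, cache.graphs.len());
    }

With this shape, one metadata fetch per refresh fans out into per-arch
graphs, so the number of scrapers (and of upstream requests) scales
with the number of streams only, not streams x arches.
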
---
 commons/src/metadata.rs            |  18 ++--
 fcos-graph-builder/src/main.rs     |  18 +++--
 fcos-graph-builder/src/scraper.rs  | 104 ++++++++++++++++++-----------
 fcos-graph-builder/src/settings.rs |  33 +++------
 4 files changed, 94 insertions(+), 79 deletions(-)

diff --git a/commons/src/metadata.rs b/commons/src/metadata.rs
index 077df53..ecd7ec0 100644
--- a/commons/src/metadata.rs
+++ b/commons/src/metadata.rs
@@ -24,55 +24,55 @@ pub static START_EPOCH: &str = "org.fedoraproject.coreos.updates.start_epoch";
 pub static START_VALUE: &str = "org.fedoraproject.coreos.updates.start_value";
 
 /// Fedora CoreOS release index.
-#[derive(Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
 pub struct ReleasesJSON {
     pub releases: Vec<Release>,
 }
 
-#[derive(Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
 pub struct Release {
     pub commits: Vec<ReleaseCommit>,
     pub version: String,
     pub metadata: String,
 }
 
-#[derive(Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
 pub struct ReleaseCommit {
     pub architecture: String,
     pub checksum: String,
 }
 
 /// Fedora CoreOS updates metadata
-#[derive(Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
 pub struct UpdatesJSON {
     pub stream: String,
     pub releases: Vec<ReleaseUpdate>,
 }
 
-#[derive(Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
 pub struct ReleaseUpdate {
     pub version: String,
     pub metadata: UpdateMetadata,
 }
 
-#[derive(Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
 pub struct UpdateMetadata {
     pub barrier: Option<UpdateBarrier>,
     pub deadend: Option<UpdateDeadend>,
     pub rollout: Option<UpdateRollout>,
 }
 
-#[derive(Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
 pub struct UpdateBarrier {
     pub reason: String,
 }
 
-#[derive(Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
 pub struct UpdateDeadend {
     pub reason: String,
 }
 
-#[derive(Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize)]
 pub struct UpdateRollout {
     pub start_epoch: Option<i64>,
     pub start_percentage: Option<f64>,
diff --git a/fcos-graph-builder/src/main.rs b/fcos-graph-builder/src/main.rs
index 0d2767a..8979c3f 100644
--- a/fcos-graph-builder/src/main.rs
+++ b/fcos-graph-builder/src/main.rs
@@ -44,7 +44,7 @@ lazy_static::lazy_static! {
     static ref UPSTREAM_SCRAPES: IntCounterVec = register_int_counter_vec!(
         "fcos_cincinnati_gb_scraper_upstream_scrapes_total",
         "Total number of upstream scrapes",
-        &["basearch", "stream"]
+        &["stream"]
     ).unwrap();
     // NOTE(lucab): alternatively this could come from the runtime library, see
     // https://prometheus.io/docs/instrumenting/writing_clientlibs/#process-metrics
@@ -76,10 +76,14 @@ fn main() -> Fallible<()> {
         (settings.service, settings.status)
     };
 
-    let mut scrapers = HashMap::with_capacity(service_settings.scopes.len());
-    for scope in &service_settings.scopes {
-        let addr = scraper::Scraper::new(scope.clone())?.start();
-        scrapers.insert(scope.clone(), addr);
+    let mut scrapers = HashMap::with_capacity(service_settings.streams.len());
+    for (&stream, &arches) in &service_settings.streams {
+        let addr = scraper::Scraper::new(
+            stream.to_string(),
+            arches.iter().map(|&arch| String::from(arch)).collect(),
+        )?
+        .start();
+        scrapers.insert(stream.to_string(), addr);
     }
 
     // TODO(lucab): get allowed scopes from config file.
@@ -126,7 +130,7 @@
 #[derive(Clone, Debug)]
 pub(crate) struct AppState {
     scope_filter: Option<HashSet<String>>,
-    scrapers: HashMap<GraphScope, Addr<scraper::Scraper>>,
+    scrapers: HashMap<String, Addr<scraper::Scraper>>,
 }
 
 /// Mandatory parameters for querying a graph from graph-builder.
@@ -156,7 +160,7 @@ pub(crate) async fn gb_serve_graph(
         }
     };
 
-    let addr = match data.scrapers.get(&scope) {
+    let addr = match data.scrapers.get(&scope.stream) {
         None => {
             log::error!(
                 "no scraper configured for scope: basearch='{}', stream='{}'",
diff --git a/fcos-graph-builder/src/scraper.rs b/fcos-graph-builder/src/scraper.rs
index fe05b05..e223494 100644
--- a/fcos-graph-builder/src/scraper.rs
+++ b/fcos-graph-builder/src/scraper.rs
@@ -3,6 +3,7 @@ use actix_web::web::Bytes;
 use commons::{graph, metadata};
 use failure::{Error, Fallible};
 use reqwest::Method;
+use std::collections::HashMap;
 use std::num::NonZeroU64;
 use std::time::Duration;
 
@@ -12,25 +13,29 @@
 const DEFAULT_HTTP_REQ_TIMEOUT: Duration = Duration::from_secs(30 * 60);
 
 /// Release scraper.
 #[derive(Clone, Debug)]
 pub struct Scraper {
-    graph: Bytes,
+    stream: String,
+    /// arch -> graph
+    graphs: HashMap<String, Bytes>,
     hclient: reqwest::Client,
     pause_secs: NonZeroU64,
     release_index_url: reqwest::Url,
-    scope: graph::GraphScope,
     updates_url: reqwest::Url,
 }
 
 impl Scraper {
-    pub(crate) fn new(scope: graph::GraphScope) -> Fallible<Self> {
-        let graph = {
+    pub(crate) fn new(stream: String, arches: Vec<String>) -> Fallible<Self> {
+        let empty = {
             let empty_graph = graph::Graph::default();
             let data = serde_json::to_vec(&empty_graph)?;
             Bytes::from(data)
         };
+        let graphs = arches
+            .into_iter()
+            .map(|arch| (arch, empty.clone()))
+            .collect();
 
         let vars = maplit::hashmap! {
-            "basearch".to_string() => scope.basearch.clone(),
-            "stream".to_string() => scope.stream.clone(),
+            "stream".to_string() => stream.clone(),
         };
         let releases_json = envsubst::substitute(metadata::RELEASES_JSON, &vars)?;
         let updates_json = envsubst::substitute(metadata::UPDATES_JSON, &vars)?;
@@ -40,10 +45,10 @@ impl Scraper {
             .build()?;
 
         let scraper = Self {
-            graph,
+            graphs,
             hclient,
             pause_secs: NonZeroU64::new(30).expect("non-zero pause"),
-            scope,
+            stream,
             release_index_url: reqwest::Url::parse(&releases_json)?,
             updates_url: reqwest::Url::parse(&updates_json)?,
         };
@@ -88,44 +93,61 @@ impl Scraper {
     }
 
     /// Combine release-index and updates metadata.
-    fn assemble_graph(&self) -> impl Future<Output = Result<graph::Graph, Error>> {
+    fn assemble_graphs(
+        &self,
+    ) -> impl Future<Output = Result<HashMap<String, graph::Graph>, Error>> {
         let stream_releases = self.fetch_releases();
         let stream_updates = self.fetch_updates();
-        let scope = self.scope.clone();
 
-        // NOTE(lucab): this inner scope is in order to get a 'static lifetime on
-        // the future for actix compatibility.
-        async {
+        // yuck... we clone a bunch here to keep the async closure 'static
+        let stream = self.stream.clone();
+        let arches: Vec<String> = self.graphs.keys().cloned().collect();
+
+        async move {
+            let mut map = HashMap::with_capacity(arches.len());
             let (graph, updates) =
                 futures::future::try_join(stream_releases, stream_updates).await?;
-            graph::Graph::from_metadata(graph, updates, scope)
+            for arch in arches {
+                map.insert(
+                    arch.clone(),
+                    graph::Graph::from_metadata(
+                        graph.clone(),
+                        updates.clone(),
+                        graph::GraphScope {
+                            basearch: arch.clone(),
+                            stream: stream.clone(),
+                        },
+                    )?,
+                );
+            }
+            Ok(map)
         }
     }
 
     /// Update cached graph.
-    fn update_cached_graph(&mut self, graph: graph::Graph) -> Result<(), Error> {
+    fn update_cached_graph(&mut self, arch: String, graph: graph::Graph) -> Result<(), Error> {
         let data = serde_json::to_vec_pretty(&graph).map_err(|e| failure::format_err!("{}", e))?;
-        self.graph = Bytes::from(data);
 
         let refresh_timestamp = chrono::Utc::now();
         crate::LAST_REFRESH
-            .with_label_values(&[&self.scope.basearch, &self.scope.stream])
+            .with_label_values(&[&arch, &self.stream])
             .set(refresh_timestamp.timestamp());
         crate::GRAPH_FINAL_EDGES
-            .with_label_values(&[&self.scope.basearch, &self.scope.stream])
+            .with_label_values(&[&arch, &self.stream])
            .set(graph.edges.len() as i64);
         crate::GRAPH_FINAL_RELEASES
-            .with_label_values(&[&self.scope.basearch, &self.scope.stream])
+            .with_label_values(&[&arch, &self.stream])
             .set(graph.nodes.len() as i64);
 
         log::trace!(
             "cached graph for {}/{}: releases={}, edges={}",
-            self.scope.basearch,
-            self.scope.stream,
+            &arch,
+            self.stream,
             graph.nodes.len(),
             graph.edges.len()
         );
 
+        self.graphs.insert(arch, Bytes::from(data));
         Ok(())
     }
 }
@@ -150,13 +172,17 @@ impl Handler<RefreshTick> for Scraper {
 
     fn handle(&mut self, _msg: RefreshTick, _ctx: &mut Self::Context) -> Self::Result {
         crate::UPSTREAM_SCRAPES
-            .with_label_values(&[&self.scope.basearch, &self.scope.stream])
+            .with_label_values(&[&self.stream])
             .inc();
 
-        let latest_graph = self.assemble_graph();
-        let update_graph = actix::fut::wrap_future::<_, Self>(latest_graph)
-            .map(|graph, actor, _ctx| {
-                let res = graph.and_then(|g| actor.update_cached_graph(g));
+        let latest_graphs = self.assemble_graphs();
+        let update_graphs = actix::fut::wrap_future::<_, Self>(latest_graphs)
+            .map(|graphs, actor, _ctx| {
+                let res: Result<(), Error> = graphs.and_then(|g| {
+                    g.into_iter()
+                        .map(|(arch, graph)| actor.update_cached_graph(arch, graph))
+                        .collect()
+                });
                 if let Err(e) = res {
                     log::error!("transient scraping failure: {}", e);
                 };
@@ -167,7 +193,7 @@ impl Handler<RefreshTick> for Scraper {
             actix::fut::ok(())
         });
 
-        Box::new(update_graph)
+        Box::new(update_graphs)
     }
 }
 
@@ -185,24 +211,24 @@ impl Handler<GetCachedGraph> for Scraper {
     fn handle(&mut self, msg: GetCachedGraph, _ctx: &mut Self::Context) -> Self::Result {
         use failure::format_err;
 
-        if msg.scope.basearch != self.scope.basearch {
-            return Box::new(actix::fut::err(format_err!(
-                "unexpected basearch '{}'",
-                msg.scope.basearch
-            )));
-        }
-        if msg.scope.stream != self.scope.stream {
+        if msg.scope.stream != self.stream {
             return Box::new(actix::fut::err(format_err!(
                 "unexpected stream '{}'",
                 msg.scope.stream
             )));
         }
 
+        if let Some(graph) = self.graphs.get(&msg.scope.basearch) {
+            crate::CACHED_GRAPH_REQUESTS
+                .with_label_values(&[&msg.scope.basearch, &msg.scope.stream])
+                .inc();
-        crate::CACHED_GRAPH_REQUESTS
-            .with_label_values(&[&self.scope.basearch, &self.scope.stream])
-            .inc();
-
-        Box::new(actix::fut::ok(self.graph.clone()))
+            Box::new(actix::fut::ok(graph.clone()))
+        } else {
+            return Box::new(actix::fut::err(format_err!(
+                "unexpected basearch '{}'",
+                msg.scope.basearch
+            )));
+        }
     }
 }
diff --git a/fcos-graph-builder/src/settings.rs b/fcos-graph-builder/src/settings.rs
index 2799d2d..55574be 100644
--- a/fcos-graph-builder/src/settings.rs
+++ b/fcos-graph-builder/src/settings.rs
@@ -1,7 +1,6 @@
 use crate::config::FileConfig;
-use commons::graph::GraphScope;
 use failure::Fallible;
-use std::collections::BTreeSet;
+use std::collections::BTreeMap;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 
 /// Runtime settings for the graph-builder.
@@ -25,7 +24,8 @@ pub struct ServiceSettings {
     pub(crate) origin_allowlist: Option<Vec<String>>,
     pub(crate) ip_addr: IpAddr,
     pub(crate) port: u16,
-    pub(crate) scopes: BTreeSet<GraphScope>,
+    // stream --> set of valid arches for it
+    pub(crate) streams: BTreeMap<&'static str, &'static [&'static str]>,
 }
 
 impl ServiceSettings {
@@ -33,20 +33,11 @@
     const DEFAULT_GB_SERVICE_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
     /// Default TCP port for graph-builder main service.
     const DEFAULT_GB_SERVICE_PORT: u16 = 8080;
-    /// Default scopes (basearch plus stream) to process.
-    const DEFAULT_SCOPES: [(&'static str, &'static str); 12] = [
-        ("aarch64", "next"),
-        ("aarch64", "stable"),
-        ("aarch64", "testing"),
-        ("ppc64le", "next"),
-        ("ppc64le", "stable"),
-        ("ppc64le", "testing"),
-        ("s390x", "next"),
-        ("s390x", "stable"),
-        ("s390x", "testing"),
-        ("x86_64", "next"),
-        ("x86_64", "stable"),
-        ("x86_64", "testing"),
+    /// Default streams and their basearches to process.
+    const DEFAULT_STREAMS: [(&'static str, &'static [&'static str]); 3] = [
+        ("stable", &["x86_64", "aarch64", "s390x", "ppc64le"]),
+        ("testing", &["x86_64", "aarch64", "s390x", "ppc64le"]),
+        ("next", &["x86_64", "aarch64", "s390x", "ppc64le"]),
     ];
 
     pub fn socket_addr(&self) -> SocketAddr {
@@ -60,13 +51,7 @@ impl Default for ServiceSettings {
             origin_allowlist: None,
             ip_addr: Self::DEFAULT_GB_SERVICE_ADDR.into(),
             port: Self::DEFAULT_GB_SERVICE_PORT,
-            scopes: Self::DEFAULT_SCOPES
-                .iter()
-                .map(|(basearch, stream)| GraphScope {
-                    basearch: basearch.to_string(),
-                    stream: stream.to_string(),
-                })
-                .collect(),
+            streams: Self::DEFAULT_STREAMS.iter().map(|&t| t).collect(),
         }
     }
 }