graph-builder: scrape only once per stream
Currently, we're starting a total of N x M scrapers, where N is the
number of streams, and M the number of arches. So right now, this would
be 3 x 4 = 12 scrapers.

This is very wasteful because each scraper individually downloads the
release index and update metadata every 30 seconds, even though that
metadata is not different per architecture. I think the reason it was
set up this way is in case we wanted to host e.g. separate release
index or update files _per_ architecture in S3 instead of all together.
This can be seen from the fact that the code supports templating those
URLs with `basearch`. However, it's unlikely we'll be changing that design
decision, so let's just do the saner thing and rework the scraping to
be stream-based.

This is done by changing the scraper to host not a single `Graph`
object, but a `HashMap<String, Graph>` that maps architectures to
graphs. Then, when a request for a graph comes in, we look up the graph
in our cache, keying off the requested architecture.
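
As a rough sketch of that shape (simplified, with hypothetical names;
the real types are in `scraper.rs` below), each per-stream scraper
keeps one cached graph per basearch and serves requests by a map
lookup:

    use std::collections::HashMap;

    /// Illustrative stand-in for a per-stream scraper: a single scrape
    /// refreshes every entry, one entry per architecture.
    struct StreamScraper {
        stream: String,
        /// basearch -> serialized graph
        graphs: HashMap<String, Vec<u8>>,
    }

    impl StreamScraper {
        /// Serve the cached graph for the requested architecture, if any.
        fn cached_graph(&self, basearch: &str) -> Option<&[u8]> {
            self.graphs.get(basearch).map(|g| g.as_slice())
        }
    }

    fn main() {
        let graphs = HashMap::from([("x86_64".to_string(), b"{}".to_vec())]);
        let scraper = StreamScraper { stream: "stable".to_string(), graphs };
        println!("stream: {}", scraper.stream);
        assert!(scraper.cached_graph("x86_64").is_some());
        assert!(scraper.cached_graph("riscv64").is_none());
    }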

This is prep for adding another dimension to the matrix, which is
whether the OCI version of the graph was reported. If we didn't do this
cleanup first, it would have blown up the number of scrapers to 24.

Part of coreos/fedora-coreos-tracker#1823.
jlebon committed Nov 1, 2024 · 1 parent 564d6ed · commit 0ed1073
Showing 4 changed files with 94 additions and 79 deletions.
18 changes: 9 additions & 9 deletions commons/src/metadata.rs
@@ -24,55 +24,55 @@ pub static START_EPOCH: &str = "org.fedoraproject.coreos.updates.start_epoch";
pub static START_VALUE: &str = "org.fedoraproject.coreos.updates.start_value";

/// Fedora CoreOS release index.
#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct ReleasesJSON {
pub releases: Vec<Release>,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct Release {
pub commits: Vec<ReleaseCommit>,
pub version: String,
pub metadata: String,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct ReleaseCommit {
pub architecture: String,
pub checksum: String,
}

/// Fedora CoreOS updates metadata
#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct UpdatesJSON {
pub stream: String,
pub releases: Vec<ReleaseUpdate>,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct ReleaseUpdate {
pub version: String,
pub metadata: UpdateMetadata,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct UpdateMetadata {
pub barrier: Option<UpdateBarrier>,
pub deadend: Option<UpdateDeadend>,
pub rollout: Option<UpdateRollout>,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct UpdateBarrier {
pub reason: String,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct UpdateDeadend {
pub reason: String,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct UpdateRollout {
pub start_epoch: Option<i64>,
pub start_percentage: Option<f64>,
18 changes: 11 additions & 7 deletions fcos-graph-builder/src/main.rs
@@ -44,7 +44,7 @@ lazy_static::lazy_static! {
static ref UPSTREAM_SCRAPES: IntCounterVec = register_int_counter_vec!(
"fcos_cincinnati_gb_scraper_upstream_scrapes_total",
"Total number of upstream scrapes",
&["basearch", "stream"]
&["stream"]
).unwrap();
// NOTE(lucab): alternatively this could come from the runtime library, see
// https://prometheus.io/docs/instrumenting/writing_clientlibs/#process-metrics
@@ -76,10 +76,14 @@ fn main() -> Fallible<()> {
(settings.service, settings.status)
};

let mut scrapers = HashMap::with_capacity(service_settings.scopes.len());
for scope in &service_settings.scopes {
let addr = scraper::Scraper::new(scope.clone())?.start();
scrapers.insert(scope.clone(), addr);
let mut scrapers = HashMap::with_capacity(service_settings.streams.len());
for (&stream, &arches) in &service_settings.streams {
let addr = scraper::Scraper::new(
stream.to_string(),
arches.iter().map(|&arch| String::from(arch)).collect(),
)?
.start();
scrapers.insert(stream.to_string(), addr);
}

// TODO(lucab): get allowed scopes from config file.
@@ -126,7 +130,7 @@ fn main() -> Fallible<()> {
#[derive(Clone, Debug)]
pub(crate) struct AppState {
scope_filter: Option<HashSet<graph::GraphScope>>,
scrapers: HashMap<graph::GraphScope, Addr<scraper::Scraper>>,
scrapers: HashMap<String, Addr<scraper::Scraper>>,
}

/// Mandatory parameters for querying a graph from graph-builder.
@@ -156,7 +160,7 @@ pub(crate) async fn gb_serve_graph(
}
};

let addr = match data.scrapers.get(&scope) {
let addr = match data.scrapers.get(&scope.stream) {
None => {
log::error!(
"no scraper configured for scope: basearch='{}', stream='{}'",
104 changes: 65 additions & 39 deletions fcos-graph-builder/src/scraper.rs
@@ -3,6 +3,7 @@ use actix_web::web::Bytes;
use commons::{graph, metadata};
use failure::{Error, Fallible};
use reqwest::Method;
use std::collections::HashMap;
use std::num::NonZeroU64;
use std::time::Duration;

@@ -12,25 +13,29 @@ const DEFAULT_HTTP_REQ_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Release scraper.
#[derive(Clone, Debug)]
pub struct Scraper {
graph: Bytes,
stream: String,
/// arch -> graph
graphs: HashMap<String, Bytes>,
hclient: reqwest::Client,
pause_secs: NonZeroU64,
release_index_url: reqwest::Url,
scope: graph::GraphScope,
updates_url: reqwest::Url,
}

impl Scraper {
pub(crate) fn new(scope: graph::GraphScope) -> Fallible<Self> {
let graph = {
pub(crate) fn new(stream: String, arches: Vec<String>) -> Fallible<Self> {
let empty = {
let empty_graph = graph::Graph::default();
let data = serde_json::to_vec(&empty_graph)?;
Bytes::from(data)
};
let graphs = arches
.into_iter()
.map(|arch| (arch, empty.clone()))
.collect();

let vars = maplit::hashmap! {
"basearch".to_string() => scope.basearch.clone(),
"stream".to_string() => scope.stream.clone(),
"stream".to_string() => stream.clone(),
};
let releases_json = envsubst::substitute(metadata::RELEASES_JSON, &vars)?;
let updates_json = envsubst::substitute(metadata::UPDATES_JSON, &vars)?;
@@ -40,10 +45,10 @@ impl Scraper {
.build()?;

let scraper = Self {
graph,
graphs,
hclient,
pause_secs: NonZeroU64::new(30).expect("non-zero pause"),
scope,
stream,
release_index_url: reqwest::Url::parse(&releases_json)?,
updates_url: reqwest::Url::parse(&updates_json)?,
};
@@ -88,44 +93,61 @@ impl Scraper {
}

/// Combine release-index and updates metadata.
fn assemble_graph(&self) -> impl Future<Output = Result<graph::Graph, Error>> {
fn assemble_graphs(
&self,
) -> impl Future<Output = Result<HashMap<String, graph::Graph>, Error>> {
let stream_releases = self.fetch_releases();
let stream_updates = self.fetch_updates();
let scope = self.scope.clone();

// NOTE(lucab): this inner scope is in order to get a 'static lifetime on
// the future for actix compatibility.
async {
// yuck... we clone a bunch here to keep the async closure 'static
let stream = self.stream.clone();
let arches: Vec<String> = self.graphs.keys().cloned().collect();

async move {
let mut map = HashMap::with_capacity(arches.len());
let (graph, updates) =
futures::future::try_join(stream_releases, stream_updates).await?;
graph::Graph::from_metadata(graph, updates, scope)
for arch in arches {
map.insert(
arch.clone(),
graph::Graph::from_metadata(
graph.clone(),
updates.clone(),
graph::GraphScope {
basearch: arch.clone(),
stream: stream.clone(),
},
)?,
);
}
Ok(map)
}
}

/// Update cached graph.
fn update_cached_graph(&mut self, graph: graph::Graph) -> Result<(), Error> {
fn update_cached_graph(&mut self, arch: String, graph: graph::Graph) -> Result<(), Error> {
let data = serde_json::to_vec_pretty(&graph).map_err(|e| failure::format_err!("{}", e))?;
self.graph = Bytes::from(data);

let refresh_timestamp = chrono::Utc::now();
crate::LAST_REFRESH
.with_label_values(&[&self.scope.basearch, &self.scope.stream])
.with_label_values(&[&arch, &self.stream])
.set(refresh_timestamp.timestamp());
crate::GRAPH_FINAL_EDGES
.with_label_values(&[&self.scope.basearch, &self.scope.stream])
.with_label_values(&[&arch, &self.stream])
.set(graph.edges.len() as i64);
crate::GRAPH_FINAL_RELEASES
.with_label_values(&[&self.scope.basearch, &self.scope.stream])
.with_label_values(&[&arch, &self.stream])
.set(graph.nodes.len() as i64);

log::trace!(
"cached graph for {}/{}: releases={}, edges={}",
self.scope.basearch,
self.scope.stream,
&arch,
self.stream,
graph.nodes.len(),
graph.edges.len()
);

self.graphs.insert(arch, Bytes::from(data));
Ok(())
}
}
Expand All @@ -150,13 +172,17 @@ impl Handler<RefreshTick> for Scraper {

fn handle(&mut self, _msg: RefreshTick, _ctx: &mut Self::Context) -> Self::Result {
crate::UPSTREAM_SCRAPES
.with_label_values(&[&self.scope.basearch, &self.scope.stream])
.with_label_values(&[&self.stream])
.inc();

let latest_graph = self.assemble_graph();
let update_graph = actix::fut::wrap_future::<_, Self>(latest_graph)
.map(|graph, actor, _ctx| {
let res = graph.and_then(|g| actor.update_cached_graph(g));
let latest_graphs = self.assemble_graphs();
let update_graphs = actix::fut::wrap_future::<_, Self>(latest_graphs)
.map(|graphs, actor, _ctx| {
let res: Result<(), Error> = graphs.and_then(|g| {
g.into_iter()
.map(|(arch, graph)| actor.update_cached_graph(arch, graph))
.collect()
});
if let Err(e) = res {
log::error!("transient scraping failure: {}", e);
};
@@ -167,7 +193,7 @@ impl Handler<RefreshTick> for Scraper {
actix::fut::ok(())
});

Box::new(update_graph)
Box::new(update_graphs)
}
}

@@ -185,24 +211,24 @@ impl Handler<GetCachedGraph> for Scraper {
fn handle(&mut self, msg: GetCachedGraph, _ctx: &mut Self::Context) -> Self::Result {
use failure::format_err;

if msg.scope.basearch != self.scope.basearch {
return Box::new(actix::fut::err(format_err!(
"unexpected basearch '{}'",
msg.scope.basearch
)));
}
if msg.scope.stream != self.scope.stream {
if msg.scope.stream != self.stream {
return Box::new(actix::fut::err(format_err!(
"unexpected stream '{}'",
msg.scope.stream
)));
}
if let Some(graph) = self.graphs.get(&msg.scope.basearch) {
crate::CACHED_GRAPH_REQUESTS
.with_label_values(&[&msg.scope.basearch, &msg.scope.stream])
.inc();

crate::CACHED_GRAPH_REQUESTS
.with_label_values(&[&self.scope.basearch, &self.scope.stream])
.inc();

Box::new(actix::fut::ok(self.graph.clone()))
Box::new(actix::fut::ok(graph.clone()))
} else {
return Box::new(actix::fut::err(format_err!(
"unexpected basearch '{}'",
msg.scope.basearch
)));
}
}
}

33 changes: 9 additions & 24 deletions fcos-graph-builder/src/settings.rs
@@ -1,7 +1,6 @@
use crate::config::FileConfig;
use commons::graph::GraphScope;
use failure::Fallible;
use std::collections::BTreeSet;
use std::collections::BTreeMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

/// Runtime settings for the graph-builder.
@@ -25,28 +24,20 @@ pub struct ServiceSettings {
pub(crate) origin_allowlist: Option<Vec<String>>,
pub(crate) ip_addr: IpAddr,
pub(crate) port: u16,
pub(crate) scopes: BTreeSet<GraphScope>,
// stream --> set of valid arches for it
pub(crate) streams: BTreeMap<&'static str, &'static [&'static str]>,
}

impl ServiceSettings {
/// Default IP address for graph-builder main service.
const DEFAULT_GB_SERVICE_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
/// Default TCP port for graph-builder main service.
const DEFAULT_GB_SERVICE_PORT: u16 = 8080;
/// Default scopes (basearch plus stream) to process.
const DEFAULT_SCOPES: [(&'static str, &'static str); 12] = [
("aarch64", "next"),
("aarch64", "stable"),
("aarch64", "testing"),
("ppc64le", "next"),
("ppc64le", "stable"),
("ppc64le", "testing"),
("s390x", "next"),
("s390x", "stable"),
("s390x", "testing"),
("x86_64", "next"),
("x86_64", "stable"),
("x86_64", "testing"),
/// Default streams and their basearches to process.
const DEFAULT_STREAMS: [(&'static str, &'static [&'static str]); 3] = [
("stable", &["x86_64", "aarch64", "s390x", "ppc64le"]),
("testing", &["x86_64", "aarch64", "s390x", "ppc64le"]),
("next", &["x86_64", "aarch64", "s390x", "ppc64le"]),
];

pub fn socket_addr(&self) -> SocketAddr {
@@ -60,13 +51,7 @@ impl Default for ServiceSettings {
origin_allowlist: None,
ip_addr: Self::DEFAULT_GB_SERVICE_ADDR.into(),
port: Self::DEFAULT_GB_SERVICE_PORT,
scopes: Self::DEFAULT_SCOPES
.iter()
.map(|(basearch, stream)| GraphScope {
basearch: basearch.to_string(),
stream: stream.to_string(),
})
.collect(),
streams: Self::DEFAULT_STREAMS.iter().map(|&t| t).collect(),
}
}
}