Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: optimize import of product related entities during csaf ingestion #1109

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion modules/ingestor/src/graph/product/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ impl Graph {
cpe_key: organization_cpe_key,
website: None,
};
let org = self.ingest_organization(vendor, org, connection).await?;
let org: OrganizationContext<'_> =
self.ingest_organization(vendor, org, connection).await?;

product::ActiveModel {
id: Default::default(),
Expand Down
99 changes: 79 additions & 20 deletions modules/ingestor/src/service/advisory/csaf/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use crate::{
graph::{
advisory::advisory_vulnerability::{Version, VersionInfo, VersionSpec},
cpe::CpeCreator,
product::ProductInformation,
organization::{OrganizationContext, OrganizationInformation},
purl::creator::PurlCreator,
Graph,
},
service::{
advisory::csaf::product_status::ProductStatus, advisory::csaf::util::ResolveProductIdCache,
advisory::csaf::{product_status::ProductStatus, util::ResolveProductIdCache},
Error,
},
};
Expand All @@ -18,7 +18,8 @@ use std::collections::{hash_map::Entry, HashMap, HashSet};
use tracing::instrument;
use trustify_common::{cpe::Cpe, db::chunk::EntityChunkedIter, purl::Purl};
use trustify_entity::{
product_status, purl_status, status, version_range, version_scheme::VersionScheme,
organization, product, product_status, product_version_range, purl_status, status,
version_range, version_scheme::VersionScheme,
};
use uuid::Uuid;

Expand Down Expand Up @@ -108,6 +109,11 @@ impl<'a> StatusCreator<'a> {
let mut purls = PurlCreator::new();
let mut cpes = CpeCreator::new();

let mut org_cache: HashMap<&String, organization::Model> = HashMap::new();
let mut products = Vec::new();
let mut version_ranges = Vec::new();
let mut product_version_ranges = Vec::new();

for product in &self.products {
// ensure a correct status, and get id
if let Entry::Vacant(entry) = checked.entry(product.status) {
Expand All @@ -120,24 +126,61 @@ impl<'a> StatusCreator<'a> {
))
})?;

// Ingest product
let pr = graph
.ingest_product(
&product.product,
ProductInformation {
vendor: product.vendor.clone(),
cpe: product.cpe.clone(),
},
connection,
)
.await?;
// There should be only a few organizations per document,
// so simple caching should work here.
// If we find examples where this is not the case, we can switch to
// batch ingesting of organizations as well.
let org_id = match &product.vendor {
Some(vendor) => match org_cache.get(vendor) {
Some(entry) => Some(entry.id),
None => {
let organization_cpe_key = product
.cpe
.clone()
.map(|cpe| cpe.vendor().as_ref().to_string());
let org = OrganizationInformation {
cpe_key: organization_cpe_key,
website: None,
};
let org: OrganizationContext<'_> =
graph.ingest_organization(vendor, org, connection).await?;
org_cache.entry(vendor).or_insert(org.organization.clone());
Some(org.organization.id)
}
},
None => None,
};

// Create all product entities for batch ingesting
let product_cpe_key = product
.cpe
.clone()
.map(|cpe| cpe.product().as_ref().to_string());

let product_entity = product::ActiveModel {
id: Set(Uuid::now_v7()),
name: Set(product.product.clone()),
vendor_id: Set(org_id),
cpe_key: Set(product_cpe_key),
};
products.push(product_entity.clone());

// Ingest product range
// Create all product ranges for batch ingesting
let product_version_range = match product.version {
Some(ref ver) => Some(
pr.ingest_product_version_range(ver.clone(), None, connection)
.await?,
),
Some(ref ver) => {
let mut version_range_entity = ver.clone().into_active_model();
version_range_entity.id = Set(Uuid::now_v7());
version_ranges.push(version_range_entity.clone());

let product_version_range_entity = product_version_range::ActiveModel {
id: Set(Uuid::now_v7()),
product_id: product_entity.id,
version_range_id: version_range_entity.id,
cpe_key: Set(None),
};
product_version_ranges.push(product_version_range_entity.clone());
Some(product_version_range_entity)
}
None => None,
};

Expand All @@ -156,7 +199,7 @@ impl<'a> StatusCreator<'a> {
for package in packages {
let base_product = product_status::ActiveModel {
id: Default::default(),
product_version_range_id: Set(range.id),
product_version_range_id: range.clone().id,
advisory_id: Set(self.advisory_id),
vulnerability_id: Set(self.vulnerability_id.clone()),
package: Set(package),
Expand Down Expand Up @@ -213,6 +256,22 @@ impl<'a> StatusCreator<'a> {

self.create_status(connection, checked).await?;

for batch in &products.chunked() {
product::Entity::insert_many(batch).exec(connection).await?;
}

for batch in &version_ranges.chunked() {
version_range::Entity::insert_many(batch)
.exec(connection)
.await?;
}

for batch in &product_version_ranges.chunked() {
product_version_range::Entity::insert_many(batch)
.exec(connection)
.await?;
}

for batch in &product_statuses.chunked() {
product_status::Entity::insert_many(batch)
.exec(connection)
Expand Down
Loading