diff --git a/modules/ingestor/src/graph/product/mod.rs b/modules/ingestor/src/graph/product/mod.rs index 21ccfb93..19c9d1b9 100644 --- a/modules/ingestor/src/graph/product/mod.rs +++ b/modules/ingestor/src/graph/product/mod.rs @@ -166,7 +166,8 @@ impl Graph { cpe_key: organization_cpe_key, website: None, }; - let org = self.ingest_organization(vendor, org, connection).await?; + let org: OrganizationContext<'_> = + self.ingest_organization(vendor, org, connection).await?; product::ActiveModel { id: Default::default(), diff --git a/modules/ingestor/src/service/advisory/csaf/creator.rs b/modules/ingestor/src/service/advisory/csaf/creator.rs index 5221d059..6beea4c4 100644 --- a/modules/ingestor/src/service/advisory/csaf/creator.rs +++ b/modules/ingestor/src/service/advisory/csaf/creator.rs @@ -2,12 +2,12 @@ use crate::{ graph::{ advisory::advisory_vulnerability::{Version, VersionInfo, VersionSpec}, cpe::CpeCreator, - product::ProductInformation, + organization::{OrganizationContext, OrganizationInformation}, purl::creator::PurlCreator, Graph, }, service::{ - advisory::csaf::product_status::ProductStatus, advisory::csaf::util::ResolveProductIdCache, + advisory::csaf::{product_status::ProductStatus, util::ResolveProductIdCache}, Error, }, }; @@ -18,7 +18,8 @@ use std::collections::{hash_map::Entry, HashMap, HashSet}; use tracing::instrument; use trustify_common::{cpe::Cpe, db::chunk::EntityChunkedIter, purl::Purl}; use trustify_entity::{ - product_status, purl_status, status, version_range, version_scheme::VersionScheme, + organization, product, product_status, product_version_range, purl_status, status, + version_range, version_scheme::VersionScheme, }; use uuid::Uuid; @@ -108,6 +109,11 @@ impl<'a> StatusCreator<'a> { let mut purls = PurlCreator::new(); let mut cpes = CpeCreator::new(); + let mut org_cache: HashMap<&String, organization::Model> = HashMap::new(); + let mut products = Vec::new(); + let mut version_ranges = Vec::new(); + let mut product_version_ranges = Vec::new(); + for product in &self.products { // ensure a correct status, and get id if let Entry::Vacant(entry) = checked.entry(product.status) { @@ -120,24 +126,61 @@ impl<'a> StatusCreator<'a> { )) })?; - // Ingest product - let pr = graph - .ingest_product( - &product.product, - ProductInformation { - vendor: product.vendor.clone(), - cpe: product.cpe.clone(), - }, - connection, - ) - .await?; + // There should be only a few organizations per document, + // so simple caching should work here. + // If we find examples where this is not a case, we can switch to + // batch ingesting of organizations as well. + let org_id = match &product.vendor { + Some(vendor) => match org_cache.get(vendor) { + Some(entry) => Some(entry.id), + None => { + let organization_cpe_key = product + .cpe + .clone() + .map(|cpe| cpe.vendor().as_ref().to_string()); + let org = OrganizationInformation { + cpe_key: organization_cpe_key, + website: None, + }; + let org: OrganizationContext<'_> = + graph.ingest_organization(vendor, org, connection).await?; + org_cache.entry(vendor).or_insert(org.organization.clone()); + Some(org.organization.id) + } + }, + None => None, + }; + + // Create all product entities for batch ingesting + let product_cpe_key = product + .cpe + .clone() + .map(|cpe| cpe.product().as_ref().to_string()); + + let product_entity = product::ActiveModel { + id: Set(Uuid::now_v7()), + name: Set(product.product.clone()), + vendor_id: Set(org_id), + cpe_key: Set(product_cpe_key), + }; + products.push(product_entity.clone()); - // Ingest product range + // Create all product ranges for batch ingesting let product_version_range = match product.version { - Some(ref ver) => Some( - pr.ingest_product_version_range(ver.clone(), None, connection) - .await?, - ), + Some(ref ver) => { + let mut version_range_entity = ver.clone().into_active_model(); + version_range_entity.id = Set(Uuid::now_v7()); + version_ranges.push(version_range_entity.clone()); + + let product_version_range_entity = product_version_range::ActiveModel { + id: Set(Uuid::now_v7()), + product_id: product_entity.id, + version_range_id: version_range_entity.id, + cpe_key: Set(None), + }; + product_version_ranges.push(product_version_range_entity.clone()); + Some(product_version_range_entity) + } None => None, }; @@ -156,7 +199,7 @@ impl<'a> StatusCreator<'a> { for package in packages { let base_product = product_status::ActiveModel { id: Default::default(), - product_version_range_id: Set(range.id), + product_version_range_id: range.clone().id, advisory_id: Set(self.advisory_id), vulnerability_id: Set(self.vulnerability_id.clone()), package: Set(package), @@ -213,6 +256,22 @@ impl<'a> StatusCreator<'a> { self.create_status(connection, checked).await?; + for batch in &products.chunked() { + product::Entity::insert_many(batch).exec(connection).await?; + } + + for batch in &version_ranges.chunked() { + version_range::Entity::insert_many(batch) + .exec(connection) + .await?; + } + + for batch in &product_version_ranges.chunked() { + product_version_range::Entity::insert_many(batch) + .exec(connection) + .await?; + } + for batch in &product_statuses.chunked() { product_status::Entity::insert_many(batch) .exec(connection)