Skip to content

Commit

Permalink
agent: prune unbound collections during publish
Browse files Browse the repository at this point in the history
Updates the publications handler to automatically prune unbound collections
from the draft catalog.  This is done in order to allow the UI and the
discovers handler to just disable/enable bindings without needing to remove
collections for disabled bindings from the draft.

An "unbound" collection is defined as a draft collection spec that:

- has no pre-existing live spec
- is not a derivation
- is not used as the `source` or `target` in any drafted or live specification

During publication, unbound collections will be pruned from `live_specs`.  They
will still be validated, and can still cause the build to fail if they are
invalid.  Any pruned collections will _not_ count against the tenant's
collections quota.

As part of this, the agent was also updated to no longer use the `derivation`
property of collection specs to determine whether a collection should count
against the task quota. It now uses the `derive` property for that.
  • Loading branch information
psFried committed Sep 12, 2023
1 parent 2376bd2 commit c2a97ae
Show file tree
Hide file tree
Showing 7 changed files with 484 additions and 7 deletions.
26 changes: 26 additions & 0 deletions crates/agent-sql/src/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,3 +663,29 @@ pub async fn get_inferred_schemas(
.fetch_all(&pool)
.await
}

/// Deletes any `live_specs` (and `publication_specs`) that meet ALL of these criteria:
/// - were newly created by this (as yet uncommitted) publication
/// - are not used as the `source` or `target` of any enabled bindings
/// - are not derivations
///
/// Note that `publication_specs` are deleted due to the `on delete cascade` constraint
/// on that table.
///
/// `pub_id` is the id of the in-progress publication, and `txn` is the
/// transaction in which that publication is being applied. The delete must run
/// in that same transaction, because "newly created" is detected by comparing
/// `created_at` against `now()`, which Postgres fixes at transaction start.
/// NOTE(review): this presumes `live_specs.created_at` is populated from
/// `now()` when the row is inserted — confirm against the table definition.
///
/// Returns the `catalog_name` of every collection that was pruned.
///
/// # Errors
///
/// Returns any `sqlx::Error` raised while executing the delete.
pub async fn prune_unbound_collections(
    pub_id: Id,
    txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> sqlx::Result<Vec<String>> {
    // The "unbound" test uses `not exists` rather than comparing a scalar
    // subquery to null: a collection referenced by more than one flow (for
    // example, written by a capture and read by a materialization) yields
    // multiple `live_spec_flows` rows, and a scalar subquery returning more
    // than one row is a runtime error in Postgres.
    let pruned = sqlx::query!(
        r#"
        delete from live_specs l
        where l.spec_type = 'collection'
        and l.last_pub_id = $1
        and l.created_at = now()
        and l.spec->'derive' is null
        and not exists (
            select 1 from live_spec_flows lsf
            where l.id = lsf.source_id or l.id = lsf.target_id
        )
        returning l.catalog_name
        "#,
        pub_id as Id
    )
    .fetch_all(txn)
    .await?;

    Ok(pruned.into_iter().map(|row| row.catalog_name).collect())
}
21 changes: 20 additions & 1 deletion crates/agent/src/publications.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashSet;

use self::builds::IncompatibleCollection;
use self::validation::ControlPlane;
use super::{
Expand Down Expand Up @@ -34,6 +36,11 @@ pub enum JobStatus {
#[serde(skip_serializing_if = "Vec::is_empty")]
linked_materialization_publications: Vec<Id>,
},
/// Returned when there are no draft specs (after pruning unbound
/// collections). There will not be any `draft_errors` in this case, because
/// there's no `catalog_name` to associate with an error. And it may not be
/// desirable to treat this as an error, depending on the scenario.
EmptyDraft,
}

impl JobStatus {
Expand Down Expand Up @@ -218,6 +225,17 @@ impl PublishHandler {
.with_context(|| format!("applying spec updates for {}", spec_row.catalog_name))?;
}

let pruned_collections =
agent_sql::publications::prune_unbound_collections(row.pub_id, txn).await?;
if !pruned_collections.is_empty() {
tracing::info!(?pruned_collections, "pruned unbound collections");
}
let pruned_collections = pruned_collections.into_iter().collect::<HashSet<_>>();

if spec_rows.len() - pruned_collections.len() == 0 {
return stop_with_errors(Vec::new(), JobStatus::EmptyDraft, row, txn).await;
}

let errors = specs::enforce_resource_quotas(&spec_rows, prev_quota_usage, txn).await?;
if !errors.is_empty() {
return stop_with_errors(errors, JobStatus::build_failed(Vec::new()), row, txn).await;
Expand Down Expand Up @@ -375,7 +393,7 @@ impl PublishHandler {
}

// Add built specs to the live spec when publishing a build.
specs::add_built_specs_to_live_specs(&spec_rows, &build_output, txn)
specs::add_built_specs_to_live_specs(&spec_rows, &pruned_collections, &build_output, txn)
.await
.context("adding built specs to live specs")?;

Expand All @@ -389,6 +407,7 @@ impl PublishHandler {
&self.logs_tx,
row.pub_id,
&spec_rows,
&pruned_collections,
)
.await
.context("deploying build")?;
Expand Down
13 changes: 10 additions & 3 deletions crates/agent/src/publications/builds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use proto_flow::{
};
use serde::{Deserialize, Serialize};
use sqlx::types::Uuid;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashSet};
use std::io::Write;
use std::path;

Expand Down Expand Up @@ -384,15 +384,22 @@ pub async fn deploy_build(
logs_tx: &logs::Tx,
pub_id: Id,
spec_rows: &[SpecRow],
pruned_collections: &HashSet<String>,
) -> anyhow::Result<Vec<Error>> {
let mut errors = Vec::new();

let spec_rows = spec_rows
.iter()
// Filter specs which are tests, or are deletions of already-deleted specs.
// Filter specs to be activated
.filter(|r| match (r.live_type, r.draft_type) {
(None, None) => false, // Before and after are both deleted.
// Before and after are both deleted.
(None, None) => false,
// Tests don't need to be activated
(Some(CatalogType::Test), _) | (_, Some(CatalogType::Test)) => false,
// Collections may have been pruned if they are not bound to anything
(Some(_), Some(CatalogType::Collection)) => {
!pruned_collections.contains(r.catalog_name.as_str())
}
_ => true,
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
---
source: crates/agent/src/publications/specs.rs
expression: results
---
[
ScenarioResult {
draft_id: 1110000000000000,
status: Success {
linked_materialization_publications: [],
},
errors: [],
live_specs: [
LiveSpec {
catalog_name: "acmeCo/CaptureA",
connector_image_name: Some(
"allowed_connector",
),
connector_image_tag: Some(
"",
),
reads_from: None,
writes_to: Some(
[
"acmeCo/should_stay",
],
),
spec: Some(
Object {
"bindings": Array [
Object {
"resource": Object {
"thingy": String("foo"),
},
"target": String("acmeCo/should_stay"),
},
Object {
"disable": Bool(true),
"resource": Object {
"thingy": String("foo"),
},
"target": String("acmeCo/should_stay2"),
},
Object {
"disable": Bool(true),
"resource": Object {
"thingy": String("foo"),
},
"target": String("acmeCo/should_stay3"),
},
Object {
"disable": Bool(true),
"resource": Object {
"thingy": String("bar"),
},
"target": String("acmeCo/should_prune"),
},
],
"endpoint": Object {
"connector": Object {
"config": Object {},
"image": String("allowed_connector"),
},
},
},
),
spec_type: Some(
"capture",
),
},
LiveSpec {
catalog_name: "acmeCo/should_stay",
connector_image_name: None,
connector_image_tag: None,
reads_from: None,
writes_to: None,
spec: Some(
Object {
"key": Array [
String("/id"),
],
"schema": Object {
"type": String("object"),
},
},
),
spec_type: Some(
"collection",
),
},
LiveSpec {
catalog_name: "acmeCo/should_stay2",
connector_image_name: None,
connector_image_tag: None,
reads_from: None,
writes_to: None,
spec: Some(
Object {
"key": Array [
String("/id"),
],
"schema": Object {
"type": String("object"),
},
},
),
spec_type: Some(
"collection",
),
},
LiveSpec {
catalog_name: "acmeCo/should_stay3",
connector_image_name: None,
connector_image_tag: None,
reads_from: None,
writes_to: None,
spec: Some(
Object {
"key": Array [
String("/id"),
],
"schema": Object {
"type": String("object"),
},
},
),
spec_type: Some(
"collection",
),
},
LiveSpec {
catalog_name: "acmeCo/should_stay4",
connector_image_name: None,
connector_image_tag: None,
reads_from: None,
writes_to: None,
spec: Some(
Object {
"key": Array [
String("/id"),
],
"schema": Object {
"type": String("object"),
},
},
),
spec_type: Some(
"collection",
),
},
LiveSpec {
catalog_name: "acmeCo/should_stay5",
connector_image_name: None,
connector_image_tag: None,
reads_from: None,
writes_to: None,
spec: Some(
Object {
"key": Array [
String("/id"),
],
"schema": Object {
"type": String("object"),
},
},
),
spec_type: Some(
"collection",
),
},
LiveSpec {
catalog_name: "acmeCo/should_stay6",
connector_image_name: None,
connector_image_tag: None,
reads_from: Some(
[],
),
writes_to: None,
spec: Some(
Object {
"derive": Object {
"transforms": Array [],
"using": Object {
"sqlite": Object {},
},
},
"key": Array [
String("/id"),
],
"schema": Object {
"type": String("object"),
},
},
),
spec_type: Some(
"collection",
),
},
],
},
]
Loading

0 comments on commit c2a97ae

Please sign in to comment.