From c2a97aec283c2f8cd90676e08041c57d08753120 Mon Sep 17 00:00:00 2001
From: Phil
Date: Tue, 12 Sep 2023 09:18:01 -0400
Subject: [PATCH] agent: prune unbound collections during publish

Updates the publications handler to automatically prune unbound
collections from the draft catalog. This allows the UI and the discovers
handler to simply enable or disable bindings, without also needing to
remove the collections of disabled bindings from the draft.

An "unbound" collection is defined as a draft collection spec that:
- has no pre-existing live spec
- is not a derivation
- is not used as the `source` or `target` in any drafted or live
  specification

During publication, unbound collections will be pruned from `live_specs`.
They will still be validated, and can still cause the build to fail if
they are invalid. Pruned collections will _not_ count against the
tenant's collections quota.

As part of this, the agent no longer uses the `derivation` property of
collection specs to determine whether a spec counts against the task
quota; it now uses the `derive` property instead.
---
 crates/agent-sql/src/publications.rs           |  26 +++
 crates/agent/src/publications.rs               |  21 +-
 crates/agent/src/publications/builds.rs        |  13 +-
 ...prune_unbound_collections_publication.snap  | 200 ++++++++++++++++++
 crates/agent/src/publications/specs.rs         |  96 ++++++++-
 .../test_resources/prune_collections.sql       | 122 +++++++++++
 .../27_cascade_publication_specs.sql           |  13 ++
 7 files changed, 484 insertions(+), 7 deletions(-)
 create mode 100644 crates/agent/src/publications/snapshots/agent__publications__specs__test__prune_unbound_collections_publication.snap
 create mode 100644 crates/agent/src/publications/test_resources/prune_collections.sql
 create mode 100644 supabase/migrations/27_cascade_publication_specs.sql

diff --git a/crates/agent-sql/src/publications.rs b/crates/agent-sql/src/publications.rs
index 3769c68494..cf9f365e86 100644
--- a/crates/agent-sql/src/publications.rs
+++ b/crates/agent-sql/src/publications.rs
@@ -663,3 +663,29 @@ pub async fn get_inferred_schemas(
     .fetch_all(&pool)
     .await
 }
+
+/// Deletes any `live_specs` (and `publication_specs`) that meet ALL of these criteria:
+/// - were newly created by this (as yet uncommitted) publication
+/// - are not used as the `source` or `target` of any enabled bindings
+/// - are not derivations
+///
+/// Note that `publication_specs` are deleted due to the `on delete cascade` constraint
+/// on that table.
+pub async fn prune_unbound_collections(
+    pub_id: Id,
+    txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
+) -> sqlx::Result<Vec<String>> {
+    let res = sqlx::query!(r#"
+        delete from live_specs l
+        where l.spec_type = 'collection'
+        and l.last_pub_id = $1
+        and l.created_at = now()
+        and l.spec->'derive' is null
+        and (select 1 from live_spec_flows lsf where l.id = lsf.source_id or l.id = lsf.target_id) is null
+        returning l.catalog_name
+        "#, pub_id as Id)
+    .fetch_all(txn)
+    .await?;
+
+    Ok(res.into_iter().map(|r| r.catalog_name).collect())
+}
diff --git a/crates/agent/src/publications.rs b/crates/agent/src/publications.rs
index 91e8a73a1d..ef04853fc6 100644
--- a/crates/agent/src/publications.rs
+++ b/crates/agent/src/publications.rs
@@ -1,3 +1,5 @@
+use std::collections::HashSet;
+
 use self::builds::IncompatibleCollection;
 use self::validation::ControlPlane;
 use super::{
@@ -34,6 +36,11 @@ pub enum JobStatus {
         #[serde(skip_serializing_if = "Vec::is_empty")]
         linked_materialization_publications: Vec<Id>,
     },
+    /// Returned when there are no draft specs (after pruning unbound
+    /// collections). There will not be any `draft_errors` in this case, because
+    /// there's no `catalog_name` to associate with an error. And it may not be
+    /// desirable to treat this as an error, depending on the scenario.
+    EmptyDraft,
 }

 impl JobStatus {
@@ -218,6 +225,17 @@ impl PublishHandler {
                 .with_context(|| format!("applying spec updates for {}", spec_row.catalog_name))?;
         }
+        let pruned_collections =
+            agent_sql::publications::prune_unbound_collections(row.pub_id, txn).await?;
+        if !pruned_collections.is_empty() {
+            tracing::info!(?pruned_collections, "pruned unbound collections");
+        }
+        let pruned_collections = pruned_collections.into_iter().collect::<HashSet<_>>();
+
+        if spec_rows.len() - pruned_collections.len() == 0 {
+            return stop_with_errors(Vec::new(), JobStatus::EmptyDraft, row, txn).await;
+        }
+
         let errors = specs::enforce_resource_quotas(&spec_rows, prev_quota_usage, txn).await?;
         if !errors.is_empty() {
             return stop_with_errors(errors, JobStatus::build_failed(Vec::new()), row, txn).await;
         }
@@ -375,7 +393,7 @@ impl PublishHandler {
         }

         // Add built specs to the live spec when publishing a build.
-        specs::add_built_specs_to_live_specs(&spec_rows, &build_output, txn)
+        specs::add_built_specs_to_live_specs(&spec_rows, &pruned_collections, &build_output, txn)
             .await
             .context("adding built specs to live specs")?;

@@ -389,6 +407,7 @@ impl PublishHandler {
             &self.logs_tx,
             row.pub_id,
             &spec_rows,
+            &pruned_collections,
         )
         .await
         .context("deploying build")?;
diff --git a/crates/agent/src/publications/builds.rs b/crates/agent/src/publications/builds.rs
index afc37db571..d9c718cec6 100644
--- a/crates/agent/src/publications/builds.rs
+++ b/crates/agent/src/publications/builds.rs
@@ -9,7 +9,7 @@ use proto_flow::{
 };
 use serde::{Deserialize, Serialize};
 use sqlx::types::Uuid;
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashSet};
 use std::io::Write;
 use std::path;

@@ -384,15 +384,22 @@ pub async fn deploy_build(
     logs_tx: &logs::Tx,
     pub_id: Id,
     spec_rows: &[SpecRow],
+    pruned_collections: &HashSet<String>,
 ) -> anyhow::Result<Vec<Error>> {
     let mut errors = Vec::new();

     let spec_rows = spec_rows
         .iter()
-        // Filter specs which are tests, or are deletions of already-deleted specs.
+        // Keep only the specs which need to be activated.
         .filter(|r| match (r.live_type, r.draft_type) {
-            (None, None) => false, // Before and after are both deleted.
+            // Before and after are both deleted.
+            (None, None) => false,
+            // Tests don't need to be activated.
             (Some(CatalogType::Test), _) | (_, Some(CatalogType::Test)) => false,
+            // Collections may have been pruned if they are not bound to anything.
+            (Some(_), Some(CatalogType::Collection)) => {
+                !pruned_collections.contains(r.catalog_name.as_str())
+            }
             _ => true,
         });

diff --git a/crates/agent/src/publications/snapshots/agent__publications__specs__test__prune_unbound_collections_publication.snap b/crates/agent/src/publications/snapshots/agent__publications__specs__test__prune_unbound_collections_publication.snap
new file mode 100644
index 0000000000..c0e4f17429
--- /dev/null
+++ b/crates/agent/src/publications/snapshots/agent__publications__specs__test__prune_unbound_collections_publication.snap
@@ -0,0 +1,200 @@
+---
+source: crates/agent/src/publications/specs.rs
+expression: results
+---
+[
+    ScenarioResult {
+        draft_id: 1110000000000000,
+        status: Success {
+            linked_materialization_publications: [],
+        },
+        errors: [],
+        live_specs: [
+            LiveSpec {
+                catalog_name: "acmeCo/CaptureA",
+                connector_image_name: Some(
+                    "allowed_connector",
+                ),
+                connector_image_tag: Some(
+                    "",
+                ),
+                reads_from: None,
+                writes_to: Some(
+                    [
+                        "acmeCo/should_stay",
+                    ],
+                ),
+                spec: Some(
+                    Object {
+                        "bindings": Array [
+                            Object {
+                                "resource": Object {
+                                    "thingy": String("foo"),
+                                },
+                                "target": String("acmeCo/should_stay"),
+                            },
+                            Object {
+                                "disable": Bool(true),
+                                "resource": Object {
+                                    "thingy": String("foo"),
+                                },
+                                "target": String("acmeCo/should_stay2"),
+                            },
+                            Object {
+                                "disable": Bool(true),
+                                "resource": Object {
+                                    "thingy": String("foo"),
+                                },
+                                "target": String("acmeCo/should_stay3"),
+                            },
+                            Object {
+                                "disable": Bool(true),
+                                "resource": Object {
+                                    "thingy": String("bar"),
+                                },
+                                "target": String("acmeCo/should_prune"),
+                            },
+                        ],
+                        "endpoint": Object {
+                            "connector": Object {
+                                "config": Object {},
+                                "image": String("allowed_connector"),
+                            },
+                        },
+                    },
+                ),
+                spec_type: Some(
+                    "capture",
+                ),
+            },
+            LiveSpec {
+                catalog_name: "acmeCo/should_stay",
+                connector_image_name: None,
+                connector_image_tag: None,
+                reads_from: None,
+                writes_to: None,
+                spec: Some(
+                    Object {
+                        "key": Array [
+                            String("/id"),
+                        ],
+                        "schema": Object {
+                            "type": String("object"),
+                        },
+                    },
+                ),
+                spec_type: Some(
+                    "collection",
+                ),
+            },
+            LiveSpec {
+                catalog_name: "acmeCo/should_stay2",
+                connector_image_name: None,
+                connector_image_tag: None,
+                reads_from: None,
+                writes_to: None,
+                spec: Some(
+                    Object {
+                        "key": Array [
+                            String("/id"),
+                        ],
+                        "schema": Object {
+                            "type": String("object"),
+                        },
+                    },
+                ),
+                spec_type: Some(
+                    "collection",
+                ),
+            },
+            LiveSpec {
+                catalog_name: "acmeCo/should_stay3",
+                connector_image_name: None,
+                connector_image_tag: None,
+                reads_from: None,
+                writes_to: None,
+                spec: Some(
+                    Object {
+                        "key": Array [
+                            String("/id"),
+                        ],
+                        "schema": Object {
+                            "type": String("object"),
+                        },
+                    },
+                ),
+                spec_type: Some(
+                    "collection",
+                ),
+            },
+            LiveSpec {
+                catalog_name: "acmeCo/should_stay4",
+                connector_image_name: None,
+                connector_image_tag: None,
+                reads_from: None,
+                writes_to: None,
+                spec: Some(
+                    Object {
+                        "key": Array [
+                            String("/id"),
+                        ],
+                        "schema": Object {
+                            "type": String("object"),
+                        },
+                    },
+                ),
+                spec_type: Some(
+                    "collection",
+                ),
+            },
+            LiveSpec {
+                catalog_name: "acmeCo/should_stay5",
+                connector_image_name: None,
+                connector_image_tag: None,
+                reads_from: None,
+                writes_to: None,
+                spec: Some(
+                    Object {
+                        "key": Array [
+                            String("/id"),
+                        ],
+                        "schema": Object {
+                            "type": String("object"),
+                        },
+                    },
+                ),
+                spec_type: Some(
+                    "collection",
+                ),
+            },
+            LiveSpec {
+                catalog_name: "acmeCo/should_stay6",
+                connector_image_name: None,
+                connector_image_tag: None,
+                reads_from: Some(
+                    [],
+                ),
+                writes_to: None,
+                spec: Some(
+                    Object {
+                        "derive": Object {
+                            "transforms": Array [],
+                            "using": Object {
+                                "sqlite": Object {},
+                            },
+                        },
+                        "key": Array [
+                            String("/id"),
+                        ],
+                        "schema": Object {
+                            "type": String("object"),
+                        },
+                    },
+                ),
+                spec_type: Some(
+                    "collection",
+                ),
+            },
+        ],
+    },
+]
diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs
index 19d2595b28..52beb43949 100644
--- a/crates/agent/src/publications/specs.rs
+++ b/crates/agent/src/publications/specs.rs
@@ -5,7 +5,7 @@ use agent_sql::{Capability, CatalogType, Id};
 use anyhow::Context;
 use itertools::Itertools;
 use sqlx::types::Uuid;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, HashMap, HashSet};

 // resolve_specifications returns the definitive set of specifications which
 // are changing in this publication. It obtains sufficient locks to ensure
@@ -315,6 +315,8 @@ pub fn validate_transition(
     }
 }

+/// Note that `spec_rows` may contain `live_spec_id`s that have already been deleted
+/// because they were unbound collections, which have been pruned.
 pub async fn enforce_resource_quotas(
     spec_rows: &[SpecRow],
     prev_tenants: Vec<Tenant>,
@@ -470,13 +472,17 @@ pub async fn apply_updates_for_row(
 // the list of spec_rows.
 pub async fn add_built_specs_to_live_specs(
     spec_rows: &[SpecRow],
+    pruned_collections: &HashSet<String>,
     build_output: &builds::BuildOutput,
     txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
 ) -> Result<(), sqlx::Error> {
     for collection in build_output.built_collections.iter() {
+        // Only non-pruned collections are updated here; pruned collections have
+        // already had their live_specs rows deleted.
         if let Some(row) = spec_rows
             .iter()
             .find(|r| r.catalog_name == collection.collection.as_str())
+            .filter(|r| !pruned_collections.contains(r.catalog_name.as_str()))
         {
             agent_sql::publications::add_built_specs(row.live_spec_id, &collection.spec, txn)
                 .await?;
@@ -1235,7 +1241,12 @@ mod test {
             ('1111000000000000', '1110000000000000', 'usageB/CaptureC', '{
                 "bindings": [{"target": "usageB/CaptureC", "resource": {"binding": "foo", "syncMode": "incremental"}}],
                 "endpoint": {"connector": {"image": "foo", "config": {}}}
-            }'::json, 'capture')
+            }'::json, 'capture'),
+            -- This collection should be pruned, and thus _not_ count against the quota of 2 collections.
+            ('1111200000000000', '1110000000000000', 'usageB/UnboundCollection', '{
+                "schema": {},
+                "key": ["/id"]
+            }'::json, 'collection')
         ),
         p6 as (
             insert into publications (id, job_status, user_id, draft_id) values
@@ -1302,7 +1313,14 @@
         ),
         p5 as (
             insert into draft_specs (id, draft_id, catalog_name, spec, spec_type) values
-            ('1112000000000000', '1120000000000000', 'usageB/DerivationA', '{"schema": {}, "key": ["foo"], "derive": {"using": {"sqlite": {}}, "transforms":[{"name": "a-transform", "source": "usageB/CollectionA"}]}}'::json, 'collection')
+            ('1112000000000000', '1120000000000000', 'usageB/DerivationA', '{
+                "schema": {},
+                "key": ["foo"],
+                "derive": {
+                    "using": {"typescript": {"module": "foo.ts"}},
+                    "transforms": [{"source": "usageB/CollectionA", "shuffle": "any", "name": "foo"}]
+                }
+            }'::json, 'collection')
         ),
         p6 as (
             insert into publications (id, job_status, user_id, draft_id) values
@@ -1456,4 +1474,76 @@ mod test {
             ]
             "###);
     }
+
+    #[tokio::test]
+    #[serial_test::parallel]
+    async fn test_prune_unbound_collections_publication() {
+        let mut conn = sqlx::postgres::PgConnection::connect(&FIXED_DATABASE_URL)
+            .await
+            .unwrap();
+        let mut txn = conn.begin().await.unwrap();
+
+        sqlx::query(include_str!("test_resources/prune_collections.sql"))
+            .execute(&mut txn)
+            .await
+            .unwrap();
+
+        let results = execute_publications(&mut txn).await;
+        insta::assert_debug_snapshot!(results);
+    }
+
+    #[tokio::test]
+    #[serial_test::parallel]
+    async fn test_publish_error_when_all_collections_are_pruned() {
+        let mut conn = sqlx::postgres::PgConnection::connect(&FIXED_DATABASE_URL)
+            .await
+            .unwrap();
+        let mut txn = conn.begin().await.unwrap();
+
+        sqlx::query(r#"
+            with setup_user as (
+                insert into auth.users (id) values
+                ('43a18a3e-5a59-11ed-9b6a-0242ac120002')
+            ),
+            setup_user_grants as (
+                insert into user_grants (user_id, object_role, capability) values
+                ('43a18a3e-5a59-11ed-9b6a-0242ac120002', 'acmeCo/', 'admin')
+            ),
+            setup_role_grants as (
+                insert into role_grants (subject_role, object_role, capability) values
+                ('acmeCo/', 'acmeCo/', 'admin')
+            ),
+            setup_draft as (
+                insert into drafts (id, user_id) values
+                ('1111000000000000', '43a18a3e-5a59-11ed-9b6a-0242ac120002')
+            ),
+            setup_draft_specs as (
+                insert into draft_specs (id, draft_id, catalog_name, spec, spec_type) values
+                ('1111111111111111', '1111000000000000', 'acmeCo/should_prune', '{
+                    "schema": { "type": "object" },
+                    "key": ["/id"]
+                }', 'collection')
+            ),
+            setup_publications as (
+                insert into publications (id, job_status, user_id, draft_id) values
+                ('1111100000000000', '{"type": "queued"}'::json, '43a18a3e-5a59-11ed-9b6a-0242ac120002', '1111000000000000')
+            )
+            select 1;
+        "#)
+        .execute(&mut txn)
+        .await
+        .unwrap();
+
+        let results = execute_publications(&mut txn).await;
+        insta::assert_debug_snapshot!(results, @r###"
+        [
+            ScenarioResult {
+                draft_id: 1111000000000000,
+                status: EmptyDraft,
+                errors: [],
+                live_specs: [],
+            },
+        ]
+        "###);
+    }
 }
diff --git a/crates/agent/src/publications/test_resources/prune_collections.sql b/crates/agent/src/publications/test_resources/prune_collections.sql
new file mode 100644
index 0000000000..8002d311a0
--- /dev/null
+++ b/crates/agent/src/publications/test_resources/prune_collections.sql
@@ -0,0 +1,122 @@
+with setup_user as (
+    insert into auth.users (id) values
+    ('43a18a3e-5a59-11ed-9b6a-0242ac120002')
+),
+setup_user_grants as (
+    insert into user_grants (user_id, object_role, capability) values
+    ('43a18a3e-5a59-11ed-9b6a-0242ac120002', 'acmeCo/', 'admin')
+),
+setup_role_grants as (
+    insert into role_grants (subject_role, object_role, capability) values
+    ('acmeCo/', 'acmeCo/', 'admin')
+),
+setup_tenant as (
+    -- Set the tenant collections quota such that publication would fail due to being over quota
+    -- if the `should_prune` collection is not pruned from the publication.
+    insert into tenants (tenant, collections_quota) values ('acmeCo/', 7)
+),
+setup_draft as (
+    insert into drafts (id, user_id) values
+    ('1110000000000000', '43a18a3e-5a59-11ed-9b6a-0242ac120002')
+),
+setup_draft_specs as (
+    insert into draft_specs (id, draft_id, catalog_name, spec, spec_type) values
+    ('1111000000000000', '1110000000000000', 'acmeCo/CaptureA', '{
+        "bindings": [
+            {"target": "acmeCo/should_stay", "resource": {"thingy": "foo"}},
+            {"target": "acmeCo/should_stay2", "disable": true, "resource": {"thingy": "foo"}},
+            {"target": "acmeCo/should_stay3", "disable": true, "resource": {"thingy": "foo"}},
+            {"target": "acmeCo/should_prune", "disable": true, "resource": {"thingy": "bar"}}
+        ],
+        "endpoint": {"connector": {"image": "allowed_connector", "config": {}}}
+    }'::json, 'capture'),
+    -- should not be pruned because a drafted capture writes to it
+    ('1111110000000000', '1110000000000000', 'acmeCo/should_stay', '{
+        "schema": { "type": "object" },
+        "key": ["/id"]
+    }', 'collection'),
+    -- should not be pruned because a live materialization reads from it
+    ('1111111000000000', '1110000000000000', 'acmeCo/should_stay2', '{
+        "schema": { "type": "object" },
+        "key": ["/id"]
+    }', 'collection'),
+    -- should not be pruned because a live capture writes to it
+    ('1111111100000000', '1110000000000000', 'acmeCo/should_stay3', '{
+        "schema": { "type": "object" },
+        "key": ["/id"]
+    }', 'collection'),
+    -- should not be pruned because a live derivation reads from it
+    ('1111111110000000', '1110000000000000', 'acmeCo/should_stay4', '{
+        "schema": { "type": "object" },
+        "key": ["/id"]
+    }', 'collection'),
+    -- should not be pruned because there is a corresponding live_specs row
+    ('1111111111000000', '1110000000000000', 'acmeCo/should_stay5', '{
+        "schema": { "type": "object" },
+        "key": ["/id"]
+    }', 'collection'),
+    -- should not be pruned because it's a derivation
+    ('1111111111100000', '1110000000000000', 'acmeCo/should_stay6', '{
+        "schema": { "type": "object" },
+        "key": ["/id"],
+        "derive": {
+            "using": {"sqlite": {}},
+            "transforms": []
+        }
+    }', 'collection'),
+    -- should be pruned: no live spec, not a derivation, and only targeted by a disabled binding
+    ('1111111111110000', '1110000000000000', 'acmeCo/should_prune', '{
+        "schema": { "type": "object" },
+        "key": ["/id"]
+    }', 'collection')
+),
+setup_live_specs as (
+    insert into live_specs (id, catalog_name, spec_type, reads_from, writes_to, spec, created_at) values
+    ('2222222222222220', 'acmeCo/a/materialization', 'materialization', '{acmeCo/should_stay2}', null, '{
+        "endpoint": { "connector": { "image": "allowed_connector", "config": {}} },
+        "bindings": [{"source": "acmeCo/should_stay2", "resource": {"thingy": "foo"}}]
+    }', '2022-02-02T02:22:22Z'),
+    ('2222222222222200', 'acmeCo/a/capture', 'capture', null, '{acmeCo/should_stay3}', '{
+        "endpoint": { "connector": { "image": "allowed_connector", "config": {}} },
+        "bindings": [{"resource": {"thingy": "foo"}, "target": "acmeCo/should_stay3"}]
+    }', '2022-02-02T02:22:22Z'),
+    ('2222222222222000', 'acmeCo/a/derivation', 'collection', '{acmeCo/should_stay4}', null, '{
+        "schema": { "type": "object" },
+        "key": ["/id"],
+        "derive": {
+            "using": { "sqlite": {}},
+            "transforms": [{"name": "foo", "source": "acmeCo/should_stay4", "lambda": "select 1;"}]
+        }
+    }', '2022-02-02T02:22:22Z'),
+    ('2222222222220000', 'acmeCo/should_stay2', 'collection', null, null, '{
+        "schema": { "type": "object" },
+        "key": ["/id"]
+    }', '2022-02-02T02:22:22Z'),
+    ('2222222222200000', 'acmeCo/should_stay3', 'collection', null, null, '{
+        "schema": { "type": "object" },
+        "key": ["/id"]
+    }', '2022-02-02T02:22:22Z'),
+    ('2222222222000000', 'acmeCo/should_stay4', 'collection', null, null, '{
+        "schema": { "type": "object" },
+        "key": ["/id"]
+    }', '2022-02-02T02:22:22Z'),
+    ('2222222220000000', 'acmeCo/should_stay5', 'collection', null, null, '{
+        "schema": { "type": "object" },
+        "key": ["/id"]
+    }', '2022-02-02T02:22:22Z')
+),
+setup_live_spec_flows as (
+    insert into live_spec_flows (source_id, target_id, flow_type) values
+    ('2222222222220000', '2222222222222220', 'materialization'),
+    ('2222222222222200', '2222222222200000', 'capture'),
+    ('2222222222000000', '2222222222222000', 'collection')
+),
+setup_publications as (
+    insert into publications (id, job_status, user_id, draft_id) values
+    ('1111100000000000', '{"type": "queued"}'::json, '43a18a3e-5a59-11ed-9b6a-0242ac120002', '1110000000000000')
+),
+setup_connectors as (
+    insert into connectors (external_url, image_name, title, short_description, logo_url) values
+    ('http://example.com', 'allowed_connector', '{"en-US": "foo"}'::json, '{"en-US": "foo"}'::json, '{"en-US": "foo"}'::json)
+)
+select 1;
diff --git a/supabase/migrations/27_cascade_publication_specs.sql b/supabase/migrations/27_cascade_publication_specs.sql
new file mode 100644
index 0000000000..76549eccb9
--- /dev/null
+++ b/supabase/migrations/27_cascade_publication_specs.sql
@@ -0,0 +1,13 @@
+
+-- Adds "on delete cascade" to the foreign key constraint from publication_specs to live_specs.
+-- We previously never deleted live_specs, but now we do as part of pruning unbound collections
+-- from in-progress publications.
+begin;
+
+alter table publication_specs drop constraint publication_specs_live_spec_id_fkey;
+alter table publication_specs add constraint publication_specs_live_spec_id_fkey
+    foreign key (live_spec_id)
+    references live_specs(id)
+    on delete cascade;
+
+commit;
\ No newline at end of file