Commit: end-to-end schema inference
Introduce well-known URIs `flow://write-schema` and
`flow://inferred-schema`, which are available to collection read schemas.

These URIs may be `$ref`'d from within a read schema; when they are,
their definitions are inlined into the effective read schema of the
built collection with every publication.
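
For illustration, a read schema composing both well-known URIs might look
like the following (a sketch using the `serde_json::json!` macro; the same
shape appears in the synthesized default and snapshots below):

    // Sketch only: composes the collection's write schema with its
    // continuously-updated inferred schema.
    let read_schema = serde_json::json!({
        "allOf": [
            {"$ref": "flow://write-schema"},
            {"$ref": "flow://inferred-schema"}
        ]
    });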

Extend the `validation::ControlPlane` trait to enable resolving
collections to their current inferred schemas. Implement it within
`flowctl` through our API, and within `agent` via out-of-transaction SQL
lookups using the agent's Postgres pool.
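
The trait surface this implies, inferred from the `agent` implementation
added below (a sketch; the authoritative definition lives in the
`validation` crate):

    use std::collections::BTreeMap;
    use futures::future::BoxFuture;

    // Inferred shape of the extended trait, reconstructed from this
    // commit's agent-side implementation in publications/validation.rs.
    pub trait ControlPlane {
        // Resolve collection names to their current built specs.
        fn resolve_collections<'a>(
            &'a self,
            collections: Vec<models::Collection>,
        ) -> BoxFuture<'a, anyhow::Result<Vec<proto_flow::flow::CollectionSpec>>>;

        // Fetch the current inferred schema, if any, for each collection.
        fn get_inferred_schemas<'a>(
            &'a self,
            collections: Vec<models::Collection>,
        ) -> BoxFuture<'a, anyhow::Result<BTreeMap<models::Collection, models::Schema>>>;
    }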

Update the `agent` discovers handler to map the existing
`x-infer-schema: true` annotation into a synthesized initial read
schema that composes the write & inferred schemas.

Issue #1103
jgraettinger committed Sep 12, 2023
1 parent 2e36b11 commit 28b021e
Showing 30 changed files with 899 additions and 166 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

7 changes: 4 additions & 3 deletions Tiltfile
@@ -52,15 +52,16 @@ local_resource('gazette', serve_cmd='%s/flow/.build/package/bin/gazette serve \
local_resource('reactor', serve_cmd='%s/flow/.build/package/bin/flowctl-go serve consumer \
--broker.address http://localhost:8080 \
--broker.cache.size 128 \
--consumer.host localhost \
--consumer.limit 1024 \
--consumer.max-hot-standbys 0 \
--consumer.port 9000 \
--consumer.host localhost \
--etcd.address http://localhost:2379 \
--flow.builds-root file://%s/ \
--flow.enable-schema-inference \
--flow.network supabase_network_flow \
--log.format text \
--log.level info \
--flow.network supabase_network_flow' % (REPO_BASE, FLOW_BUILDS_DIR),
--log.level info' % (REPO_BASE, FLOW_BUILDS_DIR),
links='http://localhost:9000/debug/pprof',
resource_deps=['etcd'],
readiness_probe=probe(
45 changes: 45 additions & 0 deletions crates/agent-sql/src/publications.rs
@@ -618,3 +618,48 @@ pub async fn resolve_storage_mappings(
.fetch_all(&mut *txn)
.await
}

pub struct ResolvedCollectionRow {
pub built_spec: Option<Json<proto_flow::flow::CollectionSpec>>,
}

pub async fn resolve_collections(
collections: Vec<String>,
pool: sqlx::PgPool,
) -> sqlx::Result<Vec<ResolvedCollectionRow>> {
sqlx::query_as!(
ResolvedCollectionRow,
r#"select
built_spec as "built_spec: Json<proto_flow::flow::CollectionSpec>"
from live_specs
where catalog_name = ANY($1::text[])
and spec_type = 'collection'
"#,
collections as Vec<String>,
)
.fetch_all(&pool)
.await
}

pub struct InferredSchemaRow {
pub collection_name: String,
pub schema: Json<Box<RawValue>>,
}

pub async fn get_inferred_schemas(
collections: Vec<String>,
pool: sqlx::PgPool,
) -> sqlx::Result<Vec<InferredSchemaRow>> {
sqlx::query_as!(
InferredSchemaRow,
r#"select
collection_name,
schema as "schema!: Json<Box<RawValue>>"
from inferred_schemas
where collection_name = ANY($1::text[])
"#,
collections as Vec<String>,
)
.fetch_all(&pool)
.await
}
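
A minimal call-site sketch for these queries (the collection name and
`DATABASE_URL` are assumptions for illustration, not part of this commit):

    // Hypothetical usage; assumes a reachable control-plane Postgres.
    async fn demo() -> anyhow::Result<()> {
        let pool = sqlx::PgPool::connect(&std::env::var("DATABASE_URL")?).await?;

        // Built specs of live collections, looked up by catalog name.
        let resolved =
            resolve_collections(vec!["acmeCo/orders".to_string()], pool.clone()).await?;
        println!("resolved {} collection spec(s)", resolved.len());

        // Current inferred schemas, where inference has produced one.
        for row in get_inferred_schemas(vec!["acmeCo/orders".to_string()], pool).await? {
            println!("{} => {}", row.collection_name, row.schema.0);
        }
        Ok(())
    }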
1 change: 1 addition & 0 deletions crates/agent/Cargo.toml
@@ -15,6 +15,7 @@ agent-sql = { path = "../agent-sql" }
async-process = { path = "../async-process" }
build = { path = "../build" }
doc = { path = "../doc" }
json = { path = "../json" }
models = { path = "../models" }
proto-flow = { path = "../proto-flow" }
runtime = { path = "../runtime" }
(Snapshot file; name not rendered in this view.)
@@ -39,11 +39,25 @@ expression: "serde_json::to_string_pretty(&out).unwrap()"
]
},
"case/4": {
"writeSchema": {"const":"write!"},
"writeSchema": {"const":"write!","x-infer-schema":true},
"readSchema": {"const":"read!"},
"key": [
"/foo",
"/bar"
]
},
"case/5": {
"writeSchema": {"const":"write!","x-infer-schema":true},
"readSchema": {"allOf":[{"$ref":"flow://write-schema"},{"$ref":"flow://inferred-schema"}]},
"key": [
"/key"
]
},
"case/6": {
"writeSchema": {"const":"write!","x-infer-schema":true},
"readSchema": {"allOf":[{"$ref":"flow://write-schema"},{"$ref":"flow://inferred-schema"}]},
"key": [
"/key"
]
}
}
32 changes: 29 additions & 3 deletions crates/agent/src/discovers/specs.rs
@@ -123,7 +123,9 @@ pub fn merge_collections(
},
) in targets.into_iter().zip(discovered_bindings.into_iter())
{
let document_schema: models::Schema = serde_json::from_str(&document_schema_json).unwrap();
let document_schema =
models::Schema::new(models::RawValue::from_string(document_schema_json).unwrap());

// Unwrap a fetched collection, or initialize a blank one.
let mut collection =
fetched_collections
Expand All @@ -141,6 +143,20 @@ pub fn merge_collections(

if collection.read_schema.is_some() {
collection.write_schema = Some(document_schema);
} else if matches!(
// Does the connector use schema inference?
document_schema.to_value().get("x-infer-schema"),
Some(serde_json::Value::Bool(true))
) {
collection.schema = None;
collection.write_schema = Some(document_schema);

// Synthesize a minimal read schema.
collection.read_schema = Some(models::Schema::new(models::RawValue::from_value(
&serde_json::json!({
"allOf": [{"$ref":"flow://write-schema"},{"$ref":"flow://inferred-schema"}],
}),
)));
} else {
collection.schema = Some(document_schema)
}
@@ -240,7 +256,11 @@ mod tests {
// case/3: If discovered key is empty, it doesn't replace the collection key.
{"documentSchema": {"const": 42}, "key": [], "recommendedName": "", "resourceConfig": {}},
// case/4: If fetched collection has read & write schemas, only the write schema is updated.
{"documentSchema": {"const": "write!"}, "key": ["/foo", "/bar"], "recommendedName": "", "resourceConfig": {}},
{"documentSchema": {"x-infer-schema": true, "const": "write!"}, "key": ["/foo", "/bar"], "recommendedName": "", "resourceConfig": {}},
// case/5: If there is no fetched collection but schema inference is used, an initial read schema is created.
{"documentSchema": {"x-infer-schema": true, "const": "write!"}, "key": ["/key"], "recommendedName": "", "resourceConfig": {}},
// case/6: The fetched collection did not use schema inference, but now does.
{"documentSchema": {"x-infer-schema": true, "const": "write!"}, "key": ["/key"], "recommendedName": "", "resourceConfig": {}},
],
{
"case/2": {
@@ -262,12 +282,18 @@
"readSchema": {"const": "read!"},
"key": ["/old"],
},
"case/6": {
"schema": false,
"key": ["/old"],
},
},
[
"case/1",
"case/2",
"case/3",
"case/4",
"case/5",
"case/6",
]
]))
.unwrap();
@@ -387,7 +413,7 @@
("Foo", "Foo"),
("foo/bar", "foo/bar"),
("/foo/bar//baz/", "foo/bar_baz"), // Invalid leading, middle, & trailing slash.
("#੫൬ , bar-_!", "੫൬_bar-_"), // Invalid leading, middle, & trailing chars.
("#੫൬ , bar-_!", "੫൬_bar-_"), // Invalid leading, middle, & trailing chars.
("One! two/_three", "One_two/_three"),
] {
assert_eq!(
1 change: 1 addition & 0 deletions crates/agent/src/main.rs
@@ -102,6 +102,7 @@ async fn main() -> Result<(), anyhow::Error> {
&args.connector_network,
&args.consumer_address,
&logs_tx,
Some(&pg_pool),
)),
Box::new(agent::TagHandler::new(
&args.connector_network,
6 changes: 6 additions & 0 deletions crates/agent/src/publications.rs
@@ -1,4 +1,5 @@
use self::builds::IncompatibleCollection;
use self::validation::ControlPlane;
use super::{
draft::{self, Error},
logs, Handler, HandlerStatus, Id,
@@ -12,6 +13,7 @@ pub mod builds;
mod linked_materializations;
pub mod specs;
mod storage;
mod validation;

/// JobStatus is the possible outcomes of a handled draft submission.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
@@ -56,6 +58,7 @@ pub struct PublishHandler {
builds_root: url::Url,
connector_network: String,
consumer_address: url::Url,
control_plane: ControlPlane,
logs_tx: logs::Tx,
}

@@ -68,6 +71,7 @@ impl PublishHandler {
connector_network: &str,
consumer_address: &url::Url,
logs_tx: &logs::Tx,
pool: Option<&sqlx::PgPool>,
) -> Self {
Self {
agent_user_email: agent_user_email.into(),
@@ -76,6 +80,7 @@ impl PublishHandler {
builds_root: builds_root.clone(),
connector_network: connector_network.to_string(),
consumer_address: consumer_address.clone(),
control_plane: ControlPlane::new(pool),
logs_tx: logs_tx.clone(),
}
}
@@ -301,6 +306,7 @@ impl PublishHandler {
&self.builds_root,
&draft_catalog,
&self.connector_network,
self.control_plane.clone(),
row.logs_token,
&self.logs_tx,
row.pub_id,
4 changes: 2 additions & 2 deletions crates/agent/src/publications/builds.rs
@@ -62,7 +62,7 @@ impl BuildOutput {
.iter()
.map(|e| Error {
scope: Some(e.scope.to_string()),
detail: e.error.to_string(),
detail: format!("{:#}", e.error),
..Default::default()
})
.collect()
@@ -118,6 +118,7 @@ pub async fn build_catalog(
builds_root: &url::Url,
catalog: &models::Catalog,
connector_network: &str,
control_plane: super::ControlPlane,
logs_token: Uuid,
logs_tx: &logs::Tx,
pub_id: Id,
@@ -137,7 +138,6 @@
.context("writing catalog file")?;

let build_id = format!("{pub_id}");
let control_plane = validation::NoOpControlPlane {};
let db_path = builds_dir.join(&build_id);
let log_handler = logs::ops_handler(logs_tx.clone(), "build".to_string(), logs_token);
let project_root = url::Url::parse("file:///").unwrap();
1 change: 1 addition & 0 deletions crates/agent/src/publications/specs.rs
@@ -752,6 +752,7 @@ mod test {
"",
&bs_url,
&logs_tx,
None,
);

let mut results: Vec<ScenarioResult> = vec![];
62 changes: 62 additions & 0 deletions crates/agent/src/publications/validation.rs
@@ -0,0 +1,62 @@
use futures::{FutureExt, TryFutureExt};

#[derive(Clone)]
pub struct ControlPlane {
pool: Option<sqlx::PgPool>,
}

impl ControlPlane {
pub fn new(pool: Option<&sqlx::PgPool>) -> Self {
Self {
pool: pool.cloned(),
}
}
}

impl validation::ControlPlane for ControlPlane {
fn resolve_collections<'a>(
&'a self,
collections: Vec<models::Collection>,
) -> futures::future::BoxFuture<'a, anyhow::Result<Vec<proto_flow::flow::CollectionSpec>>> {
let Some(pool) = self.pool.clone() else {
return validation::NoOpControlPlane.resolve_collections(collections)
};
let collections = collections.into_iter().map(Into::into).collect();

agent_sql::publications::resolve_collections(collections, pool)
.map_err(Into::into)
.map_ok(|rows| {
rows.into_iter()
.filter_map(|row| row.built_spec.map(|s| s.0))
.collect()
})
.boxed()
}

fn get_inferred_schemas<'a>(
&'a self,
collections: Vec<models::Collection>,
) -> futures::future::BoxFuture<
'a,
anyhow::Result<std::collections::BTreeMap<models::Collection, models::Schema>>,
> {
let Some(pool) = self.pool.clone() else {
return validation::NoOpControlPlane.get_inferred_schemas(collections)
};
let collections = collections.into_iter().map(Into::into).collect();

agent_sql::publications::get_inferred_schemas(collections, pool)
.map_err(Into::into)
.map_ok(|rows| {
rows.into_iter()
.map(|row| {
(
models::Collection::new(row.collection_name),
models::Schema::new(row.schema.0.into()),
)
})
.collect()
})
.boxed()
}
}
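
A usage sketch for this implementation (hypothetical collection name; when
constructed with `None`, as in the `specs.rs` test above, both lookups defer
to `validation::NoOpControlPlane`):

    // Sketch only: main.rs constructs this with Some(&pg_pool).
    use validation::ControlPlane as _;

    async fn demo(pool: sqlx::PgPool) -> anyhow::Result<()> {
        let cp = ControlPlane::new(Some(&pool));
        let schemas = cp
            .get_inferred_schemas(vec![models::Collection::new("acmeCo/orders")])
            .await?;
        for (collection, schema) in schemas {
            println!("{:?} => {:?}", collection, schema);
        }
        Ok(())
    }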
23 changes: 11 additions & 12 deletions crates/assemble/src/lib.rs
@@ -411,23 +411,24 @@ pub fn collection_spec(
build_id: &str,
collection: &tables::Collection,
projections: Vec<flow::Projection>,
read_bundle: Option<models::RawValue>,
stores: &[models::Store],
uuid_ptr: &str,
write_bundle: models::RawValue,
) -> flow::CollectionSpec {
let tables::Collection {
scope: _,
collection: name,
spec:
models::CollectionDef {
schema,
read_schema,
write_schema,
schema: _,
read_schema: _,
write_schema: _,
key,
projections: _,
journals,
derivation: _,
derive: _,
..
},
} = collection;

@@ -446,18 +447,16 @@
})
.collect();

let (write_schema_json, read_schema_json) = match (schema, write_schema, read_schema) {
(Some(schema), None, None) => (schema.to_string(), String::new()),
(None, Some(write_schema), Some(read_schema)) => {
(write_schema.to_string(), read_schema.to_string())
}
_ => (String::new(), String::new()),
let bundle_to_string = |b: models::RawValue| -> String {
let b: Box<serde_json::value::RawValue> = b.into();
let b: Box<str> = b.into();
b.into()
};

flow::CollectionSpec {
name: name.to_string(),
write_schema_json,
read_schema_json,
write_schema_json: bundle_to_string(write_bundle),
read_schema_json: read_bundle.map(bundle_to_string).unwrap_or_default(),
key: key.iter().map(|p| p.to_string()).collect(),
projections,
partition_fields,
(Remaining changed files not shown.)