Skip to content

Commit

Permalink
flow-web: refactor and expose Schema::extend_read_bundle
Browse files Browse the repository at this point in the history
It also makes sense to attach the routine to models::Schema rather than
the `validation` crate.
  • Loading branch information
jgraettinger committed Sep 13, 2023
1 parent aa94b7b commit db9341b
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 160 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions crates/assemble/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,10 @@ pub fn collection_spec(
build_id: &str,
collection: &tables::Collection,
projections: Vec<flow::Projection>,
read_bundle: Option<models::RawValue>,
read_bundle: Option<models::Schema>,
stores: &[models::Store],
uuid_ptr: &str,
write_bundle: models::RawValue,
write_bundle: models::Schema,
) -> flow::CollectionSpec {
let tables::Collection {
scope: _,
Expand Down Expand Up @@ -446,8 +446,8 @@ pub fn collection_spec(
})
.collect();

let bundle_to_string = |b: models::RawValue| -> String {
let b: Box<serde_json::value::RawValue> = b.into();
let bundle_to_string = |b: models::Schema| -> String {
let b: Box<serde_json::value::RawValue> = b.into_inner().into();
let b: Box<str> = b.into();
b.into()
};
Expand Down
6 changes: 3 additions & 3 deletions crates/flow-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "flow-web"

# wasm-pack isn't yet fully compatible with workspace inheritance, so we can't use that for these fields
version = "0.2.0"
version = "0.3.0"
authors = ["Estuary developers <[email protected]>"]
edition = "2021"
license = "BSL"
Expand All @@ -29,9 +29,9 @@ opt-level = "s"
default = ["console_error_panic_hook"]

[dependencies]
doc = { path = "../doc", default-features = false } # `combine` feature requires `lz4`, which doesn't WASM.
json = { path = "../json" }
# Disable default-features of doc to exclude `combine`, which requires `lz4`, which doesn't currently compile to WASM.
doc = { path = "../doc", default-features = false }
models = { path = "../models" }

url = { workspace = true }
thiserror = { workspace = true }
Expand Down
19 changes: 13 additions & 6 deletions crates/flow-web/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Flow all up in your browser

This is the source for the `flow-web` NPM package, which exposes Javascript/Typescript bindings to Flow library functions in Web Assembly (WASM).
The gist is that we compile this Rust crate to WASM and then generate the corresponding JS/TS files using `wasm-bindgen`. We use `wasm-pack` to put
everything together into an NPM package that works with Webpack, and publish that to Github packages.
This is the source for the `flow-web` NPM package, which exposes Javascript/Typescript bindings to Flow library functions in Web Assembly (WASM). The gist is
that we compile this Rust crate to WASM and then generate the corresponding JS/TS files using `wasm-bindgen`. We use `wasm-pack` to put everything together into
an NPM package that works with Webpack, and publish that to Github packages.

### Prerequisites

Expand All @@ -23,9 +23,16 @@ wasm-pack build crates/flow-web
wasm-pack test --headless --firefox crates/flow-web
```


## Capabilities

Currently, this only exposes a basic schema inference function, to prove out the functionality and give us a starting point.
We'll very likely need to add functionality in order to make this truly useful by the UI.
- `infer`: takes a JSON schema as input, and produces metadata about its inferred locations.
- `extend_read_bundle`: takes `read`, `write`, and `inferred` schemas (where `inferred` is null if no inferred schema is available), and returns an updated
read-schema bundle which potentially inlines the write and inferred schemas.

## Making a Change?

Update the version in `crates/flow-web/Cargo.toml`.

The crate version needs to be updated in that file in order for the publication to the GitHub NPM registry to succeed. That registry doesn't allow overwriting
versions. When we originally set this up, it wasn't clear how to plumb our dynamically generated versions through all the wasm/js/npm layers. It may
be possible to improve, but for now, flow-web will only be successfully published when the version number in Cargo.toml is incremented.
27 changes: 27 additions & 0 deletions crates/flow-web/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct Property {
pub enum_vals: Vec<serde_json::Value>,
pub string_format: Option<String>,
}

fn reduce_description(reduce: Reduction) -> &'static str {
match reduce {
Reduction::Multiple => "multiple strategies may apply",
Expand Down Expand Up @@ -116,8 +117,34 @@ pub fn infer(schema: JsValue) -> Result<JsValue, JsValue> {
}
})
.collect();

serde_wasm_bindgen::to_value(&AnalyzedSchema { properties }).map_err(|err| {
let msg = format!("failed to serialize result: {}", err);
JsValue::from_str(&msg)
})
}

/// WASM binding around `models::Schema::extend_read_bundle`.
///
/// `input` must be an object with exactly the keys `read`, `write` and
/// `inferred` (unknown keys are rejected). `inferred` may be `null`, in which
/// case no inferred schema is passed along.
///
/// Returns the extended read schema as a `JsValue`, or a string `JsValue`
/// describing the conversion failure.
#[wasm_bindgen]
pub fn extend_read_bundle(input: JsValue) -> Result<JsValue, JsValue> {
    /// Shape of the object expected from the caller.
    #[derive(serde::Deserialize)]
    #[serde(rename_all = "camelCase", deny_unknown_fields)]
    struct Input {
        read: models::Schema,
        write: models::Schema,
        inferred: Option<models::Schema>,
    }

    // First hop: JsValue -> generic JSON value.
    // NOTE(review): presumably done in two hops because `models::Schema` needs
    // serde_json's own deserializer -- confirm before collapsing into one step.
    let as_json: serde_json::Value = match ::serde_wasm_bindgen::from_value(input) {
        Ok(value) => value,
        Err(err) => return Err(JsValue::from_str(&format!("invalid JSON: {:?}", err))),
    };

    // Second hop: generic JSON value -> typed arguments.
    let args: Input = match serde_json::from_value(as_json) {
        Ok(args) => args,
        Err(err) => return Err(JsValue::from_str(&format!("invalid input: {:?}", err))),
    };

    let extended =
        models::Schema::extend_read_bundle(&args.read, &args.write, args.inferred.as_ref());

    match serde_wasm_bindgen::to_value(&extended) {
        Ok(js) => Ok(js),
        Err(err) => Err(JsValue::from_str(&format!("{err:?}"))),
    }
}
70 changes: 67 additions & 3 deletions crates/flow-web/tests/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#![cfg(target_arch = "wasm32")]

extern crate wasm_bindgen_test;
use flow_web::{infer, AnalyzedSchema, Property};
use serde_json::json;
use serde_wasm_bindgen::{from_value as from_js_value, Serializer};
use wasm_bindgen::JsValue;
Expand All @@ -12,7 +11,7 @@ use wasm_bindgen_test::*;
wasm_bindgen_test_configure!(run_in_browser);

#[wasm_bindgen_test]
fn test_end_to_end_schema_inferrence() {
fn test_end_to_end_schema_inference() {
let schema: JsValue = to_js_value(&json!({
"type": "object",
"reduce": { "strategy": "merge" },
Expand All @@ -31,7 +30,7 @@ fn test_end_to_end_schema_inferrence() {
},
"required": ["a", "e"]
}));
let result = infer(schema).expect("failed to infer");
let result = flow_web::infer(schema).expect("failed to infer");
let inferred: serde_json::Value =
from_js_value(result).expect("failed to deserialize analyzed schema");

Expand Down Expand Up @@ -138,6 +137,71 @@ fn test_end_to_end_schema_inferrence() {
assert_eq!(expected, inferred);
}

#[wasm_bindgen_test]
fn test_end_to_end_extend_read_bundle() {
let input: JsValue = to_js_value(&json!({
"read": {
"$defs": {
"existing://def": {"type": "array"},
},
"maxProperties": 10,
"allOf": [
{"$ref": "flow://inferred-schema"},
{"$ref": "flow://write-schema"},
]
},
"write": {
"$id": "old://value",
"required": ["a_key"],
},
"inferred": {
"$id": "old://value",
"minProperties": 5,
}
}));

let output = flow_web::extend_read_bundle(input).expect("extend first bundle");
let output: serde_json::Value = from_js_value(output).expect("failed to deserialize output");

assert_eq!(
output,
json!({
"$defs": {
"existing://def": {"type": "array"}, // Left alone.
"flow://write-schema": { "$id": "flow://write-schema", "required": ["a_key"] },
"flow://inferred-schema": { "$id": "flow://inferred-schema", "minProperties": 5 },
},
"maxProperties": 10,
"allOf": [
{"$ref": "flow://inferred-schema"},
{"$ref": "flow://write-schema"},
]
})
);

let input: JsValue = to_js_value(&json!({
"read": {
"maxProperties": 10,
},
"write": {
"$id": "old://value",
"required": ["a_key"],
},
"inferred": null,
}));

let output = flow_web::extend_read_bundle(input).expect("extend second bundle");
let output: serde_json::Value = from_js_value(output).expect("failed to deserialize output");

assert_eq!(
output,
json!({
"$defs": {},
"maxProperties": 10,
})
);
}

fn to_js_value(val: &serde_json::Value) -> JsValue {
use serde::Serialize;
// We need to use the json compatible serializer because the default
Expand Down
139 changes: 139 additions & 0 deletions crates/models/src/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::RawValue;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::{from_value, json};
use std::collections::BTreeMap;

/// A schema is a draft 2020-12 JSON Schema which validates Flow documents.
/// Schemas also provide annotations at document locations, such as reduction
Expand Down Expand Up @@ -36,6 +37,9 @@ impl Schema {
/// Wrap a raw JSON value as a `Schema`.
pub fn new(v: RawValue) -> Self {
    let schema = Self(v);
    schema
}
pub fn into_inner(self) -> RawValue {
self.0
}

// URL for referencing the inferred schema of a collection, which may be used within a read schema.
pub const REF_INFERRED_SCHEMA_URL: &str = "flow://inferred-schema";
Expand Down Expand Up @@ -67,6 +71,75 @@ impl Schema {
}))
.unwrap()
}

/// Extend a bundled Flow read schema, which may include references to the
/// canonical collection write schema URI and inferred schema URI,
/// with inline definitions that fully resolve these references.
/// If an inferred schema is not available then `{}` is used.
///
/// The returned schema always carries a `$defs` keyword, which is an empty
/// object when the read schema had no definitions and referenced neither
/// special schema.
///
/// # Panics
///
/// Panics if `read_bundle`, its `$defs` keyword (when present), or a
/// referenced write / inferred bundle is not a JSON object.
pub fn extend_read_bundle(
    read_bundle: &Self,
    write_bundle: &Self,
    inferred_bundle: Option<&Self>,
) -> Self {
    const KEYWORD_DEF: &str = "$defs";
    const KEYWORD_ID: &str = "$id";

    use serde_json::{value::to_raw_value, Value};
    // A shallow parse: top-level keys are decoded while values stay raw JSON.
    type Skim = BTreeMap<String, RawValue>;

    let mut read_schema: Skim = serde_json::from_str(read_bundle.get())
        .expect("read schema bundle must be a JSON object");
    let mut read_defs: Skim = read_schema
        .get(KEYWORD_DEF)
        .map(|d| {
            serde_json::from_str(d.get()).expect("read schema $defs must be a JSON object")
        })
        .unwrap_or_default();

    // Overwrite the `$id` of `schema` with `url`, then register it under the
    // same `url` within the read schema's definitions.
    let mut inline_definition = |url: &str, mut schema: Skim| {
        schema.insert(
            KEYWORD_ID.to_string(),
            RawValue::from_value(&Value::String(url.to_string())),
        );
        read_defs.insert(
            url.to_string(),
            to_raw_value(&schema)
                .expect("failed to re-serialize inlined schema")
                .into(),
        );
    };

    // Add a definition for the write schema if it's referenced.
    // We cannot add it in all cases because the existing `read_bundle` and
    // `write_bundle` may have a common sub-schema defined, and naively adding
    // it would result in an indexing error due to the duplicate definition.
    // So, we treat $ref: flow://write-schema as a user assertion that there is
    // no such conflicting definition (and we may produce an indexing error
    // later if they're wrong).
    if read_bundle.references_write_schema() {
        let write_schema: Skim = serde_json::from_str(write_bundle.get())
            .expect("write schema bundle must be a JSON object");
        inline_definition(Self::REF_WRITE_SCHEMA_URL, write_schema);
    }

    // Add a definition for the inferred schema if it's referenced.
    if read_bundle.references_inferred_schema() {
        let inferred_schema: Skim = inferred_bundle
            .map(|s| {
                serde_json::from_str(s.get())
                    .expect("inferred schema bundle must be a JSON object")
            })
            .unwrap_or_default(); // Default to the "anything" schema {}.
        inline_definition(Self::REF_INFERRED_SCHEMA_URL, inferred_schema);
    }

    // Re-serialize the updated definitions of the read schema.
    read_schema.insert(
        KEYWORD_DEF.to_string(),
        to_raw_value(&read_defs)
            .expect("failed to re-serialize read schema $defs")
            .into(),
    );
    Self(
        to_raw_value(&read_schema)
            .expect("failed to re-serialize read schema")
            .into(),
    )
}
}

// These patterns let us cheaply detect if a collection schema references the
Expand Down Expand Up @@ -118,4 +191,70 @@ mod test {
assert!(!fixture.references_inferred_schema());
assert!(!fixture.references_write_schema());
}

/// Covers `Schema::extend_read_bundle` with an inferred schema, without one,
/// and with a read schema that references neither special schema.
#[test]
fn test_extend_read_schema() {
    // Build a Schema fixture from a JSON literal.
    let fixture = |doc: serde_json::Value| Schema::new(RawValue::from_value(&doc));

    let read_schema = fixture(json!({
        "$defs": {
            "existing://def": {"type": "array"},
        },
        "maxProperties": 10,
        "allOf": [
            {"$ref": "flow://inferred-schema"},
            {"$ref": "flow://write-schema"},
        ]
    }));
    let write_schema = fixture(json!({
        "$id": "old://value",
        "required": ["a_key"],
    }));
    let inferred_schema = fixture(json!({
        "$id": "old://value",
        "minProperties": 5,
    }));

    // Case: both the write and inferred schemas are referenced and available.
    let with_inferred =
        Schema::extend_read_bundle(&read_schema, &write_schema, Some(&inferred_schema));
    let want_with_inferred = json!({
        "$defs": {
            "existing://def": {"type": "array"}, // Left alone.
            "flow://write-schema": { "$id": "flow://write-schema", "required": ["a_key"] },
            "flow://inferred-schema": { "$id": "flow://inferred-schema", "minProperties": 5 },
        },
        "maxProperties": 10,
        "allOf": [
            {"$ref": "flow://inferred-schema"},
            {"$ref": "flow://write-schema"},
        ]
    });
    assert_eq!(with_inferred.to_value(), want_with_inferred);

    // Case: no inferred schema is available.
    let without_inferred = Schema::extend_read_bundle(&read_schema, &write_schema, None);
    let want_without_inferred = json!({
        "$defs": {
            "existing://def": {"type": "array"}, // Left alone.
            "flow://write-schema": { "$id": "flow://write-schema", "required": ["a_key"] },
            "flow://inferred-schema": { "$id": "flow://inferred-schema" },
        },
        "maxProperties": 10,
        "allOf": [
            {"$ref": "flow://inferred-schema"},
            {"$ref": "flow://write-schema"},
        ]
    });
    assert_eq!(without_inferred.to_value(), want_without_inferred);

    // Case: pass `write_schema` which has no references.
    let no_references = Schema::extend_read_bundle(&write_schema, &write_schema, None);
    let want_no_references = json!({
        "$defs": {},
        "$id": "old://value",
        "required": ["a_key"],
    });
    assert_eq!(no_references.to_value(), want_no_references);
}
}
Loading

0 comments on commit db9341b

Please sign in to comment.