Skip to content

Commit

Permalink
protocols/flow: add array inference to protocol
Browse files Browse the repository at this point in the history
Adds "array" information to projection inference, which will include the types
of items the array contains, as well as the minimum and maximum number of items
it is allowed to contain.

This is only adding the (optional) field to the protocol. All users of the
updated rust structs will need to be updated to use the new structs before the
new field is populated.

Things that will need updated to use the updated rust structs include:
- `derive-typescript`
- Rust connectors: `source-kafka` and `source-http-ingest`
- dekaf
- flowctl, with gitpod implications
- ATF
  • Loading branch information
williamhbaker committed Nov 21, 2024
1 parent 98b7fe0 commit 891abf9
Show file tree
Hide file tree
Showing 30 changed files with 1,789 additions and 822 deletions.
5 changes: 4 additions & 1 deletion crates/assemble/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub fn inference(shape: &Shape, exists: Exists) -> flow::Inference {
} else {
None
},
array: None,
}
}

Expand Down Expand Up @@ -120,7 +121,9 @@ pub fn partition_template(
let compression_codec = compression_codec(codec.unwrap_or(models::CompressionCodec::Gzip));

// If an explicit flush interval isn't provided, default to 24 hours
let flush_interval = flush_interval.unwrap_or(std::time::Duration::from_secs(24 * 3600)).into();
let flush_interval = flush_interval
.unwrap_or(std::time::Duration::from_secs(24 * 3600))
.into();

// If a fragment length isn't set, default and then map MB to bytes.
let length = (length.unwrap_or(512) as i64) << 20;
Expand Down
3 changes: 3 additions & 0 deletions crates/assemble/src/snapshots/assemble__test__inference.snap
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ expression: "&[out1, out2, out3]"
secret: true,
exists: Must,
numeric: None,
array: None,
},
Inference {
types: [
Expand All @@ -34,6 +35,7 @@ expression: "&[out1, out2, out3]"
secret: true,
exists: May,
numeric: None,
array: None,
},
Inference {
types: [
Expand Down Expand Up @@ -61,5 +63,6 @@ expression: "&[out1, out2, out3]"
maximum: 1000.0,
},
),
array: None,
},
]
24 changes: 24 additions & 0 deletions crates/proto-flow/src/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ pub struct Inference {
pub exists: i32,
#[prost(message, optional, tag = "9")]
pub numeric: ::core::option::Option<inference::Numeric>,
#[prost(message, optional, tag = "10")]
pub array: ::core::option::Option<inference::Array>,
}
/// Nested message and enum types in `Inference`.
pub mod inference {
Expand Down Expand Up @@ -130,6 +132,28 @@ pub mod inference {
#[prost(double, tag = "4")]
pub maximum: f64,
}
/// Array type-specific inferences. Will be nil if types doesn't include
/// "array", or if the specification was built prior to array inference
/// existing in the protocol.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Array {
/// Minimum number of items the array must contain.
#[prost(uint32, tag = "1")]
pub min_items: u32,
/// True if there is an inferred maximum allowed number of items the array
/// may contain, otherwise False.
#[prost(bool, tag = "2")]
pub has_max_items: bool,
/// Maximum number of items the array may contain.
#[prost(uint32, tag = "3")]
pub max_items: u32,
/// The possible types of items contained in this array.
/// Subset of ["null", "boolean", "object", "array", "integer", "numeric",
/// "string"].
#[prost(string, repeated, tag = "4")]
pub items: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
/// Exists enumerates the possible states of existence for a location.
#[derive(
Clone,
Expand Down
166 changes: 166 additions & 0 deletions crates/proto-flow/src/flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2573,6 +2573,9 @@ impl serde::Serialize for Inference {
if self.numeric.is_some() {
len += 1;
}
if self.array.is_some() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("flow.Inference", len)?;
if !self.types.is_empty() {
struct_ser.serialize_field("types", &self.types)?;
Expand Down Expand Up @@ -2600,6 +2603,9 @@ impl serde::Serialize for Inference {
if let Some(v) = self.numeric.as_ref() {
struct_ser.serialize_field("numeric", v)?;
}
if let Some(v) = self.array.as_ref() {
struct_ser.serialize_field("array", v)?;
}
struct_ser.end()
}
}
Expand All @@ -2619,6 +2625,7 @@ impl<'de> serde::Deserialize<'de> for Inference {
"secret",
"exists",
"numeric",
"array",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -2631,6 +2638,7 @@ impl<'de> serde::Deserialize<'de> for Inference {
Secret,
Exists,
Numeric,
Array,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand Down Expand Up @@ -2660,6 +2668,7 @@ impl<'de> serde::Deserialize<'de> for Inference {
"secret" => Ok(GeneratedField::Secret),
"exists" => Ok(GeneratedField::Exists),
"numeric" => Ok(GeneratedField::Numeric),
"array" => Ok(GeneratedField::Array),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand Down Expand Up @@ -2687,6 +2696,7 @@ impl<'de> serde::Deserialize<'de> for Inference {
let mut secret__ = None;
let mut exists__ = None;
let mut numeric__ = None;
let mut array__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Types => {
Expand Down Expand Up @@ -2737,6 +2747,12 @@ impl<'de> serde::Deserialize<'de> for Inference {
}
numeric__ = map_.next_value()?;
}
GeneratedField::Array => {
if array__.is_some() {
return Err(serde::de::Error::duplicate_field("array"));
}
array__ = map_.next_value()?;
}
}
}
Ok(Inference {
Expand All @@ -2748,12 +2764,162 @@ impl<'de> serde::Deserialize<'de> for Inference {
secret: secret__.unwrap_or_default(),
exists: exists__.unwrap_or_default(),
numeric: numeric__,
array: array__,
})
}
}
deserializer.deserialize_struct("flow.Inference", FIELDS, GeneratedVisitor)
}
}
impl serde::Serialize for inference::Array {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeStruct;
let mut len = 0;
if self.min_items != 0 {
len += 1;
}
if self.has_max_items {
len += 1;
}
if self.max_items != 0 {
len += 1;
}
if !self.items.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("flow.Inference.Array", len)?;
if self.min_items != 0 {
struct_ser.serialize_field("minItems", &self.min_items)?;
}
if self.has_max_items {
struct_ser.serialize_field("hasMaxItems", &self.has_max_items)?;
}
if self.max_items != 0 {
struct_ser.serialize_field("maxItems", &self.max_items)?;
}
if !self.items.is_empty() {
struct_ser.serialize_field("items", &self.items)?;
}
struct_ser.end()
}
}
impl<'de> serde::Deserialize<'de> for inference::Array {
#[allow(deprecated)]
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
const FIELDS: &[&str] = &[
"min_items",
"minItems",
"has_max_items",
"hasMaxItems",
"max_items",
"maxItems",
"items",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
MinItems,
HasMaxItems,
MaxItems,
Items,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
where
D: serde::Deserializer<'de>,
{
struct GeneratedVisitor;

impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
type Value = GeneratedField;

fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(formatter, "expected one of: {:?}", &FIELDS)
}

#[allow(unused_variables)]
fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
where
E: serde::de::Error,
{
match value {
"minItems" | "min_items" => Ok(GeneratedField::MinItems),
"hasMaxItems" | "has_max_items" => Ok(GeneratedField::HasMaxItems),
"maxItems" | "max_items" => Ok(GeneratedField::MaxItems),
"items" => Ok(GeneratedField::Items),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
}
deserializer.deserialize_identifier(GeneratedVisitor)
}
}
struct GeneratedVisitor;
impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
type Value = inference::Array;

fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str("struct flow.Inference.Array")
}

fn visit_map<V>(self, mut map_: V) -> std::result::Result<inference::Array, V::Error>
where
V: serde::de::MapAccess<'de>,
{
let mut min_items__ = None;
let mut has_max_items__ = None;
let mut max_items__ = None;
let mut items__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::MinItems => {
if min_items__.is_some() {
return Err(serde::de::Error::duplicate_field("minItems"));
}
min_items__ =
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
GeneratedField::HasMaxItems => {
if has_max_items__.is_some() {
return Err(serde::de::Error::duplicate_field("hasMaxItems"));
}
has_max_items__ = Some(map_.next_value()?);
}
GeneratedField::MaxItems => {
if max_items__.is_some() {
return Err(serde::de::Error::duplicate_field("maxItems"));
}
max_items__ =
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
GeneratedField::Items => {
if items__.is_some() {
return Err(serde::de::Error::duplicate_field("items"));
}
items__ = Some(map_.next_value()?);
}
}
}
Ok(inference::Array {
min_items: min_items__.unwrap_or_default(),
has_max_items: has_max_items__.unwrap_or_default(),
max_items: max_items__.unwrap_or_default(),
items: items__.unwrap_or_default(),
})
}
}
deserializer.deserialize_struct("flow.Inference.Array", FIELDS, GeneratedVisitor)
}
}
impl serde::Serialize for inference::Exists {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
Expand Down
6 changes: 6 additions & 0 deletions crates/proto-flow/tests/regression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ fn ex_projections() -> Vec<flow::Projection> {
has_maximum: false,
maximum: 0.0,
}),
array: Some(inference::Array {
min_items: 10,
has_max_items: true,
max_items: 20,
items: vec!["null".to_string(), "integer".to_string()],
}),
}),
}]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ expression: json_test(msg)
"numeric": {
"hasMinimum": true,
"minimum": -1000.0
},
"array": {
"minItems": 10,
"hasMaxItems": true,
"maxItems": 20,
"items": [
"null",
"integer"
]
}
}
}
Expand Down Expand Up @@ -141,6 +150,15 @@ expression: json_test(msg)
"numeric": {
"hasMinimum": true,
"minimum": -1000.0
},
"array": {
"minItems": 10,
"hasMaxItems": true,
"maxItems": 20,
"items": [
"null",
"integer"
]
}
}
}
Expand Down Expand Up @@ -276,6 +294,15 @@ expression: json_test(msg)
"numeric": {
"hasMinimum": true,
"minimum": -1000.0
},
"array": {
"minItems": 10,
"hasMaxItems": true,
"maxItems": 20,
"items": [
"null",
"integer"
]
}
}
}
Expand Down
Loading

0 comments on commit 891abf9

Please sign in to comment.