Skip to content

Commit

Permalink
rework internal fields
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Sep 1, 2023
1 parent 3158097 commit 3fd4347
Show file tree
Hide file tree
Showing 49 changed files with 650 additions and 798 deletions.
7 changes: 5 additions & 2 deletions crates/connector-init/src/capture.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{codec::Codec, rpc};
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use proto_flow::capture::{Request, Response};

pub struct Proxy {
Expand All @@ -20,7 +20,10 @@ impl proto_grpc::capture::connector_server::Connector for Proxy {
rpc::bidi::<Request, Response, _, _>(
rpc::new_command(&self.entrypoint),
self.codec,
request.into_inner(),
request.into_inner().map_ok(|mut request| {
request.internal.clear(); // TODO(johnny): Temporarily remove $internal.
request
}),
ops::stderr_log_handler,
)?
.boxed(),
Expand Down
7 changes: 5 additions & 2 deletions crates/connector-init/src/derive.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{codec::Codec, rpc};
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use proto_flow::derive::{Request, Response};

pub struct Proxy {
Expand All @@ -20,7 +20,10 @@ impl proto_grpc::derive::connector_server::Connector for Proxy {
rpc::bidi::<Request, Response, _, _>(
rpc::new_command(&self.entrypoint),
self.codec,
request.into_inner(),
request.into_inner().map_ok(|mut request| {
request.internal.clear(); // TODO(johnny): Temporarily remove $internal.
request
}),
ops::stderr_log_handler,
)?
.boxed(),
Expand Down
7 changes: 5 additions & 2 deletions crates/connector-init/src/materialize.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{codec::Codec, rpc};
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use proto_flow::materialize::{Request, Response};

pub struct Proxy {
Expand All @@ -20,7 +20,10 @@ impl proto_grpc::materialize::connector_server::Connector for Proxy {
rpc::bidi::<Request, Response, _, _>(
rpc::new_command(&self.entrypoint),
self.codec,
request.into_inner(),
request.into_inner().map_ok(|mut request| {
request.internal.clear(); // TODO(johnny): Temporarily remove $internal.
request
}),
ops::stderr_log_handler,
)?
.boxed(),
Expand Down
48 changes: 19 additions & 29 deletions crates/derive-sqlite/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ use futures::channel::mpsc;
use futures::TryStreamExt;
use futures::{SinkExt, Stream};
use prost::Message;
use proto_flow::runtime::{
derive_request_ext, derive_response_ext, DeriveRequestExt, DeriveResponseExt,
};
use proto_flow::runtime::{derive_request_ext, derive_response_ext, DeriveRequestExt};
use proto_flow::{
derive::{request, response, Request, Response},
flow, RuntimeCheckpoint,
Expand Down Expand Up @@ -85,20 +83,13 @@ where
let _ = response_tx
.send(Ok(Response {
opened: Some(response::Opened {}),
internal: Some(proto_flow::Any {
type_url: "flow://runtime.DeriveResponseExt".to_string(),
value: DeriveResponseExt {
opened: Some(derive_response_ext::Opened {
runtime_checkpoint: Some(runtime_checkpoint),
}),
..Default::default()
}
.encode_to_vec()
.into(),
}),

..Default::default()
}))
}
.with_internal(|internal| {
internal.opened = Some(derive_response_ext::Opened {
runtime_checkpoint: Some(runtime_checkpoint),
});
})))
.await;

maybe_handle = Some(handle);
Expand Down Expand Up @@ -164,7 +155,7 @@ where

fn parse_open(
open: request::Open,
internal: Option<proto_flow::Any>,
internal: bytes::Bytes,
) -> anyhow::Result<(String, Vec<String>, Vec<Transform>)> {
let request::Open {
collection,
Expand All @@ -173,21 +164,20 @@ fn parse_open(
version: _,
} = open;

let sqlite_uri = match internal {
let sqlite_uri = if internal.is_empty() {
// If DeriveRequestExt was not sent, then use a :memory: DB.
None => ":memory:".to_string(),
":memory:".to_string()
} else {
// If it was sent, *require* that `sqlite_vfs_uri` is populated.
Some(internal) => {
let DeriveRequestExt { open: open_ext, .. } =
Message::decode(internal.value).context("internal is a DeriveRequestExt")?;
let derive_request_ext::Open { sqlite_vfs_uri, .. } =
open_ext.context("expected DeriveRequestExt.open to be set")?;

if sqlite_vfs_uri.is_empty() {
anyhow::bail!("DeriveRequestExt.open.sqlite_vfs_uri is not set and must be");
}
sqlite_vfs_uri
let DeriveRequestExt { open: open_ext, .. } =
Message::decode(internal).context("internal is a DeriveRequestExt")?;
let derive_request_ext::Open { sqlite_vfs_uri, .. } =
open_ext.context("expected DeriveRequestExt.open to be set")?;

if sqlite_vfs_uri.is_empty() {
anyhow::bail!("DeriveRequestExt.open.sqlite_vfs_uri is not set and must be");
}
sqlite_vfs_uri
};

let flow::CollectionSpec { derivation, .. } = collection.unwrap();
Expand Down
29 changes: 11 additions & 18 deletions crates/flowctl/src/preview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use anyhow::Context;
use doc::shape::schema::to_schema;
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt, TryStreamExt};
use prost::Message;
use proto_flow::runtime::{derive_request_ext, DeriveRequestExt};
use proto_flow::runtime::derive_request_ext;
use proto_flow::{derive, flow, flow::collection_spec::derivation::Transform};
use proto_gazette::broker;
use tokio::sync::broadcast;
Expand Down Expand Up @@ -140,20 +139,14 @@ impl Preview {
version: "local".to_string(),
state_json: "{}".to_string(),
}),
internal: Some(proto_flow::Any {
type_url: "flow://runtime.DeriveResponseExt".to_string(),
value: DeriveRequestExt {
open: Some(derive_request_ext::Open {
sqlite_vfs_uri: sqlite_path.clone(),
..Default::default()
}),
..Default::default()
}
.encode_to_vec()
.into(),
}),
..Default::default()
}))
}
.with_internal(|internal| {
internal.open = Some(derive_request_ext::Open {
sqlite_vfs_uri: sqlite_path.clone(),
..Default::default()
});
})))
.await?;

let mut responses_rx = runtime::Runtime::new(
Expand Down Expand Up @@ -357,9 +350,9 @@ where
while let Some(response) = responses_rx.next().await {
let response = response.map_err(status_to_anyhow)?;

let internal: proto_flow::runtime::DeriveResponseExt =
Message::decode(response.internal.map(|i| i.value).unwrap_or_default())
.context("failed to decode internal runtime.DeriveResponseExt")?;
let internal = response
.get_internal()
.context("failed to decode internal runtime.DeriveResponseExt")?;

if let Some(derive::response::Published { doc_json }) = response.published {
let proto_flow::runtime::derive_response_ext::Published {
Expand Down
7 changes: 6 additions & 1 deletion crates/proto-flow/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ fn main() {
.out_dir(&b.src_dir)
.btree_map(&["."]) // Make ordering stable for snapshots.
// Fields that hold tuple-encoded values use Bytes rather than Vec<u8>.
.bytes(&["key_packed", "values_packed", "partitions_packed"])
.bytes(&[
"internal",
"key_packed",
"partitions_packed",
"values_packed",
])
.file_descriptor_set_path(&b.descriptor_path)
.compile_well_known_types()
.extern_path(".consumer", "::proto_gazette::consumer")
Expand Down
8 changes: 4 additions & 4 deletions crates/proto-flow/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pub struct Request {
#[prost(message, optional, tag = "6")]
pub acknowledge: ::core::option::Option<request::Acknowledge>,
/// Reserved for internal use.
#[prost(message, optional, tag = "100")]
pub internal: ::core::option::Option<::pbjson_types::Any>,
#[prost(bytes = "bytes", tag = "100")]
pub internal: ::prost::bytes::Bytes,
}
/// Nested message and enum types in `Request`.
pub mod request {
Expand Down Expand Up @@ -164,8 +164,8 @@ pub struct Response {
#[prost(message, optional, tag = "7")]
pub checkpoint: ::core::option::Option<response::Checkpoint>,
/// Reserved for internal use.
#[prost(message, optional, tag = "100")]
pub internal: ::core::option::Option<::pbjson_types::Any>,
#[prost(bytes = "bytes", tag = "100")]
pub internal: ::prost::bytes::Bytes,
}
/// Nested message and enum types in `Response`.
pub mod response {
Expand Down
34 changes: 20 additions & 14 deletions crates/proto-flow/src/capture.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl serde::Serialize for Request {
if self.acknowledge.is_some() {
len += 1;
}
if self.internal.is_some() {
if !self.internal.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("capture.Request", len)?;
Expand All @@ -46,8 +46,8 @@ impl serde::Serialize for Request {
if let Some(v) = self.acknowledge.as_ref() {
struct_ser.serialize_field("acknowledge", v)?;
}
if let Some(v) = self.internal.as_ref() {
struct_ser.serialize_field("internal", v)?;
if !self.internal.is_empty() {
struct_ser.serialize_field("$internal", pbjson::private::base64::encode(&self.internal).as_str())?;
}
struct_ser.end()
}
Expand All @@ -66,6 +66,7 @@ impl<'de> serde::Deserialize<'de> for Request {
"open",
"acknowledge",
"internal",
"$internal",
];

#[allow(clippy::enum_variant_names)]
Expand Down Expand Up @@ -104,7 +105,7 @@ impl<'de> serde::Deserialize<'de> for Request {
"apply" => Ok(GeneratedField::Apply),
"open" => Ok(GeneratedField::Open),
"acknowledge" => Ok(GeneratedField::Acknowledge),
"internal" => Ok(GeneratedField::Internal),
"$internal" | "internal" => Ok(GeneratedField::Internal),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand Down Expand Up @@ -171,9 +172,11 @@ impl<'de> serde::Deserialize<'de> for Request {
}
GeneratedField::Internal => {
if internal__.is_some() {
return Err(serde::de::Error::duplicate_field("internal"));
return Err(serde::de::Error::duplicate_field("$internal"));
}
internal__ = map.next_value()?;
internal__ =
Some(map.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
;
}
}
}
Expand All @@ -184,7 +187,7 @@ impl<'de> serde::Deserialize<'de> for Request {
apply: apply__,
open: open__,
acknowledge: acknowledge__,
internal: internal__,
internal: internal__.unwrap_or_default(),
})
}
}
Expand Down Expand Up @@ -1061,7 +1064,7 @@ impl serde::Serialize for Response {
if self.checkpoint.is_some() {
len += 1;
}
if self.internal.is_some() {
if !self.internal.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("capture.Response", len)?;
Expand All @@ -1086,8 +1089,8 @@ impl serde::Serialize for Response {
if let Some(v) = self.checkpoint.as_ref() {
struct_ser.serialize_field("checkpoint", v)?;
}
if let Some(v) = self.internal.as_ref() {
struct_ser.serialize_field("internal", v)?;
if !self.internal.is_empty() {
struct_ser.serialize_field("$internal", pbjson::private::base64::encode(&self.internal).as_str())?;
}
struct_ser.end()
}
Expand All @@ -1107,6 +1110,7 @@ impl<'de> serde::Deserialize<'de> for Response {
"captured",
"checkpoint",
"internal",
"$internal",
];

#[allow(clippy::enum_variant_names)]
Expand Down Expand Up @@ -1147,7 +1151,7 @@ impl<'de> serde::Deserialize<'de> for Response {
"opened" => Ok(GeneratedField::Opened),
"captured" => Ok(GeneratedField::Captured),
"checkpoint" => Ok(GeneratedField::Checkpoint),
"internal" => Ok(GeneratedField::Internal),
"$internal" | "internal" => Ok(GeneratedField::Internal),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand Down Expand Up @@ -1221,9 +1225,11 @@ impl<'de> serde::Deserialize<'de> for Response {
}
GeneratedField::Internal => {
if internal__.is_some() {
return Err(serde::de::Error::duplicate_field("internal"));
return Err(serde::de::Error::duplicate_field("$internal"));
}
internal__ = map.next_value()?;
internal__ =
Some(map.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
;
}
}
}
Expand All @@ -1235,7 +1241,7 @@ impl<'de> serde::Deserialize<'de> for Response {
opened: opened__,
captured: captured__,
checkpoint: checkpoint__,
internal: internal__,
internal: internal__.unwrap_or_default(),
})
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/proto-flow/src/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub struct Request {
#[prost(message, optional, tag = "7")]
pub reset: ::core::option::Option<request::Reset>,
/// Reserved for internal use.
#[prost(message, optional, tag = "100")]
pub internal: ::core::option::Option<::pbjson_types::Any>,
#[prost(bytes = "bytes", tag = "100")]
pub internal: ::prost::bytes::Bytes,
}
/// Nested message and enum types in `Request`.
pub mod request {
Expand Down Expand Up @@ -203,8 +203,8 @@ pub struct Response {
#[prost(message, optional, tag = "6")]
pub started_commit: ::core::option::Option<response::StartedCommit>,
/// Reserved for internal use.
#[prost(message, optional, tag = "100")]
pub internal: ::core::option::Option<::pbjson_types::Any>,
#[prost(bytes = "bytes", tag = "100")]
pub internal: ::prost::bytes::Bytes,
}
/// Nested message and enum types in `Response`.
pub mod response {
Expand Down
Loading

0 comments on commit 3fd4347

Please sign in to comment.