Skip to content

Commit

Permalink
runtime: rework image connector
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Sep 1, 2023
1 parent 3e5287e commit 3158097
Show file tree
Hide file tree
Showing 15 changed files with 736 additions and 386 deletions.
4 changes: 2 additions & 2 deletions crates/derive-typescript/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ FROM denoland/deno:distroless-1.36.3
COPY target/x86_64-unknown-linux-musl/release/derive-typescript /

# Disable Deno's colorized output.
ENV NO_COLOR
ENV DENO_NO_UPDATE_CHECK
ENV NO_COLOR=1
ENV DENO_NO_UPDATE_CHECK=1

ENTRYPOINT ["/derive-typescript"]
LABEL FLOW_RUNTIME_CODEC=json
Expand Down
41 changes: 39 additions & 2 deletions crates/proto-flow/src/internal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{capture, derive, materialize, runtime, Any};
use crate::{capture, derive, materialize, ops, runtime, Any};
use prost::Message;

macro_rules! impl_internal {
Expand Down Expand Up @@ -26,7 +26,16 @@ macro_rules! impl_internal {
/// Set and inspect the internal field via a callback.
/// Modifications made by the callback are re-encoded into the
/// internal Any message, the post-modification value is returned.
pub fn set_internal<F>(
pub fn set_internal<F>(&mut self, cb: F) -> Result<$ext_type, String>
where
F: FnOnce(&mut $ext_type),
{
self.set_internal_buf(&mut bytes::BytesMut::new(), cb)
}

/// Set and inspect the internal field via a callback.
/// Use an owned buffer for required allocations.
pub fn set_internal_buf<F>(
&mut self,
buf: &mut bytes::BytesMut,
cb: F,
Expand Down Expand Up @@ -83,3 +92,31 @@ impl_internal!(
runtime::MaterializeResponseExt,
"flow://runtime.MaterializeResponseExt"
);

/// Implements `set_log_level` for a Request message type `$msg_type` whose
/// internal extension type is `$ext_type`.
macro_rules! impl_internal_set_log_level {
    ($msg_type:ty , $ext_type:ty) => {
        impl $msg_type {
            /// Set the log-level of the internal extension field of this Request.
            ///
            /// If the extension carries no labels yet, default labels are created
            /// first and then updated with `log_level`. The result of
            /// `set_internal_buf` is returned as-is.
            pub fn set_log_level(
                &mut self,
                log_level: ops::log::Level,
            ) -> Result<$ext_type, String> {
                // Scratch buffer handed to `set_internal_buf` for this single call.
                let mut scratch = bytes::BytesMut::new();

                self.set_internal_buf(&mut scratch, |ext| {
                    ext.labels
                        .get_or_insert_with(ops::ShardLabeling::default)
                        .log_level = log_level as i32;
                })
            }
        }
    };
}

impl_internal_set_log_level!(capture::Request, runtime::CaptureRequestExt);
impl_internal_set_log_level!(derive::Request, runtime::DeriveRequestExt);
impl_internal_set_log_level!(materialize::Request, runtime::MaterializeRequestExt);
217 changes: 137 additions & 80 deletions crates/runtime/src/capture/image.rs
Original file line number Diff line number Diff line change
@@ -1,99 +1,156 @@
use super::extract_endpoint;
use crate::{container, eof_on_error, inject_error, unseal};
use futures::{Stream, TryStreamExt};
use proto_flow::capture::{Request, Response};
use proto_flow::ops;
use crate::{
image_connector::{Connector, Container, StartRpcFuture, UnsealFuture, Unsealed},
unseal,
};
use futures::{channel::mpsc, FutureExt, Stream};
use proto_flow::{
capture::{Request, Response},
runtime::CaptureRequestExt,
};

pub async fn image_connector<L, R>(
image: String,
fn unseal(mut request: Request) -> Result<UnsealFuture<Request>, Request> {
if !matches!(
request,
Request { spec: Some(_), .. }
| Request {
discover: Some(_),
..
}
| Request {
validate: Some(_),
..
}
| Request { apply: Some(_), .. }
| Request { open: Some(_), .. }
) {
return Err(request); // Not an unseal-able request.
};

Ok(async move {
let (endpoint, config_json) = extract_endpoint(&mut request)?;

let models::CaptureEndpoint::Connector(models::ConnectorConfig {
image,
config: sealed_config,
}) = endpoint;
/* else { anyhow::bail!("task connector type has changed and is no longer an image") }; */

*config_json = unseal::decrypt_sops(&sealed_config).await?.to_string();

let log_level = match request.get_internal() {
Some(Ok(CaptureRequestExt {
labels: Some(labels),
..
})) => Some(labels.log_level()),
_ => None,
};

Ok(Unsealed {
image,
log_level,
request,
})
}
.boxed())
}

/// Start a capture RPC over `channel`, streaming Requests read from `rx`.
///
/// Returns a boxed future which resolves to the outcome of the
/// connector client's `capture` call.
fn start_rpc(
    channel: tonic::transport::Channel,
    rx: mpsc::Receiver<Request>,
) -> StartRpcFuture<Response> {
    use proto_grpc::capture::connector_client::ConnectorClient;

    let rpc = async move {
        // The client is built lazily, when the future is first polled.
        let mut client = ConnectorClient::new(channel);
        client.capture(rx).await
    };
    rpc.boxed()
}

/// Attach `container` to the internal extension of `response`.
///
/// # Panics
///
/// Panics if `set_internal` returns an error.
fn attach_container(response: &mut Response, container: Container) {
    let outcome = response.set_internal(|ext| ext.container = Some(container));
    outcome.unwrap();
}

pub fn connector<L, R>(
log_handler: L,
network: String,
network: &str,
request_rx: R,
task_name: &str,
) -> tonic::Result<impl Stream<Item = tonic::Result<Response>>>
) -> mpsc::Receiver<tonic::Result<Response>>
where
L: Fn(&ops::Log) + Send + Sync + 'static,
L: Fn(&ops::Log) + Clone + Send + Sync + 'static,
R: Stream<Item = tonic::Result<Request>> + Send + Unpin + 'static,
{
let (container, channel, guard) = container::start(
&image,
let (connector, response_rx) = Connector::new(
attach_container,
log_handler,
&network,
network,
request_rx,
start_rpc,
task_name,
ops::TaskType::Capture,
)
.await
.map_err(crate::anyhow_to_status)?;

// Adapt requests by identifying instances that carry endpoint configuration.
// Verify they remain compatible with our started container, and then unseal their config.
// Or if they're not compatible, then map to Status::aborted().
let request_rx = request_rx.and_then(move |mut request| {
let must_unseal = if matches!(
request,
Request { spec: Some(_), .. }
| Request {
discover: Some(_),
..
}
| Request {
validate: Some(_),
..
}
| Request { apply: Some(_), .. }
| Request { open: Some(_), .. }
) {
Some(image.clone()) // Outer closure owns `image`.
} else {
None
};
unseal,
);
tokio::spawn(async move { connector.run().await });

async move {
if let Some(expect_image) = must_unseal {
let (endpoint, config_json) =
extract_endpoint(&mut request).map_err(crate::anyhow_to_status)?;
response_rx
}

let sealed_config = match endpoint {
models::CaptureEndpoint::Connector(models::ConnectorConfig {
image: this_image,
config,
}) if expect_image == this_image => config,
#[cfg(test)]
mod test {
use super::connector;
use futures::StreamExt;
use serde_json::json;

_ => return Err(tonic::Status::aborted("connector image has changed")),
};
#[tokio::test]
async fn test_http_ingest_spec() {
if let Err(_) = locate_bin::locate("flow-connector-init") {
// Skip if `flow-connector-init` isn't available (yet). We're probably on CI.
// This test is useful as a sanity check for local development
// and we have plenty of other coverage during CI.
return;
}

*config_json = unseal::decrypt_sops(&sealed_config)
.await
.map_err(crate::anyhow_to_status)?
.to_string();
let request_rx = futures::stream::repeat(Ok(serde_json::from_value(json!({
"spec": {
"connectorType": "IMAGE",
"config": {
"image": "ghcr.io/estuary/source-http-ingest:dev",
"config": {},
}
}
}))
.unwrap()));

Ok(request)
}
});

let (request_rx, error_rx) = eof_on_error(request_rx);

// Start a capture RPC.
let container_response = proto_grpc::capture::connector_client::ConnectorClient::new(channel)
.capture(request_rx)
.await?;
let response_rx = container_response.into_inner();

// Adapt responses by enriching the first Response with the image Container.
let mut container = Some(container);
let response_rx = response_rx.and_then(move |mut response| {
_ = &guard; // Move so it's retained while responses are still being read.

if container.is_some() {
response
.set_internal(&mut bytes::BytesMut::new(), |internal| {
internal.container = container.take();
})
.unwrap();
}
futures::future::ready(Ok(response))
});
let response_rx = connector(ops::tracing_log_handler, "", request_rx.take(2), "a-task");

let responses: Vec<_> = response_rx.collect().await;
assert_eq!(responses.len(), 2);

for resp in responses {
let resp = resp.unwrap();

Ok(inject_error(response_rx, error_rx))
assert!(resp.spec.is_some());

let container = resp
.get_internal()
.expect("has internal field")
.expect("internal decodes")
.container
.expect("internal has attached container");

assert_eq!(
container.network_ports,
[proto_flow::flow::NetworkPort {
number: 8080,
protocol: String::new(),
public: true
}]
);
}
}
}
18 changes: 7 additions & 11 deletions crates/runtime/src/capture/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,13 @@ where
let request_rx = adjust_log_level(request_rx, self.set_log_level);

let response_rx = match endpoint {
models::CaptureEndpoint::Connector(models::ConnectorConfig { image, .. }) => {
image::image_connector(
image,
self.log_handler,
self.container_network,
request_rx,
&self.task_name,
)
.await?
.boxed()
}
models::CaptureEndpoint::Connector(models::ConnectorConfig { .. }) => image::connector(
self.log_handler,
&self.container_network,
request_rx,
&self.task_name,
)
.boxed(),
};

Ok(response_rx)
Expand Down
12 changes: 9 additions & 3 deletions crates/runtime/src/container.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Context;
use futures::channel::oneshot;
use proto_flow::{flow, ops, runtime};
use proto_flow::{flow, runtime};
use tokio::io::AsyncBufReadExt;

// Port on which flow-connector-init listens for requests.
Expand All @@ -16,6 +16,7 @@ const CONNECTOR_INIT_PORT: u16 = 49092;
pub async fn start<L>(
image: &str,
log_handler: L,
log_level: Option<ops::LogLevel>,
network: &str,
task_name: &str,
task_type: ops::TaskType,
Expand Down Expand Up @@ -68,6 +69,7 @@ where

// This is default `docker run` behavior if --network is not provided.
let network = if network == "" { "bridge" } else { network };
let log_level = log_level.unwrap_or(ops::LogLevel::Warn);

let mut process: async_process::Child = async_process::Command::new("docker")
.args([
Expand All @@ -92,6 +94,7 @@ where
),
// Thread-through the logging configuration of the connector.
"--env=LOG_FORMAT=json".to_string(),
format!("--env=LOG_LEVEL={}", log_level.as_str_name()),
// Cgroup memory / CPU resource limits.
// TODO(johnny): we intend to tighten these down further, over time.
"--memory=1g".to_string(),
Expand Down Expand Up @@ -127,8 +130,10 @@ where
let mut line = String::new();

// Wait for a non-empty read of stderr to complete or EOF/error.
// Note that `flow-connector-init` writes a whitespace byte on startup.
_ = stderr.fill_buf().await;
// Note that `flow-connector-init` writes one whitespace byte on startup.
if let Ok(_buf) = stderr.fill_buf().await {
stderr.consume(1); // Discard whitespace byte.
}
std::mem::drop(ready_tx); // Signal that we're ready.

loop {
Expand Down Expand Up @@ -343,6 +348,7 @@ mod test {
let (container, channel, _guard) = start(
"ghcr.io/estuary/source-http-ingest:dev",
ops::tracing_log_handler,
Some(ops::LogLevel::Debug),
"",
"a-task-name",
proto_flow::ops::TaskType::Capture,
Expand Down
Loading

0 comments on commit 3158097

Please sign in to comment.