Skip to content

Commit

Permalink
rust connectors mega commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Aug 30, 2023
1 parent 70beb4d commit 4213c67
Show file tree
Hide file tree
Showing 104 changed files with 5,255 additions and 3,931 deletions.
31 changes: 16 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ tracing-subscriber = { version = "0.3", features = [
"fmt",
] }
shared_child = "1.0.0"
zeroize = "1.6"

unicode-bom = "1.1"
unicode-normalization = "0.1"
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ install-tools: ${PKGDIR}/bin/deno ${PKGDIR}/bin/etcd ${PKGDIR}/bin/sops

.PHONY: rust-gnu-test
rust-gnu-test:
PATH=${PKGDIR}/bin:$$PATH ;\
cargo test --release --locked --workspace --exclude parser --exclude network-tunnel --exclude schemalate --exclude connector-init

.PHONY: rust-musl-test
Expand Down
4 changes: 3 additions & 1 deletion crates/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ license.workspace = true
[dependencies]
agent-sql = { path = "../agent-sql" }
async-process = { path = "../async-process" }
build = { path = "../build" }
doc = { path = "../doc" }
models = { path = "../models" }
proto-flow = { path = "../proto-flow" }
runtime = { path = "../runtime" }
tables = { path = "../tables", features = ["persist"] }
validation = { path = "../validation" }

anyhow = { workspace = true }
async-trait = { workspace = true }
Expand All @@ -29,7 +32,6 @@ itertools = { workspace = true }
lazy_static = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
rusqlite = { workspace = true }
schemars = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
47 changes: 40 additions & 7 deletions crates/agent/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,43 @@ pub struct Line {
// Tx is the channel sender of log Lines.
pub type Tx = tokio::sync::mpsc::Sender<Line>;

impl Line {
pub(crate) fn new(token: Uuid, stream: String, line: String) -> Self {
Line {
token,
stream,
line,
}
}
}

/// ops_handler is a ops::Log handler that dispatches to `tx`
/// using the given `stream` and `token`.
pub fn ops_handler(
tx: Tx,
stream: String,
token: Uuid,
) -> impl Fn(&proto_flow::ops::Log) + Send + Sync + Clone + 'static {
let log_handler = move |log: &proto_flow::ops::Log| {
// TODO(johnny): format into a user-facing representation for our UI.
let line = serde_json::to_string(log).unwrap();

let Err(tokio::sync::mpsc::error::TrySendError::Full(line)) = tx.try_send(Line {
token: token.clone(),
stream: stream.clone(),
line,
}) else { return };

// Perform an expensive "move" of all other tasks scheduled on the
// current async executor thread, so that we can block until there's capacity.
let tx_clone = tx.clone();
_ = tokio::task::block_in_place(move || {
tokio::runtime::Handle::current().block_on(tx_clone.send(line))
});
};
log_handler
}

// capture_job_logs consumes newline-delimited lines from the AsyncRead and
// streams each as a Line to the channel Sender.
#[tracing::instrument(err, skip(tx, reader))]
Expand All @@ -35,13 +72,9 @@ where
let line = String::from_utf8(line)
.unwrap_or_else(|err| String::from_utf8_lossy(err.as_bytes()).into_owned());

tx.send(Line {
token,
stream: stream.clone(),
line,
})
.await
.unwrap();
tx.send(Line::new(token, stream.clone(), line))
.await
.unwrap();
}
Ok(())
}
Expand Down
1 change: 0 additions & 1 deletion crates/agent/src/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,6 @@ impl PublishHandler {
&self.builds_root,
&draft_catalog,
&self.connector_network,
&self.bindir,
row.logs_token,
&self.logs_tx,
row.pub_id,
Expand Down
Loading

0 comments on commit 4213c67

Please sign in to comment.