Skip to content

Commit

Permalink
rust connectors mega commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Aug 30, 2023
1 parent 70beb4d commit 0720d09
Show file tree
Hide file tree
Showing 116 changed files with 7,794 additions and 4,360 deletions.
52 changes: 52 additions & 0 deletions .github/workflows/derive-typescript.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
name: derive-typescript

on:
push:
branches: [master]
paths: [crates/derive-typescript/**]
pull_request:
branches: [master]
paths: [crates/derive-typescript/**]

jobs:
build-and-test:
runs-on: ubuntu-20.04

steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0
submodules: false

- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.72.0
default: true
target: x86_64-unknown-linux-musl

- run: sudo apt install -y musl-tools

- name: Login to GitHub container registry
if: ${{ github.ref == 'refs/heads/master' }}
run: |
echo "${{ secrets.GITHUB_TOKEN }}" | \
docker login --username ${{ github.actor }} --password-stdin ghcr.io
- uses: Swatinem/rust-cache@v2
with:
workspaces: |
crates/derive-typescript
- name: build `derive-typescript`
run: cd crates/derive-typescript && cargo build --target x86_64-unknown-linux-musl --release

- name: test `derive-typescript`
run: cd crates/derive-typescript && cargo test --target x86_64-unknown-linux-musl --release

- name: package docker image
run: docker build -t ghcr.io/estuary/derive-typescript:dev crates/derive-typescript/

- name: push docker image
if: ${{ github.ref == 'refs/heads/master' }}
run: docker push ghcr.io/estuary/derive-typescript:dev
56 changes: 16 additions & 40 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
resolver = "2"
members = ["crates/*"]
exclude = ["crates/derive-typescript"]

[workspace.package]
version = "0.0.0"
Expand Down Expand Up @@ -159,6 +160,7 @@ tracing-subscriber = { version = "0.3", features = [
"fmt",
] }
shared_child = "1.0.0"
zeroize = "1.6"

unicode-bom = "1.1"
unicode-normalization = "0.1"
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ install-tools: ${PKGDIR}/bin/deno ${PKGDIR}/bin/etcd ${PKGDIR}/bin/sops

.PHONY: rust-gnu-test
rust-gnu-test:
PATH=${PKGDIR}/bin:$$PATH ;\
cargo test --release --locked --workspace --exclude parser --exclude network-tunnel --exclude schemalate --exclude connector-init

.PHONY: rust-musl-test
Expand Down
4 changes: 3 additions & 1 deletion crates/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ license.workspace = true
[dependencies]
agent-sql = { path = "../agent-sql" }
async-process = { path = "../async-process" }
build = { path = "../build" }
doc = { path = "../doc" }
models = { path = "../models" }
proto-flow = { path = "../proto-flow" }
runtime = { path = "../runtime" }
tables = { path = "../tables", features = ["persist"] }
validation = { path = "../validation" }

anyhow = { workspace = true }
async-trait = { workspace = true }
Expand All @@ -29,7 +32,6 @@ itertools = { workspace = true }
lazy_static = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
rusqlite = { workspace = true }
schemars = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
47 changes: 40 additions & 7 deletions crates/agent/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,43 @@ pub struct Line {
// Tx is the channel sender of log Lines.
pub type Tx = tokio::sync::mpsc::Sender<Line>;

impl Line {
pub(crate) fn new(token: Uuid, stream: String, line: String) -> Self {
Line {
token,
stream,
line,
}
}
}

/// ops_handler is a ops::Log handler that dispatches to `tx`
/// using the given `stream` and `token`.
pub fn ops_handler(
tx: Tx,
stream: String,
token: Uuid,
) -> impl Fn(&proto_flow::ops::Log) + Send + Sync + Clone + 'static {
let log_handler = move |log: &proto_flow::ops::Log| {
// TODO(johnny): format into a user-facing representation for our UI.
let line = serde_json::to_string(log).unwrap();

let Err(tokio::sync::mpsc::error::TrySendError::Full(line)) = tx.try_send(Line {
token: token.clone(),
stream: stream.clone(),
line,
}) else { return };

// Perform an expensive "move" of all other tasks scheduled on the
// current async executor thread, so that we can block until there's capacity.
let tx_clone = tx.clone();
_ = tokio::task::block_in_place(move || {
tokio::runtime::Handle::current().block_on(tx_clone.send(line))
});
};
log_handler
}

// capture_job_logs consumes newline-delimited lines from the AsyncRead and
// streams each as a Line to the channel Sender.
#[tracing::instrument(err, skip(tx, reader))]
Expand All @@ -35,13 +72,9 @@ where
let line = String::from_utf8(line)
.unwrap_or_else(|err| String::from_utf8_lossy(err.as_bytes()).into_owned());

tx.send(Line {
token,
stream: stream.clone(),
line,
})
.await
.unwrap();
tx.send(Line::new(token, stream.clone(), line))
.await
.unwrap();
}
Ok(())
}
Expand Down
1 change: 0 additions & 1 deletion crates/agent/src/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,6 @@ impl PublishHandler {
&self.builds_root,
&draft_catalog,
&self.connector_network,
&self.bindir,
row.logs_token,
&self.logs_tx,
row.pub_id,
Expand Down
Loading

0 comments on commit 0720d09

Please sign in to comment.