
rust runtime and connectors: phase one #1167

Merged 14 commits into master from johnny/rust-connectors on Sep 12, 2023

Conversation

@jgraettinger (Member) commented on Aug 30, 2023

Description:

Introduce support for connector container images in Rust, and use it for catalog build validations and derivations. Peel off onion layers that currently exist in how catalog builds are threaded-through from the control-plane agent.

This is a significant refactor of Rust portions of the codebase, with no consequential user-facing changes.

It removes the flowctl raw deno-derive hack that we've been using up until now. TypeScript derivations now run as a proper image connector like any other, and have very minimal remaining support in the models & sources crates (for source loading and code-layout ergonomics).

We're now well positioned to add future derivation connectors with minimal (if any) changes to the runtime.

We do not yet use Rust connectors for Spec, Discover, Apply, or Open'd capture and materialization containers & RPCs. Those code-paths haven't changed yet. The runtime::Runtime is used only for Validate RPCs of those types.

Derivation RPCs (Validate, Open) continue to be fully powered by Rust. TypeScript derivations will no longer fail if published multiple times in a given shard assignment, because the runtime now properly drains and restarts their container contexts.

Workflow steps:

  • flowctl raw build performs a "full" build akin to flowctl-go api build
  • flowctl-go api build and bindings.BuildCatalog are now thin wrappers around flowctl raw build.
  • flowctl-go json-schema is replaced with flowctl raw json-schema
  • Validations ("Test" in the UI) now respect the configured shards: {logLevel: debug}.

No other meaningful changes to features or function.

Documentation links affected:

None I'm aware of. flowctl-go is undocumented, and we don't document flowctl raw commands outside of CLI --help.

Notes for reviewers:

Please review commit-by-commit. Lots of details in the commit messages.



@jgraettinger force-pushed the johnny/rust-connectors branch 11 times, most recently from 2ada380 to 3fd4347 on September 1, 2023
* Remove NetworkPort from Request.Validate of the runtime protocols.

This is now an internal detail negotiated during validation. Its outcome is
still captured in the built flow specs, but it's not validated by the connector
(and isn't actually known until after validation completes).

* Breaking: remove use of `Any` for the internal field and switch to bytes.

Any has major compatibility issues in its JSON encoding between Go and
Rust. For one, the pbjson_types serde implementation for Any does not
match the protobuf spec (`typeUrl` instead of `@type`). For another,
Go `jsonpb` decoding requires that @type dynamically resolve to a
concrete Go type which is unmarshalled. Neat, but that's not the
intended purpose of the internal field, which is really a pass-through
piggyback for runtime-internal structured data.

So, instead have internal be regular bytes. Also use `$internal` as the
JSON name to make the special nature of this field clearer.
Add and update tooling in Rust / Go to make it easier to work with
internal fields.

For the moment, filter out `$internal` fields in the connector-proxy.
We can remove this once these protocol changes have propagated to
connectors.
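
A rough sketch of what such a helper can look like on the Rust side (hypothetical names, assuming prost-generated message types):

use prost::Message;

// Encode a runtime-internal extension message into the `$internal` bytes field.
fn encode_internal<M: Message>(internal: &mut Vec<u8>, ext: &M) {
    internal.clear();
    ext.encode(internal).expect("writing to a Vec cannot fail");
}

// Decode the `$internal` bytes field back into a typed extension message.
fn decode_internal<M: Message + Default>(internal: &[u8]) -> Result<M, prost::DecodeError> {
    M::decode(internal)
}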

* Remove most of the internal BuildAPI, leaving only its Config
  message.

* Derive: update Response.Spec to exactly match that of the capture &
  materialize protocols. We should further extract the various
  Response.Spec messages into a common message with the same tags and
  names. I avoided doing so just now to reduce churn.

* Add explicit zeroing in Rust of the `config_json` fields. `sops`
  decryption will shortly happen from Rust, and this provides a tight
  bound on how long decrypted credentials can be resident in memory.
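
A rough illustration of the idea (not the PR's exact code; the zeroize crate and the simplified message type are stand-ins):

use zeroize::Zeroize;

struct ValidateRequest {
    config_json: String, // may hold sops-decrypted credentials
}

impl Drop for ValidateRequest {
    fn drop(&mut self) {
        // Overwrite the decrypted configuration before its memory is freed.
        self.config_json.zeroize();
    }
}
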
For use in a tokio::Runtime, which requires these traits.

This is basically using a BoxFuture instead of LocalBoxFuture, and
making the internal `tables` field thread-safe.

It remains the case that Loader does not evaluate in parallel -- only
concurrently.
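
The distinction, in sketch form (Fetcher here is a hypothetical trait standing in for the real one):

use futures::future::BoxFuture;

trait Fetcher: Send + Sync {
    // BoxFuture is Pin<Box<dyn Future + Send + 'a>>, so it can hop across
    // tokio worker threads; LocalBoxFuture lacks the Send bound and cannot.
    fn fetch<'a>(&'a self, url: &'a str) -> BoxFuture<'a, anyhow::Result<Vec<u8>>>;
}
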
Both connectors are updated for protocol changes, especially handling of
internal fields and new helper routines.

`derive-typescript` is further updated to build outside of the root
Cargo workspace, as a stand-alone Dockerized binary.
* Make `Connectors` and `ControlPlane` Send + Sync so they can be used in
  tokio::Runtime (see the sketch after this list).

* Have `Connectors` take and return top-level Request/Response so that
  `internal` fields may be provided. This is used for threading-through
  log level as well as for resolving flow.NetworkPorts.

* Remove `inspect_image` from Connectors, as that is no longer required
  (related network ports are instead carried by the Response.internal field).

* Remove all dependencies on build_api::Config. All we actually need is
  the `build_id` and `project_root`, so use those instead.

* Some use of destructuring is updated due to new zeroing Drop
  implementations for message types.

* Add a new explicit validation that generated file URLs are valid
  (they sometimes weren't, under some prior build configurations).
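
A minimal sketch of the shape implied by the first two bullets (Request and Response are stand-ins for the protocol's top-level types):

use futures::future::BoxFuture;

pub struct Request;  // stand-in for the top-level protocol Request
pub struct Response; // stand-in for the top-level protocol Response

// Send + Sync so implementations can be shared across tokio worker threads.
pub trait Connectors: Send + Sync {
    // Top-level Request/Response let `$internal` metadata (log level,
    // resolved network ports) ride along with each message.
    fn validate<'a>(&'a self, request: Request) -> BoxFuture<'a, anyhow::Result<Response>>;
}
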
@jgraettinger force-pushed the johnny/rust-connectors branch from 3fd4347 to 98d432d on September 1, 2023
@jgraettinger changed the title from "rust connectors mega commit" to "rust runtime and connectors: phase one" on Sep 1, 2023
@jgraettinger marked this pull request as ready for review on September 1, 2023
@jgraettinger (Author) commented:

Screen grab of what validation / "Test" logs now look like. Note they respect shards: logLevel. There's extra horizontal spacing in the UI we should seek to eliminate:

[screenshot: validation / "Test" logs in the UI]

@jgraettinger requested a review from a team on September 1, 2023
@psFried (Member) left a comment:

This all seems solid to me. None of my questions are blocking.


/// Spawn the command and wait for it to exit, passing it the given input and buffering its stdout and stderr.
/// Upon its exit, return an Output having its stdout, stderr, and ExitStatus.
pub async fn input_output(cmd: &mut Command, input: &[u8]) -> std::io::Result<Output> {
@psFried:

This might be a good place for a #[tracing::instrument(...)] so we can log the timing and exit status. Feel free to ignore if that's already covered elsewhere.

@jgraettinger (Author):

Done.
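
A minimal sketch of what that instrumentation can look like (assuming tokio's process types; not necessarily the exact change that landed):

use std::process::{Output, Stdio};
use tokio::io::AsyncWriteExt;
use tokio::process::Command;

// `err` logs failures; the span records which program ran and its timing.
#[tracing::instrument(level = "debug", err, skip(cmd, input), fields(program = ?cmd.as_std().get_program()))]
pub async fn input_output(cmd: &mut Command, input: &[u8]) -> std::io::Result<Output> {
    let mut child = cmd
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;

    let mut stdin = child.stdin.take().expect("stdin is piped");
    stdin.write_all(input).await?;
    drop(stdin); // close stdin so the child observes EOF
    // Note: a production version would write input concurrently with reading
    // output, to avoid pipe-buffer deadlocks on large streams.

    child.wait_with_output().await
}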

stdout,
status,
} = async_process::input_output(
async_process::Command::new("jq").args([
@psFried:

Should we use locate_bin here as well?

@jgraettinger (Author):

Done

let s = serde_json::to_string(self)
// "pretty" encoding is not strictly required, but it makes the database
// more pleasant to examine with SQLite GUI tooling.
let s = serde_json::to_string_pretty(self)
@psFried:

Is it possible that this would affect the md5 comparisons between flowctl and control plane? I think this probably wouldn't affect it, because we compute the md5 based on the in-memory representation in tables::Sources instead of the persisted one here. But I'll leave this comment just in case you can think of a case where we use the serialized values from these tables in live_specs.

@jgraettinger (Author):

That's right. I can't think of how this would impact it.

detail: anyhow::anyhow!("expected Validated but got {}", serde_json::to_string(&response).unwrap()),
});
};
let network_ports = internal.container.unwrap_or_default().network_ports;
@psFried:

Derivation containers exposing network ports?

excellent

@jgraettinger (Author):

A change here is that we identify the container IP and then send that back along the Opened path to the rest of the runtime, for it to directly call into the container's networking stack. I did this because it (a) works on Linux docker and in production, and (b) sets things up for Firecracker, which only has an IP and doesn't have any built-in facilities for port forwarding.

But. I'm just realizing this won't work on Docker Desktop for Mac, because these container IPs are only accessible inside the docker VM. Which breaks flowctl on Mac. Still thinking through how to handle this....

@jgraettinger (Author):

Okay, I think I've worked out a good pathway here:

When running on non-Linux, we pass --publish-all as well as --publish 0.0.0.0:0:{CONNECTOR_INIT_PORT} to docker. That causes it to pick random host ports and publish all image ports through them, including the connector-init port. Next we inspect the container and, in addition to grabbing the container IP, we also grab any host port mappings and populate a mapping of container-port => host-ip:host-port addresses, which is also attached to the runtime::Container description. Then, if you want to reach a container port, you first look for a mapped host-port and preferentially use that (non-production setups like Mac); otherwise you use the container IP directly.
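
In sketch form (hypothetical names; the docker flags are as described above):

use std::collections::BTreeMap;

struct Container {
    ip: std::net::IpAddr,
    mapped_host_ports: BTreeMap<u16, String>, // container port => "host-ip:host-port"
}

// Extra `docker run` arguments used on non-Linux hosts.
fn publish_args(connector_init_port: u16) -> Vec<String> {
    if cfg!(target_os = "linux") {
        Vec::new() // production path: dial the container IP directly
    } else {
        vec![
            "--publish-all".to_string(), // random host ports for all image ports
            format!("--publish=0.0.0.0:0:{connector_init_port}"),
        ]
    }
}

// Prefer a published host mapping when reaching a container port.
fn address_for(container: &Container, port: u16) -> String {
    match container.mapped_host_ports.get(&port) {
        Some(host_addr) => host_addr.clone(),
        None => format!("{}:{}", container.ip, port),
    }
}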

Next problem: we need an amd64 copy of flow-connector-init. The solution I worked up is to use a pinned version of the Flow docker image, from which we pluck out the binary and copy it into a temp location. This is a bit of a kludge -- mostly because it requires pulling down the whole flow image and updating the pin is currently awkward -- but I do think the practice of shipping a vendor'd docker image for flowctl to use has legs to it, much like the supabase CLI does.

Tested on my macbook air M2 and working well.

@jgraettinger force-pushed the johnny/rust-connectors branch 3 times, most recently from d220121 to 3038afe on September 12, 2023
Add a helper for passing input to a process while also accumulating its output.
Add `unseal` module which performs `sops` decryptions, matching our
existing Go implementation.

Add `container` module which manages an image container instance,
similar to our Go implementation.

Containers have an explicit, random, and recognizable container name
like `fc_a50cfebf` instead of docker-generated pet names like `bold_bose`.
This allows us to inspect containers that we start.
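
For instance (a sketch, not the exact code):

// Generate a recognizable, random container name like `fc_a50cfebf`.
fn container_name() -> String {
    format!("fc_{:08x}", rand::random::<u32>())
}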

Containers do NOT use host port-mappings, as the Go implementation does.
Instead a runtime::Container description is returned which includes the
container's allocated (host-local) IP address as well as the
network ports inspected from its container image.

There are a variety of startup races that Docker doesn't do a great job
of managing -- there is no precise means of knowing, through docker,
when it's safe to ask what a started container's IP address is, because
Docker does network setup as an async background task. It IS reliable to
wait until the internal container process has definitively started,
which we can do by waiting for _something_ to arrive over its forwarded
stderr. So, we have flow-connector-init write a single whitespace byte
on startup and expect to read it from the host.
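
In sketch form (assuming a tokio child-process handle for the forwarded stderr):

use tokio::io::AsyncReadExt;

// Block until flow-connector-init's single startup byte arrives: once it
// does, the container process has definitively started and it's safe to
// inspect the container for its IP address.
async fn wait_for_startup(stderr: &mut tokio::process::ChildStderr) -> std::io::Result<()> {
    let mut byte = [0u8; 1];
    stderr.read_exact(&mut byte).await?;
    Ok(())
}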

Next, building on module `container`, add an `image_connector` adapter
which manages the lifecycle of an RPC stream which may start and restart
*many* container instances over its lifetime.

Refactor derive::Middleware into a common runtime::Runtime which
provides all three (capture, derive, materialize) of the core Runtime
RPCs. We'll extend this further with shuffling services in the future.

Capture and materialize have minimal implementations which dispatch to an
image connector.

Derivations are re-worked to transparently map a `derive: using: typescript`
into an image connector of `ghcr.io/estuary/derive-typescript:dev`.
The hacky `flowctl` flow for running these derivations is removed.

Also refactor TaskService & TaskRuntime into a thinner TaskService and
reusable TokioContext. TokioContext makes it easy to build an isolated
tokio::Runtime that dispatches its tracing events to an ops::Log handler,
and has the required non-synchronous Drop semantics.
The CGO build API is removed entirely. Instead, the role of the `build`
crate is now to hold common routines shared across the `agent` and
`flowctl` crates for building Flow specifications into built catalog
specs.

It also provides general-purpose implementations of sources::Fetcher
and validation::Connectors, but not validation::ControlPlane.
Overhaul the `local_specs` module atop the refactored `build` crate.
What remains is intended to be opinionated wrappers of routines provided
by `build`.

Don't immediately fail un-credentialed users. This allows invocations
that don't require authenticated control-plane access to succeed.

Initialize to a WARN log-level by default (so that "you're not
authenticated" warnings actually show up).

Add a `raw build` subcommand which performs a build::managed_build().
This is a replacement for `flowctl-go api build`.

Add a `raw json-schema` subcommand to replace `flowctl-go json-schema`.

Instead of shelling out to `flowctl-go`, the agent now builds in-process.
This is a fairly straightforward drop-in replacement with
`build::managed_build`, dispatched to an isolated runtime::TokioContext
so that in-process tracing events can also be gathered.

However, this means the Go logrus package is no longer responsible for
text-formatting logs and we need to present structured instances of
`ops::Log` to the control-plane and UI. Eventually we'd like to directly
pass-through structured logs into the `internal.log_lines` table, but
for now I've introduced a reasonable ANSI-colored rendering of
deeply structured logs (that, IMO, improves on logrus).

When we're ready, we can rip this out and trivially pass-through JSON
encodings of ops::Logs.
bindings.BuildCatalog() remains, but is now a simple wrapper around
`flowctl raw build`.

bindings.CatalogJSONSchema and `flowctl-go json-schema` are removed.
Merging specs into source tables and indirecting specs are
transformations done by `flowctl` as part of manipulating a user's local
catalog specs. They're not used in the control-plane or data-plane.
As such, in my testing I found some aspects were a bit fast-and-loose.

Bug 1) sources::extend_from_catalog() was not properly attaching the fragment
scopes to entities merged into tables::Sources from a models::Catalog.

This matters because those scopes may be used to identify JSON schemas
to bundle when later inline-ing the schema of a collection, so they need
to be correct. Update the routine to be much more pedantic about
producing correct scopes.

Bug 2) sources::indirect_large_files() wasn't producing tables::Imports
for the resources that it chose to indirect. This *also* matters if
those sources are later inlined again, because JSON schema bundling
relies on correct imports to identify schemas to bundle.

Fixing both of these bugs allows `flowctl catalog pull-specs` to
correctly pull down, indirect, then inline, validate, and generate files
for a control-plane derivation (where it was failing previously).

Extend sources scenario test coverage to fully compare each fixture for
equality after fully round-tripping through indirection.

Bonuses, that got wrapped up in this changeset and which I'm not
bothering to pull out right now:

* build::Fetcher now uses a thirty-second timeout for all fetches
  (sketched after this list). In combination with the connectors timeout,
  this means catalog builds are now guaranteed to complete in a bounded
  time frame.

* Move flowctl::local_specs::into_catalog() =>
  sources::merge::into_catalog().

* When running with RUST_LOG=debug, `flowctl` will now persist a SQLite
  build database for examination or shipping to Estuary support.
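
For the first bullet above, the bounding could look roughly like this (hypothetical helper, assuming a reqwest client):

use std::time::Duration;

// Bound each fetch to thirty seconds so catalog builds can't hang on a
// slow or unresponsive resource.
async fn bounded_fetch(client: &reqwest::Client, url: &str) -> anyhow::Result<bytes::Bytes> {
    let body = tokio::time::timeout(Duration::from_secs(30), async {
        client.get(url).send().await?.bytes().await
    })
    .await??; // outer error: timeout elapsed; inner error: request failure
    Ok(body)
}
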
When running on non-Linux OSes, ask docker to publish all ports
(including the init port), then extract and pass around the mapping
of container ports to mapped host addresses.

Also fix a flow-connector-init startup race where it writes to stderr
before its port has been bound.
@jgraettinger merged commit 30e150d into master on Sep 12, 2023. 5 checks passed.