crates/sources: fix some subtle bugs in spec indirection and merging
Merging specs into source tables and indirecting specs are
transformations done by `flowctl` as part of manipulating a user's local
catalog specs. They're not used in the control-plane or data-plane.
As such, in my testing I found some aspects were a bit fast-and-loose.

Bug 1) sources::extend_from_catalog() was not properly attaching the fragment
scopes to entities merged into tables::Sources from a models::Catalog.

This matters because those scopes may be used to identify JSON schemas
to bundle when later inlining the schema of a collection, so they need
to be correct. Update the routine to be much more pedantic about
producing correct scopes.
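
For intuition, here's roughly the shape of scope the merge must produce.
This is an illustrative sketch, not the actual routine — the helper name
and exact pointer layout are my own:

    // Hypothetical sketch: a merged entity's scope is its source resource URL
    // plus a JSON-pointer fragment locating the entity within that document,
    // e.g. file:///flow.yaml#/collections/acme~1orders.
    fn synthetic_scope(resource: &url::Url, entity_type: &str, name: &str) -> url::Url {
        // Escape '~' then '/' per RFC 6901, so a name like "acme/orders" survives.
        let escaped = name.replace('~', "~0").replace('/', "~1");
        let mut scope = resource.clone();
        scope.set_fragment(Some(&format!("/{entity_type}/{escaped}")));
        scope
    }

Scopes of that shape are what let a later inline pass resolve a
collection back to the exact schema document to bundle.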

Bug 2) sources::indirect_large_files() wasn't producing tables::Imports
for the resources that it chose to indirect. This *also* matters if
those sources are later inlined again, because JSON schema bundling
relies on correct imports to identify schemas to bundle.
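
Roughly, the invariant the fix restores is: every indirected resource
gets a corresponding import edge. A hedged sketch, assuming the usual
scope/to_resource columns of tables::Imports:

    // Hypothetical sketch: after writing an indirected spec portion out to
    // `to_resource`, also record an import from the referencing scope, so a
    // later inline pass can walk imports and find the resource to re-bundle.
    fn record_indirection(sources: &mut tables::Sources, scope: &url::Url, to_resource: &url::Url) {
        sources.imports.insert_row(scope, to_resource);
    }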

Fixing both of these bugs allows `flowctl catalog pull-specs` to
correctly pull down, indirect, then inline, validate, and generate files
for a control-plane derivation (where it was failing previously).

Extend sources scenario test coverage to fully compare each fixture for
equality after round-tripping through indirection and inlining.
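
The round-trip being asserted looks roughly like this (a sketch — the
inline entry point and byte threshold are assumptions):

    // Hypothetical sketch: indirect a fixture, inline it again, and expect
    // to arrive back at exactly the original tables.
    fn assert_round_trip(mut sources: tables::Sources) {
        let expected = format!("{sources:#?}");
        sources::indirect_large_files(&mut sources, 32); // small threshold, to force indirection
        sources::inline_sources(&mut sources);
        assert_eq!(format!("{sources:#?}"), expected);
    }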

Bonuses that got wrapped up in this changeset, and which I'm not
bothering to pull out right now:

* build::Fetcher now uses a thirty-second timeout for all fetches.
  In combination with the connector timeout, this means catalog builds
  are now guaranteed to complete in a bounded time frame.

* Move flowctl::local_specs::into_catalog() =>
  sources::merge::into_catalog().

* When running with RUST_LOG=debug, `flowctl` will now persist a SQLite
  build database for examination or shipping to Estuary support.
jgraettinger committed Sep 12, 2023
1 parent 473a6a7 commit 9a6e209
Showing 26 changed files with 427 additions and 358 deletions.
2 changes: 1 addition & 1 deletion crates/agent/src/publications/builds.rs

@@ -175,7 +175,7 @@ pub async fn build_catalog(
             ..Default::default()
         },
         &db_path,
-        &build_result,
+        build_result.as_ref(),
     )?;
     let dest_url = builds_root.join(&pub_id.to_string())?;
120 changes: 69 additions & 51 deletions crates/build/src/lib.rs

@@ -77,12 +77,7 @@ pub fn project_root(source: &url::Url) -> url::Url {
 /// Load a Flow specification `source` into tables::Sources.
 /// All file:// resources are rooted ("jailed") to the given `file_root`.
 pub async fn load(source: &url::Url, file_root: &Path) -> tables::Sources {
-    let loader = sources::Loader::new(
-        tables::Sources::default(),
-        Fetcher {
-            file_root: file_root.to_owned(),
-        },
-    );
+    let loader = sources::Loader::new(tables::Sources::default(), Fetcher::new(file_root));
 
     loader
         .load_resource(
@@ -204,7 +199,7 @@ where
 pub fn persist(
     build_config: proto_flow::flow::build_api::Config,
     db_path: &Path,
-    result: &Result<(tables::Sources, tables::Validations), tables::Errors>,
+    result: Result<&(tables::Sources, tables::Validations), &tables::Errors>,
 ) -> anyhow::Result<()> {
     let db = rusqlite::Connection::open(db_path).context("failed to open catalog database")?;
 
@@ -281,60 +276,82 @@ pub fn write_files(project_root: &url::Url, files: Vec<(url::Url, Vec<u8>)>) ->
 
 /// Fetcher is a general-purpose implementation of sources::Fetcher.
 struct Fetcher {
+    client: reqwest::Result<reqwest::Client>,
     file_root: PathBuf,
 }
 
+impl Fetcher {
+    fn new(file_root: impl Into<PathBuf>) -> Self {
+        let client = reqwest::ClientBuilder::new().timeout(FETCH_TIMEOUT).build();
+
+        Self {
+            client,
+            file_root: file_root.into(),
+        }
+    }
+
+    async fn fetch_inner(
+        &self,
+        resource: url::Url,
+        mut file_path: PathBuf,
+    ) -> anyhow::Result<bytes::Bytes> {
+        match resource.scheme() {
+            "http" | "https" => {
+                let client = match &self.client {
+                    Ok(ok) => ok,
+                    Err(err) => anyhow::bail!("failed to initialize HTTP client: {err}"),
+                };
+
+                let resp = client.get(resource).send().await?;
+                let status = resp.status();
+
+                if status.is_success() {
+                    Ok(resp.bytes().await?)
+                } else {
+                    let body = resp.text().await?;
+                    anyhow::bail!("{status}: {body}");
+                }
+            }
+            "file" => {
+                let rel_path = resource.to_file_path().map_err(|err| {
+                    anyhow::anyhow!("failed to convert file uri to path: {:?}", err)
+                })?;
+
+                // `rel_path` is absolute, so we must extend `file_path` rather than joining.
+                // Skip the first component, which is a RootDir token.
+                file_path.extend(rel_path.components().skip(1));
+
+                let bytes = std::fs::read(&file_path)
+                    .with_context(|| format!("failed to read {file_path:?}"))?;
+                Ok(bytes.into())
+            }
+            "stdin" => {
+                use tokio::io::AsyncReadExt;
+
+                let mut bytes = Vec::new();
+                tokio::io::stdin()
+                    .read_to_end(&mut bytes)
+                    .await
+                    .context("reading stdin")?;
+
+                Ok(bytes.into())
+            }
+            _ => Err(anyhow::anyhow!(
+                "cannot fetch unsupported URI scheme: '{resource}'"
+            )),
+        }
+    }
+}
+
 impl sources::Fetcher for Fetcher {
     fn fetch<'a>(
         &'a self,
         resource: &'a url::Url,
         content_type: flow::ContentType,
     ) -> BoxFuture<'a, anyhow::Result<bytes::Bytes>> {
         tracing::debug!(%resource, ?content_type, file_root=?self.file_root, "fetching resource");
-        fetch_async(resource.clone(), self.file_root.clone()).boxed()
-    }
-}
-
-async fn fetch_async(resource: url::Url, mut file_path: PathBuf) -> anyhow::Result<bytes::Bytes> {
-    match resource.scheme() {
-        "http" | "https" => {
-            let resp = reqwest::get(resource.as_str()).await?;
-            let status = resp.status();
-
-            if status.is_success() {
-                Ok(resp.bytes().await?)
-            } else {
-                let body = resp.text().await?;
-                anyhow::bail!("{status}: {body}");
-            }
-        }
-        "file" => {
-            let rel_path = resource
-                .to_file_path()
-                .map_err(|err| anyhow::anyhow!("failed to convert file uri to path: {:?}", err))?;
-
-            // `rel_path` is absolute, so we must extend `file_path` rather than joining.
-            // Skip the first component, which is a RootDir token.
-            file_path.extend(rel_path.components().skip(1));
-
-            let bytes = std::fs::read(&file_path)
-                .with_context(|| format!("failed to read {file_path:?}"))?;
-            Ok(bytes.into())
-        }
-        "stdin" => {
-            use tokio::io::AsyncReadExt;
-
-            let mut bytes = Vec::new();
-            tokio::io::stdin()
-                .read_to_end(&mut bytes)
-                .await
-                .context("reading stdin")?;
-
-            Ok(bytes.into())
-        }
-        _ => Err(anyhow::anyhow!(
-            "cannot fetch unsupported URI scheme: '{resource}'"
-        )),
+        self.fetch_inner(resource.clone(), self.file_root.clone())
+            .boxed()
     }
 }
 
@@ -420,5 +437,6 @@ fn status_to_anyhow(status: tonic::Status) -> anyhow::Error {
     anyhow::anyhow!(status.message().to_string())
 }
 
+pub const FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
 pub const CONNECTOR_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300); // Five minutes.
 pub const STDIN_URL: &str = "stdin://root/flow.yaml";
75 changes: 20 additions & 55 deletions crates/flowctl/src/local_specs.rs

@@ -1,6 +1,5 @@
 use anyhow::Context;
 use futures::{future::BoxFuture, FutureExt};
-use std::collections::BTreeMap;
 
 /// Load and validate sources and derivation connectors (only).
 /// Capture and materialization connectors are not validated.
@@ -30,11 +29,9 @@ pub(crate) async fn generate_files(
     client: crate::controlplane::Client,
     sources: tables::Sources,
 ) -> anyhow::Result<()> {
-    let source = &sources.fetches[0].resource.clone();
-    let project_root = build::project_root(source);
-
     let (mut sources, validations) = validate(client, true, false, true, sources).await;
 
+    let project_root = build::project_root(&sources.fetches[0].resource);
     build::generate_files(&project_root, &validations)?;
 
     sources.errors = sources
@@ -79,7 +76,7 @@ async fn validate(
     let source = &sources.fetches[0].resource.clone();
     let project_root = build::project_root(source);
 
-    let (sources, mut validate) = build::validate(
+    let (sources, mut validations) = build::validate(
         "local-build",
         "", // Use default connector network.
         &Resolver { client },
@@ -95,7 +92,7 @@ async fn validate(
 
     // Local specs are not expected to satisfy all referential integrity checks.
     // Filter out errors which are not really "errors" for the Flow CLI.
-    validate.errors = validate
+    validations.errors = validations
         .errors
         .into_iter()
         .filter(|err| match err.error.downcast_ref() {
@@ -108,7 +105,22 @@ async fn validate(
         })
         .collect::<tables::Errors>();
 
-    (sources, validate)
+    let out = (sources, validations);
+
+    // If DEBUG tracing is enabled, then write sources and validations to a
+    // debugging database that can be inspected or shipped to Estuary for support.
+    if tracing::enabled!(tracing::Level::DEBUG) {
+        let seconds = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap()
+            .as_secs();
+
+        let db_path = std::env::temp_dir().join(format!("flowctl_{seconds}.sqlite"));
+        build::persist(Default::default(), &db_path, Ok(&out)).expect("failed to write build DB");
+        tracing::debug!(db_path=%db_path.to_string_lossy(), "wrote debugging database");
+    }
+
+    out
 }
 
 pub(crate) fn surface_errors<T>(result: Result<T, tables::Errors>) -> anyhow::Result<T> {
@@ -154,54 +166,7 @@ pub(crate) fn write_resources(mut sources: tables::Sources) -> anyhow::Result<ta
 }
 
 pub(crate) fn into_catalog(sources: tables::Sources) -> models::Catalog {
-    let tables::Sources {
-        captures,
-        collections,
-        fetches: _,
-        imports: _,
-        materializations,
-        resources: _,
-        storage_mappings: _,
-        tests,
-        errors,
-    } = sources;
-
-    assert!(errors.is_empty());
-
-    models::Catalog {
-        _schema: None,
-        import: Vec::new(), // Fully inline and requires no imports.
-        captures: captures
-            .into_iter()
-            .map(|tables::Capture { capture, spec, .. }| (capture, spec))
-            .collect(),
-        collections: collections
-            .into_iter()
-            .map(
-                |tables::Collection {
-                     collection, spec, ..
-                 }| (collection, spec),
-            )
-            .collect(),
-        materializations: materializations
-            .into_iter()
-            .map(
-                |tables::Materialization {
-                     materialization,
-                     spec,
-                     ..
-                 }| (materialization, spec),
-            )
-            .collect(),
-        tests: tests
-            .into_iter()
-            .map(|tables::Test { test, spec, .. }| (test, spec))
-            .collect(),
-
-        // We deliberately omit storage mappings.
-        // The control plane will inject these during its builds.
-        storage_mappings: BTreeMap::new(),
-    }
+    ::sources::merge::into_catalog(sources)
 }
 
 pub(crate) fn extend_from_catalog<P>(
2 changes: 1 addition & 1 deletion crates/flowctl/src/raw/mod.rs

@@ -226,7 +226,7 @@ async fn do_build(ctx: &mut crate::CliContext, build: &Build) -> anyhow::Result<
         ..Default::default()
     };
 
-    build::persist(build_config, &db_path, &build_result)?;
+    build::persist(build_config, &db_path, build_result.as_ref())?;
 
     Ok(())
 }
(Diffs for the remaining 22 changed files are omitted here.)
