Skip to content

Commit

Permalink
flowctl: update raw discover/capture to use runtime crate
Browse files Browse the repository at this point in the history
Also expand the perview of `flowctl generate` to also generate endpoint
config and resource config stubs for captures and materializations where
the indicate files are missing.

This is a replacement for the two-step "discover" workflow we've
previously had, where the first stage would write a config and the
second would actually do discover. That doesn't work with local
connectors (because we need a proper capture spec in order to know what
connector to discover from).
  • Loading branch information
jgraettinger committed Sep 26, 2023
1 parent 2336d2e commit 1785555
Show file tree
Hide file tree
Showing 8 changed files with 701 additions and 519 deletions.
411 changes: 410 additions & 1 deletion crates/flowctl/src/generate/mod.rs

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion crates/flowctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ mod auth;
mod catalog;
mod collection;
mod config;
mod connector;
mod controlplane;
mod dataplane;
mod draft;
Expand Down Expand Up @@ -272,3 +271,7 @@ fn format_user(email: Option<String>, full_name: Option<String>, id: Option<uuid
id = id.map(|id| id.to_string()).unwrap_or_default(),
)
}

fn status_to_anyhow(status: tonic::Status) -> anyhow::Error {
anyhow::anyhow!(status.message().to_string())
}
14 changes: 5 additions & 9 deletions crates/flowctl/src/preview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::broadcast;
#[derive(Debug, clap::Args)]
#[clap(rename_all = "kebab-case")]
pub struct Preview {
/// Path or URL to a Flow specification file to generate development files for.
/// Path or URL to a Flow specification file.
#[clap(long)]
source: String,
/// Name of the derived collection to preview within the Flow specification file.
Expand Down Expand Up @@ -60,7 +60,7 @@ impl Preview {
} else if sources.collections.is_empty() {
anyhow::bail!("sourced specification files do not contain any derivations");
} else {
anyhow::bail!("sourced specification files contain multiple derivations. Use --collection to identify the specific one to preview");
anyhow::bail!("sourced specification files contain multiple derivations. Use --collection to identify a specific one");
};

// Resolve the built collection and its contained derivation.
Expand Down Expand Up @@ -158,13 +158,13 @@ impl Preview {
)
.serve_derive(request_rx)
.await
.map_err(|status| anyhow::anyhow!("{}", status.message()))?;
.map_err(crate::status_to_anyhow)?;

let _opened = responses_rx
.next()
.await
.context("expected Opened, not EOF")?
.map_err(status_to_anyhow)?
.map_err(crate::status_to_anyhow)?
.opened
.context("expected Opened")?;

Expand Down Expand Up @@ -349,7 +349,7 @@ where
let mut inferred_shape = doc::Shape::nothing();

while let Some(response) = responses_rx.next().await {
let response = response.map_err(status_to_anyhow)?;
let response = response.map_err(crate::status_to_anyhow)?;

let internal = response
.get_internal()
Expand Down Expand Up @@ -389,7 +389,3 @@ where
None
})
}

fn status_to_anyhow(status: tonic::Status) -> anyhow::Error {
anyhow::anyhow!(status.message().to_string())
}
Loading

0 comments on commit 1785555

Please sign in to comment.