diff --git a/Cargo.lock b/Cargo.lock index b6823834..786ae48f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -109,6 +109,12 @@ version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +[[package]] +name = "arrayvec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" + [[package]] name = "assert_json" version = "0.1.0" @@ -1816,6 +1822,7 @@ dependencies = [ "ndc-models", "nom", "nonempty", + "pretty", "pretty_assertions", "proptest", "ref-cast", @@ -2345,6 +2352,18 @@ dependencies = [ "termtree", ] +[[package]] +name = "pretty" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b55c4d17d994b637e2f4daf6e5dc5d660d209d5642377d675d7a1c3ab69fa579" +dependencies = [ + "arrayvec", + "termcolor", + "typed-arena", + "unicode-width", +] + [[package]] name = "pretty_assertions" version = "1.4.0" @@ -3617,6 +3636,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typed-arena" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6af6ae20167a9ece4bcb41af5b80f8a1f1df981f6391189ce00fd257af04126a" + [[package]] name = "typed-builder" version = "0.10.0" diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 64fcfcad..c19d6865 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -4,7 +4,7 @@ edition = "2021" version.workspace = true [features] -native-query-subcommand = [] +native-query-subcommand = ["dep:pretty", "dep:nom"] [dependencies] configuration = { path = "../configuration" } @@ -19,8 +19,9 @@ futures-util = "0.3.28" indexmap = { workspace = true } itertools = { workspace = true } ndc-models = { workspace = true } 
-nom = "^7.1.3" +nom = { version = "^7.1.3", optional = true } nonempty = "^0.10.0" +pretty = { version = "^0.12.3", features = ["termcolor"], optional = true } ref-cast = { workspace = true } regex = "^1.11.1" serde = { workspace = true } diff --git a/crates/cli/src/exit_codes.rs b/crates/cli/src/exit_codes.rs index f821caa5..a8d7c246 100644 --- a/crates/cli/src/exit_codes.rs +++ b/crates/cli/src/exit_codes.rs @@ -4,7 +4,9 @@ pub enum ExitCode { CouldNotReadConfiguration, CouldNotProcessAggregationPipeline, ErrorWriting, + InvalidArguments, RefusedToOverwrite, + ResourceNotFound, } impl From for i32 { @@ -14,7 +16,9 @@ impl From for i32 { ExitCode::CouldNotReadConfiguration => 202, ExitCode::CouldNotProcessAggregationPipeline => 205, ExitCode::ErrorWriting => 204, + ExitCode::InvalidArguments => 400, ExitCode::RefusedToOverwrite => 203, + ExitCode::ResourceNotFound => 404, } } } diff --git a/crates/cli/src/lib.rs b/crates/cli/src/lib.rs index e09ae645..3fb92b9d 100644 --- a/crates/cli/src/lib.rs +++ b/crates/cli/src/lib.rs @@ -43,6 +43,7 @@ pub enum Command { pub struct Context { pub path: PathBuf, pub connection_uri: Option, + pub display_color: bool, } /// Run a command in a given directory. diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 20b508b9..c358be99 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -31,6 +31,10 @@ pub struct Args { )] pub connection_uri: Option, + /// Disable color in command output. + #[arg(long = "no-color", short = 'C')] + pub no_color: bool, + /// The command to invoke. 
#[command(subcommand)] pub subcommand: Command, @@ -49,6 +53,7 @@ pub async fn main() -> anyhow::Result<()> { let context = Context { path, connection_uri: args.connection_uri, + display_color: !args.no_color, }; run(args.subcommand, &context).await?; Ok(()) diff --git a/crates/cli/src/native_query/error.rs b/crates/cli/src/native_query/error.rs index 30139315..62021689 100644 --- a/crates/cli/src/native_query/error.rs +++ b/crates/cli/src/native_query/error.rs @@ -87,10 +87,10 @@ pub enum Error { #[error("Type inference is not currently implemented for the aggregation expression operator, {0}. Please file a bug report, and declare types for your native query by hand for the time being.")] UnknownAggregationOperator(String), - #[error("Type inference is not currently implemented for {stage}, stage number {} in your aggregation pipeline. Please file a bug report, and declare types for your native query by hand for the time being.", stage_index + 1)] + #[error("Type inference is not currently implemented for{} stage number {} in your aggregation pipeline. 
Please file a bug report, and declare types for your native query by hand for the time being.", match stage_name { Some(name) => format!(" {name},"), None => "".to_string() }, stage_index + 1)] UnknownAggregationStage { stage_index: usize, - stage: bson::Document, + stage_name: Option<&'static str>, }, #[error("Native query input collection, \"{0}\", is not defined in the connector schema")] diff --git a/crates/cli/src/native_query/mod.rs b/crates/cli/src/native_query/mod.rs index 56d3f086..b5e68373 100644 --- a/crates/cli/src/native_query/mod.rs +++ b/crates/cli/src/native_query/mod.rs @@ -3,6 +3,7 @@ pub mod error; mod helpers; mod pipeline; mod pipeline_type_context; +mod pretty_printing; mod prune_object_types; mod reference_shorthand; mod type_annotation; @@ -12,6 +13,7 @@ mod type_solver; #[cfg(test)] mod tests; +use std::collections::BTreeMap; use std::path::{Path, PathBuf}; use std::process::exit; @@ -20,9 +22,11 @@ use configuration::schema::ObjectField; use configuration::{ native_query::NativeQueryRepresentation::Collection, serialized::NativeQuery, Configuration, }; -use configuration::{read_directory_with_ignored_configs, WithName}; +use configuration::{read_directory_with_ignored_configs, read_native_query_directory, WithName}; use mongodb_support::aggregate::Pipeline; -use ndc_models::CollectionName; +use ndc_models::{CollectionName, FunctionName}; +use pretty::termcolor::{ColorChoice, StandardStream}; +use pretty_printing::pretty_print_native_query; use tokio::fs; use crate::exit_codes::ExitCode; @@ -30,15 +34,16 @@ use crate::Context; use self::error::Result; use self::pipeline::infer_pipeline_types; +use self::pretty_printing::pretty_print_native_query_info; /// Create native queries - custom MongoDB queries that integrate into your data graph #[derive(Clone, Debug, Subcommand)] pub enum Command { /// Create a native query from a JSON file containing an aggregation pipeline Create { - /// Name that will identify the query in your data graph - 
#[arg(long, short = 'n', required = true)] - name: String, + /// Name that will identify the query in your data graph (defaults to base name of pipeline file) + #[arg(long, short = 'n')] + name: Option, /// Name of the collection that acts as input for the pipeline - omit for a pipeline that does not require input #[arg(long, short = 'c')] @@ -48,9 +53,21 @@ pub enum Command { #[arg(long, short = 'f')] force: bool, - /// Path to a JSON file with an aggregation pipeline + /// Path to a JSON file with an aggregation pipeline that specifies your custom query. This + /// is a value that could be given to the MongoDB command db..aggregate(). pipeline_path: PathBuf, }, + + /// Delete a native query identified by name. Use the list subcommand to see native query + /// names. + Delete { native_query_name: String }, + + /// List all configured native queries + List, + + /// Print details of a native query identified by name. Use the list subcommand to see native + /// query names. + Show { native_query_name: String }, } pub async fn run(context: &Context, command: Command) -> anyhow::Result<()> { @@ -60,76 +77,160 @@ pub async fn run(context: &Context, command: Command) -> anyhow::Result<()> { collection, force, pipeline_path, - } => { - let native_query_path = { - let path = get_native_query_path(context, &name); - if !force && fs::try_exists(&path).await? 
{ - eprintln!( - "A native query named {name} already exists at {}.", - path.to_string_lossy() - ); - eprintln!("Re-run with --force to overwrite."); - exit(ExitCode::RefusedToOverwrite.into()) - } - path - }; - - let configuration = match read_directory_with_ignored_configs( - &context.path, - &[native_query_path.clone()], - ) - .await - { - Ok(c) => c, - Err(err) => { - eprintln!("Could not read connector configuration - configuration must be initialized before creating native queries.\n\n{err:#}"); - exit(ExitCode::CouldNotReadConfiguration.into()) - } - }; - eprintln!( - "Read configuration from {}", - &context.path.to_string_lossy() - ); + } => create(context, name, collection, force, &pipeline_path).await, + Command::Delete { native_query_name } => delete(context, &native_query_name).await, + Command::List => list(context).await, + Command::Show { native_query_name } => show(context, &native_query_name).await, + } +} - let pipeline = match read_pipeline(&pipeline_path).await { - Ok(p) => p, - Err(err) => { - eprintln!("Could not read aggregation pipeline.\n\n{err}"); - exit(ExitCode::CouldNotReadAggregationPipeline.into()) - } - }; - let native_query = - match native_query_from_pipeline(&configuration, &name, collection, pipeline) { - Ok(q) => WithName::named(name, q), - Err(err) => { - eprintln!("Error interpreting aggregation pipeline.\n\n{err}"); - exit(ExitCode::CouldNotReadAggregationPipeline.into()) - } - }; - - let native_query_dir = native_query_path - .parent() - .expect("parent directory of native query configuration path"); - if !(fs::try_exists(&native_query_dir).await?) 
{ - fs::create_dir(&native_query_dir).await?; - } - - if let Err(err) = fs::write( - &native_query_path, - serde_json::to_string_pretty(&native_query)?, - ) - .await - { - eprintln!("Error writing native query configuration: {err}"); - exit(ExitCode::ErrorWriting.into()) - }; +async fn list(context: &Context) -> anyhow::Result<()> { + let native_queries = read_native_queries(context).await?; + for (name, _) in native_queries { + println!("{}", name); + } + Ok(()) +} + +async fn delete(context: &Context, native_query_name: &str) -> anyhow::Result<()> { + let (_, path) = find_native_query(context, native_query_name).await?; + fs::remove_file(&path).await?; + eprintln!( + "Deleted native query configuration at {}", + path.to_string_lossy() + ); + Ok(()) +} + +async fn show(context: &Context, native_query_name: &str) -> anyhow::Result<()> { + let (native_query, path) = find_native_query(context, native_query_name).await?; + pretty_print_native_query(&mut stdout(context), &native_query, &path).await?; + Ok(()) +} + +async fn create( + context: &Context, + name: Option, + collection: Option, + force: bool, + pipeline_path: &Path, +) -> anyhow::Result<()> { + let name = match name.or_else(|| { + pipeline_path + .file_stem() + .map(|os_str| os_str.to_string_lossy().to_string()) + }) { + Some(name) => name, + None => { + eprintln!("Could not determine name for native query."); + exit(ExitCode::InvalidArguments.into()) + } + }; + + let native_query_path = { + let path = get_native_query_path(context, &name); + if !force && fs::try_exists(&path).await? 
{ eprintln!( - "Wrote native query configuration to {}", - native_query_path.to_string_lossy() + "A native query named {name} already exists at {}.", + path.to_string_lossy() ); - Ok(()) + eprintln!("Re-run with --force to overwrite."); + exit(ExitCode::RefusedToOverwrite.into()) } + path + }; + + let configuration = read_configuration(context, &[native_query_path.clone()]).await?; + + let pipeline = match read_pipeline(pipeline_path).await { + Ok(p) => p, + Err(err) => { + eprintln!("Could not read aggregation pipeline.\n\n{err}"); + exit(ExitCode::CouldNotReadAggregationPipeline.into()) + } + }; + let native_query = match native_query_from_pipeline(&configuration, &name, collection, pipeline) + { + Ok(q) => WithName::named(name, q), + Err(err) => { + eprintln!("Error interpreting aggregation pipeline. If you are not able to resolve this error you can add the native query by writing the configuration file directly in {}.\n\n{err}", native_query_path.to_string_lossy()); + exit(ExitCode::CouldNotReadAggregationPipeline.into()) + } + }; + + let native_query_dir = native_query_path + .parent() + .expect("parent directory of native query configuration path"); + if !(fs::try_exists(&native_query_dir).await?) 
{ + fs::create_dir(&native_query_dir).await?; } + + if let Err(err) = fs::write( + &native_query_path, + serde_json::to_string_pretty(&native_query)?, + ) + .await + { + eprintln!("Error writing native query configuration: {err}"); + exit(ExitCode::ErrorWriting.into()) + }; + eprintln!( + "\nWrote native query configuration to {}", + native_query_path.to_string_lossy() + ); + eprintln!(); + pretty_print_native_query_info(&mut stdout(context), &native_query.value).await?; + Ok(()) +} + +/// Reads configuration, or exits with specific error code on error +async fn read_configuration( + context: &Context, + ignored_configs: &[PathBuf], +) -> anyhow::Result { + let configuration = match read_directory_with_ignored_configs(&context.path, ignored_configs) + .await + { + Ok(c) => c, + Err(err) => { + eprintln!("Could not read connector configuration - configuration must be initialized before creating native queries.\n\n{err:#}"); + exit(ExitCode::CouldNotReadConfiguration.into()) + } + }; + eprintln!( + "Read configuration from {}", + &context.path.to_string_lossy() + ); + Ok(configuration) +} + +/// Reads native queries skipping configuration processing, or exits with specific error code on error +async fn read_native_queries( + context: &Context, +) -> anyhow::Result> { + let native_queries = match read_native_query_directory(&context.path, &[]).await { + Ok(native_queries) => native_queries, + Err(err) => { + eprintln!("Could not read native queries.\n\n{err}"); + exit(ExitCode::CouldNotReadConfiguration.into()) + } + }; + Ok(native_queries) +} + +async fn find_native_query( + context: &Context, + name: &str, +) -> anyhow::Result<(NativeQuery, PathBuf)> { + let mut native_queries = read_native_queries(context).await?; + let (_, definition_and_path) = match native_queries.remove_entry(name) { + Some(native_query) => native_query, + None => { + eprintln!("No native query named {name} found."); + exit(ExitCode::ResourceNotFound.into()) + } + }; + Ok(definition_and_path) } 
async fn read_pipeline(pipeline_path: &Path) -> anyhow::Result { @@ -183,3 +284,11 @@ pub fn native_query_from_pipeline( description: None, }) } + +fn stdout(context: &Context) -> StandardStream { + if context.display_color { + StandardStream::stdout(ColorChoice::Auto) + } else { + StandardStream::stdout(ColorChoice::Never) + } +} diff --git a/crates/cli/src/native_query/pipeline/mod.rs b/crates/cli/src/native_query/pipeline/mod.rs index 664670ed..acc80046 100644 --- a/crates/cli/src/native_query/pipeline/mod.rs +++ b/crates/cli/src/native_query/pipeline/mod.rs @@ -69,7 +69,10 @@ fn infer_stage_output_type( stage: &Stage, ) -> Result> { let output_type = match stage { - Stage::AddFields(_) => todo!("add fields stage"), + Stage::AddFields(_) => Err(Error::UnknownAggregationStage { + stage_index, + stage_name: Some("$addFields"), + })?, Stage::Documents(docs) => { let doc_constraints = docs .iter() @@ -112,7 +115,10 @@ fn infer_stage_output_type( )?; None } - Stage::Lookup { .. } => todo!("lookup stage"), + Stage::Lookup { .. 
} => Err(Error::UnknownAggregationStage { + stage_index, + stage_name: Some("$lookup"), + })?, Stage::Group { key_expression, accumulators, @@ -125,8 +131,14 @@ fn infer_stage_output_type( )?; Some(TypeConstraint::Object(object_type_name)) } - Stage::Facet(_) => todo!("facet stage"), - Stage::Count(_) => todo!("count stage"), + Stage::Facet(_) => Err(Error::UnknownAggregationStage { + stage_index, + stage_name: Some("$facet"), + })?, + Stage::Count(_) => Err(Error::UnknownAggregationStage { + stage_index, + stage_name: Some("$count"), + })?, Stage::Project(doc) => { let augmented_type = project_stage::infer_type_from_project_stage( context, @@ -160,16 +172,10 @@ fn infer_stage_output_type( include_array_index.as_deref(), *preserve_null_and_empty_arrays, )?), - Stage::Other(doc) => { - context.add_warning(Error::UnknownAggregationStage { - stage_index, - stage: doc.clone(), - }); - // We don't know what the type is here so we represent it with an unconstrained type - // variable. - let type_variable = context.new_type_variable(Variance::Covariant, []); - Some(TypeConstraint::Variable(type_variable)) - } + Stage::Other(_) => Err(Error::UnknownAggregationStage { + stage_index, + stage_name: None, + })?, }; Ok(output_type) } diff --git a/crates/cli/src/native_query/pretty_printing.rs b/crates/cli/src/native_query/pretty_printing.rs new file mode 100644 index 00000000..7543393d --- /dev/null +++ b/crates/cli/src/native_query/pretty_printing.rs @@ -0,0 +1,239 @@ +use std::path::Path; + +use configuration::{schema::ObjectType, serialized::NativeQuery}; +use itertools::Itertools; +use pretty::{ + termcolor::{Color, ColorSpec, StandardStream}, + BoxAllocator, DocAllocator, DocBuilder, Pretty, +}; +use tokio::task; + +/// Prints metadata for a native query, excluding its pipeline +pub async fn pretty_print_native_query_info( + output: &mut StandardStream, + native_query: &NativeQuery, +) -> std::io::Result<()> { + task::block_in_place(move || { + let allocator = 
BoxAllocator; + native_query_info_printer(native_query, &allocator) + .1 + .render_colored(80, output)?; + Ok(()) + }) +} + +/// Prints metadata for a native query including its pipeline +pub async fn pretty_print_native_query( + output: &mut StandardStream, + native_query: &NativeQuery, + path: &Path, +) -> std::io::Result<()> { + task::block_in_place(move || { + let allocator = BoxAllocator; + native_query_printer(native_query, path, &allocator) + .1 + .render_colored(80, output)?; + Ok(()) + }) +} + +fn native_query_printer<'a, D>( + nq: &'a NativeQuery, + path: &'a Path, + allocator: &'a D, +) -> DocBuilder<'a, D, ColorSpec> +where + D: DocAllocator<'a, ColorSpec>, + D::Doc: Clone, +{ + let source = definition_list_entry( + "configuration source", + allocator.text(path.to_string_lossy()), + allocator, + ); + let info = native_query_info_printer(nq, allocator); + let pipeline = section( + "pipeline", + allocator.text(serde_json::to_string_pretty(&nq.pipeline).unwrap()), + allocator, + ); + allocator.intersperse([source, info, pipeline], allocator.hardline()) +} + +fn native_query_info_printer<'a, D>( + nq: &'a NativeQuery, + allocator: &'a D, +) -> DocBuilder<'a, D, ColorSpec> +where + D: DocAllocator<'a, ColorSpec>, + D::Doc: Clone, +{ + let input_collection = nq.input_collection.as_ref().map(|collection| { + definition_list_entry( + "input collection", + allocator.text(collection.to_string()), + allocator, + ) + }); + + let representation = Some(definition_list_entry( + "representation", + allocator.text(nq.representation.to_str()), + allocator, + )); + + let parameters = if !nq.arguments.is_empty() { + let params = nq.arguments.iter().map(|(name, definition)| { + allocator + .text(name.to_string()) + .annotate(field_name()) + .append(allocator.text(": ")) + .append( + allocator + .text(definition.r#type.to_string()) + .annotate(type_expression()), + ) + }); + Some(section( + "parameters", + allocator.intersperse(params, allocator.line()), + allocator, + )) + 
} else { + None + }; + + let result_type = { + let body = if let Some(object_type) = nq.object_types.get(&nq.result_document_type) { + object_type_printer(object_type, allocator) + } else { + allocator.text(nq.result_document_type.to_string()) + }; + Some(section("result type", body, allocator)) + }; + + let other_object_types = nq + .object_types + .iter() + .filter(|(name, _)| **name != nq.result_document_type) + .collect_vec(); + let object_types_doc = if !other_object_types.is_empty() { + let docs = other_object_types.into_iter().map(|(name, definition)| { + allocator + .text(format!("{name} ")) + .annotate(object_type_name()) + .append(object_type_printer(definition, allocator)) + }); + let separator = allocator.line().append(allocator.line()); + Some(section( + "object type definitions", + allocator.intersperse(docs, separator), + allocator, + )) + } else { + None + }; + + allocator.intersperse( + [ + input_collection, + representation, + parameters, + result_type, + object_types_doc, + ] + .into_iter() + .filter(Option::is_some), + allocator.hardline(), + ) +} + +fn object_type_printer<'a, D>(ot: &'a ObjectType, allocator: &'a D) -> DocBuilder<'a, D, ColorSpec> +where + D: DocAllocator<'a, ColorSpec>, + D::Doc: Clone, +{ + let fields = ot.fields.iter().map(|(name, definition)| { + allocator + .text(name.to_string()) + .annotate(field_name()) + .append(allocator.text(": ")) + .append( + allocator + .text(definition.r#type.to_string()) + .annotate(type_expression()), + ) + }); + let separator = allocator.text(",").append(allocator.line()); + let body = allocator.intersperse(fields, separator); + body.indent(2).enclose( + allocator.text("{").append(allocator.line()), + allocator.line().append(allocator.text("}")), + ) +} + +fn definition_list_entry<'a, D>( + label: &'a str, + body: impl Pretty<'a, D, ColorSpec>, + allocator: &'a D, +) -> DocBuilder<'a, D, ColorSpec> +where + D: DocAllocator<'a, ColorSpec>, + D::Doc: Clone, +{ + allocator + .text(label) + 
.annotate(definition_list_label()) + .append(allocator.text(": ")) + .append(body) +} + +fn section<'a, D>( + heading: &'a str, + body: impl Pretty<'a, D, ColorSpec>, + allocator: &'a D, +) -> DocBuilder<'a, D, ColorSpec> +where + D: DocAllocator<'a, ColorSpec>, + D::Doc: Clone, +{ + let heading_doc = allocator + .text("## ") + .append(heading) + .annotate(section_heading()); + allocator + .line() + .append(heading_doc) + .append(allocator.line()) + .append(allocator.line()) + .append(body) +} + +fn section_heading() -> ColorSpec { + let mut color = ColorSpec::new(); + color.set_fg(Some(Color::Red)); + color.set_bold(true); + color +} + +fn definition_list_label() -> ColorSpec { + let mut color = ColorSpec::new(); + color.set_fg(Some(Color::Blue)); + color +} + +fn field_name() -> ColorSpec { + let mut color = ColorSpec::new(); + color.set_fg(Some(Color::Yellow)); + color +} + +fn object_type_name() -> ColorSpec { + // placeholder in case we want styling here in the future + ColorSpec::new() +} + +fn type_expression() -> ColorSpec { + // placeholder in case we want styling here in the future + ColorSpec::new() +} diff --git a/crates/cli/src/native_query/tests.rs b/crates/cli/src/native_query/tests.rs index 3e692042..1a543724 100644 --- a/crates/cli/src/native_query/tests.rs +++ b/crates/cli/src/native_query/tests.rs @@ -3,10 +3,8 @@ use std::collections::BTreeMap; use anyhow::Result; use configuration::{ native_query::NativeQueryRepresentation::Collection, - read_directory, schema::{ObjectField, ObjectType, Type}, serialized::NativeQuery, - Configuration, }; use googletest::prelude::*; use itertools::Itertools as _; @@ -23,7 +21,7 @@ use super::native_query_from_pipeline; #[tokio::test] async fn infers_native_query_from_pipeline() -> Result<()> { - let config = read_configuration().await?; + let config = mflix_config(); let pipeline = Pipeline::new(vec![Stage::Documents(vec![ doc! { "foo": 1 }, doc! 
{ "bar": 2 }, @@ -78,7 +76,7 @@ async fn infers_native_query_from_pipeline() -> Result<()> { #[tokio::test] async fn infers_native_query_from_non_trivial_pipeline() -> Result<()> { - let config = read_configuration().await?; + let config = mflix_config(); let pipeline = Pipeline::new(vec![ Stage::ReplaceWith(Selection::new(doc! { "title": "$title", @@ -508,7 +506,3 @@ where }) .collect() } - -async fn read_configuration() -> Result { - read_directory("../../fixtures/hasura/sample_mflix/connector").await -} diff --git a/crates/configuration/src/directory.rs b/crates/configuration/src/directory.rs index b3a23232..262d5f6d 100644 --- a/crates/configuration/src/directory.rs +++ b/crates/configuration/src/directory.rs @@ -1,6 +1,7 @@ use anyhow::{anyhow, Context as _}; use futures::stream::TryStreamExt as _; use itertools::Itertools as _; +use ndc_models::FunctionName; use serde::{Deserialize, Serialize}; use std::{ collections::{BTreeMap, HashSet}, @@ -11,7 +12,10 @@ use tokio::{fs, io::AsyncWriteExt}; use tokio_stream::wrappers::ReadDirStream; use crate::{ - configuration::ConfigurationOptions, serialized::Schema, with_name::WithName, Configuration, + configuration::ConfigurationOptions, + serialized::{NativeQuery, Schema}, + with_name::WithName, + Configuration, }; pub const SCHEMA_DIRNAME: &str = "schema"; @@ -69,9 +73,11 @@ pub async fn read_directory_with_ignored_configs( .await? .unwrap_or_default(); - let native_queries = read_subdir_configs(&dir.join(NATIVE_QUERIES_DIRNAME), ignored_configs) + let native_queries = read_native_query_directory(dir, ignored_configs) .await? 
- .unwrap_or_default(); + .into_iter() + .map(|(name, (config, _))| (name, config)) + .collect(); let options = parse_configuration_options_file(dir).await?; @@ -80,6 +86,19 @@ pub async fn read_directory_with_ignored_configs( Configuration::validate(schema, native_mutations, native_queries, options) } +/// Read native queries only, and skip configuration processing +pub async fn read_native_query_directory( + configuration_dir: impl AsRef<Path> + Send, + ignored_configs: &[PathBuf], +) -> anyhow::Result<BTreeMap<FunctionName, (NativeQuery, PathBuf)>> { + let dir = configuration_dir.as_ref(); + let native_queries = + read_subdir_configs_with_paths(&dir.join(NATIVE_QUERIES_DIRNAME), ignored_configs) + .await? + .unwrap_or_default(); + Ok(native_queries) +} + /// Parse all files in a directory with one of the allowed configuration extensions according to /// the given type argument. For example if `T` is `NativeMutation` this function assumes that all /// json and yaml files in the given directory should be parsed as native mutation configurations. 
@@ -89,6 +108,23 @@ async fn read_subdir_configs<T, N>( subdir: &Path, ignored_configs: &[PathBuf], ) -> anyhow::Result<Option<BTreeMap<N, T>>> +where + for<'a> T: Deserialize<'a>, + for<'a> N: Ord + ToString + Deserialize<'a>, +{ + let configs_with_paths = read_subdir_configs_with_paths(subdir, ignored_configs).await?; + let configs_without_paths = configs_with_paths.map(|cs| { + cs.into_iter() + .map(|(name, (config, _))| (name, config)) + .collect() + }); + Ok(configs_without_paths) +} + +async fn read_subdir_configs_with_paths<T, N>( + subdir: &Path, + ignored_configs: &[PathBuf], +) -> anyhow::Result<Option<BTreeMap<N, (T, PathBuf)>>> where for<'a> T: Deserialize<'a>, for<'a> N: Ord + ToString + Deserialize<'a>, { @@ -98,8 +134,8 @@ where } let dir_stream = ReadDirStream::new(fs::read_dir(subdir).await?); - let configs: Vec<WithName<N, T>> = dir_stream - .map_err(|err| err.into()) + let configs: Vec<WithName<N, (T, PathBuf)>> = dir_stream + .map_err(anyhow::Error::from) .try_filter_map(|dir_entry| async move { // Permits regular files and symlinks, does not filter out symlinks to directories. 
let is_file = !(dir_entry.file_type().await?.is_dir()); @@ -128,7 +164,11 @@ where Ok(format_option.map(|format| (path, format))) }) .and_then(|(path, format)| async move { - parse_config_file::<WithName<N, T>>(path, format).await + let config = parse_config_file::<WithName<N, T>>(&path, format).await?; + Ok(WithName { + name: config.name, + value: (config.value, path), + }) }) .try_collect() .await?; diff --git a/crates/configuration/src/lib.rs b/crates/configuration/src/lib.rs index c252fcc9..798f232c 100644 --- a/crates/configuration/src/lib.rs +++ b/crates/configuration/src/lib.rs @@ -12,7 +12,9 @@ pub use crate::directory::get_config_file_changed; pub use crate::directory::list_existing_schemas; pub use crate::directory::parse_configuration_options_file; pub use crate::directory::write_schema_directory; -pub use crate::directory::{read_directory, read_directory_with_ignored_configs}; +pub use crate::directory::{ + read_directory, read_directory_with_ignored_configs, read_native_query_directory, +}; pub use crate::directory::{ CONFIGURATION_OPTIONS_BASENAME, CONFIGURATION_OPTIONS_METADATA, NATIVE_MUTATIONS_DIRNAME, NATIVE_QUERIES_DIRNAME, SCHEMA_DIRNAME, diff --git a/crates/configuration/src/native_query.rs b/crates/configuration/src/native_query.rs index 2b819996..9588e3f1 100644 --- a/crates/configuration/src/native_query.rs +++ b/crates/configuration/src/native_query.rs @@ -45,3 +45,12 @@ pub enum NativeQueryRepresentation { Collection, Function, } + +impl NativeQueryRepresentation { + pub fn to_str(&self) -> &'static str { + match self { + NativeQueryRepresentation::Collection => "collection", + NativeQueryRepresentation::Function => "function", + } + } +} diff --git a/crates/mongodb-support/src/aggregate/stage.rs b/crates/mongodb-support/src/aggregate/stage.rs index 76ee4e93..635e2c2e 100644 --- a/crates/mongodb-support/src/aggregate/stage.rs +++ b/crates/mongodb-support/src/aggregate/stage.rs @@ -168,7 +168,7 @@ pub enum Stage { /// $replaceWith is an alias for $replaceRoot stage. 
/// /// See https://www.mongodb.com/docs/manual/reference/operator/aggregation/replaceRoot/#mongodb-pipeline-pipe.-replaceRoot - #[serde(rename = "$replaceWith", rename_all = "camelCase")] + #[serde(rename = "$replaceRoot", rename_all = "camelCase")] ReplaceRoot { new_root: Selection }, /// Replaces a document with the specified embedded document. The operation replaces all