From 3835e16fb91325aeb56e294838407696aa5cb609 Mon Sep 17 00:00:00 2001 From: Phil Date: Tue, 10 Dec 2024 13:16:36 -0500 Subject: [PATCH] agent: introduce status HTTP API and OpenAPI docs Adds the first public HTTP endpoint to agent, for fetching the status for a list of live specs. This includes some boilerplate for generating an OpenAPI spec from our axum handlers, and API docs that are rendered from that spec using Scalar. I tried to establish some repeatable patterns for writing API handlers with generated documentation and consistent error handling. The objective was to make it pretty straight forward to add more endpoints. Part of this was introducing a separation between the "public" APIs that we wish to make available to all clients, and the private APIs for which we don't want to guarantee backward compatibility. The idea is to only add apis to the `public` module when we want to support it for use by end users (which implies at least some level of backward compatibility). Only the public APIs are covered by the OpenAPI spec. The `aide` crate is used for generating the OpenAPI spec. It's not _great_, but seems to be better than the alternative, `utoipa`, which requires macros for defining routes and isn't directly compatible with `schemars`. --- Cargo.lock | 111 +++++++++++++++++- Cargo.toml | 13 +- crates/agent/Cargo.toml | 2 + crates/agent/src/api/error.rs | 163 ++++++++++++++++++++++++++ crates/agent/src/api/mod.rs | 72 +++++++++++- crates/agent/src/api/public/mod.rs | 129 ++++++++++++++++++++ crates/agent/src/api/public/status.rs | 80 +++++++++++++ crates/agent/src/draft.rs | 42 +------ crates/models/src/lib.rs | 1 + crates/models/src/sqlx_json.rs | 38 ++++++ crates/models/src/status/mod.rs | 60 ++++++++-- 11 files changed, 650 insertions(+), 61 deletions(-) create mode 100644 crates/agent/src/api/error.rs create mode 100644 crates/agent/src/api/public/mod.rs create mode 100644 crates/agent/src/api/public/status.rs create mode 100644 crates/models/src/sqlx_json.rs diff --git a/Cargo.lock b/Cargo.lock index 123a0e20bb..fe2fd2027f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -100,6 +100,7 @@ version = "0.0.0" dependencies = [ "activate", "agent-sql", + "aide", "allocator", "anyhow", "async-process", @@ -149,6 +150,7 @@ dependencies = [ "tokio", "tokio-util", "tonic", + "tower 0.5.0", "tower-http", "tracing", "tracing-subscriber", @@ -212,6 +214,41 @@ dependencies = [ "memchr", ] +[[package]] +name = "aide" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0e3b97a21e41ec5c19bfd9b4fc1f7086be104f8b988681230247ffc91cc8ed" +dependencies = [ + "aide-macros", + "axum", + "axum-extra", + "bytes", + "cfg-if 1.0.0", + "http 1.1.0", + "indexmap 2.3.0", + "schemars", + "serde", + "serde_json", + "serde_qs 0.13.0", + "thiserror", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "aide-macros" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0487f8598afe49e6bc950a613a678bd962c4a6f431022ded62643c8b990301a" +dependencies = [ + "darling 0.20.10", + "proc-macro2", + "quote", + "syn 2.0.74", +] + [[package]] name = "alloc-no-stdlib" version = "2.0.4" @@ -772,6 +809,7 @@ dependencies = [ "mime", "pin-project-lite", "serde", + "serde_html_form", "tower 0.4.13", "tower-layer", "tower-service", @@ -1750,8 +1788,18 @@ version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.13.4", + "darling_macro 0.13.4", +] + +[[package]] +name = "darling" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" +dependencies = [ + "darling_core 0.20.10", + "darling_macro 0.20.10", ] [[package]] @@ -1768,17 +1816,42 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "darling_core" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.11.1", + "syn 2.0.74", +] + [[package]] name = "darling_macro" version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" dependencies = [ - "darling_core", + "darling_core 0.13.4", "quote", "syn 1.0.109", ] +[[package]] +name = "darling_macro" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" +dependencies = [ + "darling_core 0.20.10", + "quote", + "syn 2.0.74", +] + [[package]] name = "dary_heap" version = "0.3.6" @@ -3171,6 +3244,7 @@ checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" dependencies = [ "equivalent", "hashbrown 0.14.5", + "serde", ] [[package]] @@ -3881,7 +3955,9 @@ checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1" name = "models" version = "0.0.0" dependencies = [ + "anyhow", "caseless", + "chrono", "humantime-serde", "insta", "itertools 0.10.5", @@ -5603,6 +5679,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09c024468a378b7e36765cd36702b7a90cc3cba11654f6685c8f233408e89e92" dependencies = [ "dyn-clone", + "indexmap 2.3.0", "schemars_derive", "serde", "serde_json", @@ -5750,6 +5827,19 @@ dependencies = [ "syn 2.0.74", ] +[[package]] +name = "serde_html_form" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de514ef58196f1fc96dcaef80fe6170a1ce6215df9687a93fe8300e773fefc5" +dependencies = [ + "form_urlencoded", + "indexmap 2.3.0", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_json" version = "1.0.124" @@ -5794,6 +5884,19 @@ dependencies = [ "thiserror", ] +[[package]] +name = "serde_qs" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd34f36fe4c5ba9654417139a9b3a20d2e1de6012ee678ad14d240c22c78d8d6" +dependencies = [ + "axum", + "futures", + "percent-encoding", + "serde", + "thiserror", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -6874,7 +6977,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ca2384932803823dd024a68d84019666577f4b373d78eb7ccafd5aa2548d615" dependencies = [ - "darling", + "darling 0.13.4", "proc-macro2", "quote", "syn 1.0.109", diff --git a/Cargo.toml b/Cargo.toml index 1d4eab9e17..c6385b8f5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,14 @@ license = "BSL" [workspace.dependencies] addr = { version = "0.15.4", default-features = false, features = ["std"] } +aide = { version = "0.13", features = [ + "axum", + "macros", + "scalar", + "redoc", + "axum-extra", + "axum-extra-query", +] } anyhow = "1.0" async-compression = { version = "0.3", features = [ "futures-io", @@ -227,10 +235,11 @@ pbjson-build = "0.7" prost-build = "0.13" tonic-build = "0.12" -warp = "0.3.3" # TODO(johnny) remove me in favor of axum +warp = "0.3.3" # TODO(johnny) remove me in favor of axum +# Used for the agent http server axum = { version = "0.7", features = ["macros"] } axum-server = { version = "0.7", features = ["tls-rustls"] } -axum-extra = { version = "0.9", features = ["typed-header"] } +axum-extra = { version = "0.9", features = ["typed-header", "query"] } [profile.release] incremental = true diff --git a/crates/agent/Cargo.toml b/crates/agent/Cargo.toml index c97947e53f..5b41e9e441 100644 --- a/crates/agent/Cargo.toml +++ b/crates/agent/Cargo.toml @@ -30,6 +30,7 @@ sources = { path = "../sources" } tables = { path = "../tables", features = ["persist"] } validation = { path = "../validation" } +aide = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } axum = { workspace = true } @@ -60,6 +61,7 @@ thiserror = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } tonic = { workspace = true } +tower = { workspace = true } tower-http = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/agent/src/api/error.rs b/crates/agent/src/api/error.rs new file mode 100644 index 0000000000..f987d3f9eb --- /dev/null +++ b/crates/agent/src/api/error.rs @@ -0,0 +1,163 @@ +//! Defines the `ApiError` type that can be returned from an API handler, which +//! specifies an HTTP status code and wraps an `anyhow::Error`. It implements +//! `IntoResponse`, allowing handlers to return a `Result, ApiError>`. +//! `From` impls exist for `anyhow::Error`, `Rejection`, and `sqlx::Error` with +//! reasonable default status codes. The http status code can be customized +//! using `ApiErrorExt::with_status` if you need to return a specific response +//! status for a given error. +//! +//! These types are written with the aim of making them easy to use in the +//! server. It's unclear whether we need an error struct defined in the `models` +//! crate, but in this case it's probably easiest to just use separate structs +//! for the client and server. +use axum::http::StatusCode; +use schemars::JsonSchema; + +use super::Rejection; + +pub trait ApiErrorExt { + /// Sets the given http response status to use when responding with this error. + fn with_status(self, status: axum::http::StatusCode) -> ApiError; +} + +impl + Sized> ApiErrorExt for E { + fn with_status(self, status: axum::http::StatusCode) -> ApiError { + let mut err: ApiError = self.into(); + err.status = status; + err + } +} + +/// An error response +#[derive( + Debug, thiserror::Error, serde::Serialize, serde::Deserialize, JsonSchema, aide::OperationIo, +)] +#[aide(output)] +#[error("status: {status}, error: {error}")] +pub struct ApiError { + /// The HTTP status code + #[serde(with = "status_serde")] + #[schemars(schema_with = "status_serde::schema")] + status: axum::http::StatusCode, + + /// The error message + #[serde(with = "error_serde")] + #[schemars(schema_with = "error_serde::schema")] + #[source] + error: anyhow::Error, +} + +mod status_serde { + use serde::{ + de::{self, Deserialize, Deserializer}, + ser::{Serialize, Serializer}, + }; + + pub fn schema(_: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { + serde_json::from_value(serde_json::json!({ + "type": "integer", + "minimum": 100, + "maximum": 599, + })) + .unwrap() + } + pub fn serialize( + status: &axum::http::StatusCode, + s: S, + ) -> Result { + status.as_u16().serialize(s) + } + pub fn deserialize<'a, D: Deserializer<'a>>( + deserializer: D, + ) -> Result { + let int_val = ::deserialize(deserializer)?; + axum::http::StatusCode::from_u16(int_val).map_err(|e| de::Error::custom(e)) + } +} + +mod error_serde { + use serde::{ + de::{Deserialize, Deserializer}, + ser::Serializer, + }; + + pub fn schema(_: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { + serde_json::from_value(serde_json::json!({ + "type": "string", + })) + .unwrap() + } + pub fn serialize(error: &anyhow::Error, s: S) -> Result { + let err_str = format!("{error:#}"); // alternate renders nested causes + s.serialize_str(&err_str) + } + pub fn deserialize<'a, D: Deserializer<'a>>( + deserializer: D, + ) -> Result { + let str_val = ::deserialize(deserializer)?; + Ok(anyhow::anyhow!(str_val)) + } +} + +impl ApiError { + pub fn not_found() -> ApiError { + ApiError::new( + StatusCode::NOT_FOUND, + anyhow::anyhow!("requested entity does not exist or you are not authorized"), + ) + } + + pub fn new(status: StatusCode, error: anyhow::Error) -> ApiError { + ApiError { status, error } + } + + fn status_for(err: &anyhow::Error) -> StatusCode { + // Ensure that we set the proper status code if the anyhow error itself + // wraps a Rejection. This might not be necessary since we generally + // convert Rejections into ApiErrors directly, using `From` impl, which + // always sets the proper status. But this check is cheap, and it + // ensures that we'll set the proper status in case a `?` operator + // somewhere converts the Rejection into an `anyhow::Error` before + // converting that error into an `ApiError`. + if let Some(_rejection) = err.downcast_ref::() { + return StatusCode::BAD_REQUEST; + } + if let Some(api_error) = err.downcast_ref::() { + return api_error.status; + } + StatusCode::INTERNAL_SERVER_ERROR + } +} + +impl From for ApiError { + fn from(error: sqlx::Error) -> ApiError { + tracing::error!(?error, "API responding with database error"); + ApiError { + status: StatusCode::INTERNAL_SERVER_ERROR, + error: anyhow::anyhow!("database error, please retry the request"), + } + } +} + +impl From for ApiError { + fn from(error: anyhow::Error) -> Self { + let status = Self::status_for(&error); + ApiError { status, error } + } +} + +impl From for ApiError { + fn from(value: Rejection) -> Self { + ApiError { + status: StatusCode::BAD_REQUEST, + error: anyhow::Error::from(value).context("Input validation error"), + } + } +} + +impl axum::response::IntoResponse for ApiError { + fn into_response(self) -> axum::response::Response { + let status = self.status; + (status, axum::Json(self)).into_response() + } +} diff --git a/crates/agent/src/api/mod.rs b/crates/agent/src/api/mod.rs index f63b40e5a1..0e2ae435a6 100644 --- a/crates/agent/src/api/mod.rs +++ b/crates/agent/src/api/mod.rs @@ -1,4 +1,5 @@ use axum::{http::StatusCode, response::IntoResponse}; +use models::Capability; use std::sync::{Arc, Mutex}; mod authorize_dekaf; @@ -6,12 +7,16 @@ mod authorize_task; mod authorize_user_collection; mod authorize_user_task; mod create_data_plane; +mod error; +mod public; mod snapshot; mod update_l2_reporting; use anyhow::Context; use snapshot::Snapshot; +pub use error::ApiError; + /// Request wraps a JSON-deserialized request type T which /// also implements the validator::Validate trait. #[derive(Debug, Clone, Copy, Default)] @@ -51,6 +56,46 @@ struct App { snapshot: std::sync::RwLock, } +impl App { + pub async fn is_user_authorized( + &self, + claims: &ControlClaims, + catalog_names: &[impl AsRef], + capability: Capability, + ) -> anyhow::Result { + let started_unix = jsonwebtoken::get_current_timestamp(); + loop { + match Snapshot::evaluate(&self.snapshot, started_unix, |snapshot: &Snapshot| { + for catalog_name in catalog_names { + if !tables::UserGrant::is_authorized( + &snapshot.role_grants, + &snapshot.user_grants, + claims.sub, + catalog_name.as_ref(), + capability, + ) { + tracing::debug!( + catalog_name=%catalog_name.as_ref(), + required_capability = ?capability, + user_id = %claims.sub, + "user is unauthorized" + ); + return Ok(false); + } + } + Ok(true) + }) { + Ok(authz_result) => return Ok(authz_result), + Err(Ok(retry_millis)) => { + tracing::debug!(%retry_millis, "waiting before retrying authZ check"); + () = tokio::time::sleep(std::time::Duration::from_millis(retry_millis)).await; + } + Err(Err(err)) => return Err(err), + } + } + } +} + /// Build the agent's API router. pub fn build_router( id_generator: models::IdGenerator, @@ -102,7 +147,9 @@ pub fn build_router( .allow_origin(tower_http::cors::AllowOrigin::list(allow_origin)) .allow_headers(allow_headers); - let schema_router = axum::Router::new() + let public_api_router = public::api_v1_router(app.clone()); + + let main_router = axum::Router::new() .route("/authorize/task", post(authorize_task::authorize_task)) .route("/authorize/dekaf", post(authorize_dekaf::authorize_dekaf)) .route( @@ -127,11 +174,15 @@ pub fn build_router( post(update_l2_reporting::update_l2_reporting) .route_layer(axum::middleware::from_fn_with_state(app.clone(), authorize)), ) - .layer(tower_http::trace::TraceLayer::new_for_http()) + .merge(public_api_router) + .layer( + tower_http::trace::TraceLayer::new_for_http() + .on_failure(tower_http::trace::DefaultOnFailure::new().level(tracing::Level::INFO)), + ) .layer(cors) .with_state(app); - Ok(schema_router) + Ok(main_router) } async fn preflight_handler() -> impl IntoResponse { @@ -241,3 +292,18 @@ fn maybe_rewrite_address(external: bool, address: &str) -> String { address.to_string() } } + +fn optional_datetime_schema(_: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { + serde_json::from_value(serde_json::json!({ + "type": ["string", "null"], + "format": "date-time", + })) + .unwrap() +} +fn datetime_schema(_: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { + serde_json::from_value(serde_json::json!({ + "type": "string", + "format": "date-time", + })) + .unwrap() +} diff --git a/crates/agent/src/api/public/mod.rs b/crates/agent/src/api/public/mod.rs new file mode 100644 index 0000000000..8048df3aab --- /dev/null +++ b/crates/agent/src/api/public/mod.rs @@ -0,0 +1,129 @@ +mod status; + +use axum::{http::StatusCode, response::IntoResponse}; +use std::sync::Arc; + +use crate::api::{authorize, error::ApiErrorExt, ApiError, App}; + +/// Creates a router for the public API that can be merged into an existing router. +/// All endpoints registered here are documented in an OpenAPI spec. For adding new +/// endpoints, the general rule is to use a handler function signature like: +/// +/// ``` +/// fn handle_{get|post|etc}_{resource_name}( +/// state: State>, // has the database connection pool, etc +/// claims: Extension, // claims from a verified JWT (unauthenticated requests would be rejected automatically) +/// other_stuff: T, // other extracted data from the request +/// ) -> Result, ApiError> +/// ``` +/// +/// and register the handler using `.api_route(path, aide::axum::routing::get(handle_get_thing))`. +/// +/// Other input parameters can be used, as long as they implement +/// `aide::operation::OperationInput`. The basic ones, like `Path` and `Query` +/// all do so already. This just ensures that the parameters are documented in +/// the OpenAPI spec. You can `impl aide::operation::OperationInput for MyInput +/// {}` if you don't want it to show in the spec. +/// +/// For accepting query parameters, define a struct with `Deserialize` and +/// `JsonSchema` impls, and use a parameter of type +/// `axum_extra::extract::Query` to extract it. This will +/// automatically return a 400 response if the given query parameters can't be +/// deserialized into the struct. +/// +/// The output type `Result, ApiError>` is suitable for any handler that +/// returns JSON, which is all of them. Just ensure that `T` implements +/// `serde::Serialize` and `schemars::JsonSchema`. See the `crate::api::error` module +/// docs for more information on error handling. +pub fn api_v1_router(app: Arc) -> axum::Router> { + // When errors occur during the process of generating an openapi spec, aide + // will call this function with the error so we can log it. They have a note + // in their docs warning about false positives where it logs errors even + // when it's able to return a valid response. I know it smells, but seems + // better than the available alternatives. + aide::gen::on_error(|error| { + tracing::error!(?error, "aide gen error"); + if cfg!(test) { + panic!("aide gen error: {:?}", error); + } + }); + + // Routes are defined in groups, with the first group all being + // authenticated routes that require a valid authentication token, and the + // second group being unauthenticated routes that can be accessed by anyone. + let router = aide::axum::ApiRouter::new() + .api_route( + "/api/v1/status", + aide::axum::routing::get(status::handle_get_status), + ) + .layer(axum::middleware::from_fn(ensure_accepts_json)) + // All routes below this are publicly accessible to anyone, without an authentication token + .layer(axum::middleware::from_fn_with_state(app.clone(), authorize)) + // The openapi json is itself documented as an API route + .api_route("/api/v1/openapi.json", aide::axum::routing::get(serve_docs)) + // The docs UI is not documented as an API route + .route( + "/api/v1/docs", + axum::routing::get( + aide::scalar::Scalar::new("/api/v1/openapi.json") + .with_title(API_TITLE) + .axum_handler(), + ), + ) + .with_state(app.clone()); + + // There's kind of a weird twist here, where we take the `OpenApi` that + // holds the generated documentation, and add it as an extension to the + // router that we just generated the documentation from. + let mut api = aide::openapi::OpenApi::default(); + let router = router.finish_api_with(&mut api, api_docs); + router.layer(axum::Extension(Arc::new(api))) +} + +/// Our API currently only supports JSON responses, so we check to make sure +/// that the accept header permits those. +async fn ensure_accepts_json( + headers: axum::http::HeaderMap, + req: axum::http::Request, + next: axum::middleware::Next, +) -> axum::response::Response { + if let Some(val) = headers.get("accept") { + let Ok(accept) = val.to_str() else { + return anyhow::anyhow!("invalid accept header was not ascii") + .with_status(StatusCode::BAD_REQUEST) + .into_response(); + }; + if !accept.contains("application/json") && !accept.contains("*/*") { + return anyhow::anyhow!("only application/json responses are supported at this time") + .with_status(StatusCode::NOT_ACCEPTABLE) + .into_response(); + } + } + next.run(req).await +} + +/// Handler that serves the openapi spec as JSON +async fn serve_docs( + axum::extract::Extension(api): axum::extract::Extension>, +) -> impl aide::axum::IntoApiResponse { + axum::Json(api).into_response() +} + +const API_TITLE: &str = "Flow Control Plane V1 API"; + +fn api_docs(api: aide::transform::TransformOpenApi) -> aide::transform::TransformOpenApi { + api.title(API_TITLE) + .summary("Controlling the control plane") + .description("API for the Flow control plane") + .security_scheme( + "ApiKey", + aide::openapi::SecurityScheme::Http { + scheme: "bearer".to_string(), + bearer_format: Some("JWT".to_string()), + description: Some("Estuary authentication token".to_string()), + extensions: Default::default(), + }, + ) + .security_requirement("ApiKey") + .default_response_with::, _>(|res| res.example(ApiError::not_found())) +} diff --git a/crates/agent/src/api/public/status.rs b/crates/agent/src/api/public/status.rs new file mode 100644 index 0000000000..78fe5c6f91 --- /dev/null +++ b/crates/agent/src/api/public/status.rs @@ -0,0 +1,80 @@ +use std::sync::Arc; + +use crate::api::error::ApiErrorExt; +use crate::api::{ApiError, App, ControlClaims}; +use axum::extract::{Path, State}; +use axum::http::StatusCode; +use axum::{Extension, Json}; +// axum_extra's `Query` is needed here because unlike the one from `axum`, it +// handles multiple query parameters with the same name +use axum_extra::extract::Query; +use chrono::{DateTime, Utc}; +use itertools::Itertools; +use models::status::{self, StatusResponse}; +use models::Id; + +/// Query parameters for the status endpoint +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct StatusQuery { + /// The catalog name of the live spec to get the status of + pub name: Vec, +} + +#[axum::debug_handler] +pub async fn handle_get_status( + state: State>, + Extension(claims): Extension, + Query(params): Query, +) -> Result>, ApiError> { + if !state + .0 + .is_user_authorized(&claims, ¶ms.name, models::Capability::Read) + .await? + { + return Err(ApiError::not_found()); + } + let pool = state.0.pg_pool.clone(); + let status = fetch_status(&pool, ¶ms.name).await?; + Ok(Json(status)) +} + +async fn fetch_status( + pool: &sqlx::PgPool, + catalog_names: &[String], +) -> Result, ApiError> { + let resp = sqlx::query_as!(StatusResponse, r#"select + ls.catalog_name as "catalog_name!: String", + ls.id as "live_spec_id: Id", + ls.spec_type as "spec_type: models::CatalogType", + coalesce(ls.spec->'shards'->>'disable', ls.spec->'derive'->'shards'->>'disable', 'false') = 'true' as "disabled!: bool", + ls.last_pub_id as "last_pub_id: Id", + ls.last_build_id as "last_build_id: Id", + ls.controller_next_run, + ls.updated_at as "live_spec_updated_at: DateTime", + cj.updated_at as "controller_updated_at: DateTime", + cj.status as "status: status::Status", + cj.error as "controller_error: String", + cj.failures as "controller_failures: i32" + from live_specs ls + join controller_jobs cj on ls.id = cj.live_spec_id + where ls.catalog_name::text = any($1::text[]) + "#, + catalog_names as &[String], + ).fetch_all(pool) + .await?; + + if resp.len() < catalog_names.len() { + let actual = resp + .into_iter() + .map(|r| r.catalog_name) + .collect::>(); + let missing = catalog_names + .iter() + .filter(|n| !actual.contains(n.as_str())); + return Err( + anyhow::anyhow!("no live specs found for names: [{}]", missing.format(", ")) + .with_status(StatusCode::NOT_FOUND), + ); + } + Ok(resp) +} diff --git a/crates/agent/src/draft.rs b/crates/agent/src/draft.rs index 8d49ea9d48..ce5a266542 100644 --- a/crates/agent/src/draft.rs +++ b/crates/agent/src/draft.rs @@ -1,47 +1,7 @@ -use crate::publications::LockFailure; - use super::Id; use agent_sql::{drafts as drafts_sql, CatalogType}; use anyhow::Context; -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Clone, JsonSchema)] -pub struct Error { - #[serde(default, skip_serializing_if = "String::is_empty")] - pub catalog_name: String, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub scope: Option, - pub detail: String, -} - -impl Error { - pub fn from_tables_error(err: &tables::Error) -> Self { - let catalog_name = tables::parse_synthetic_scope(&err.scope) - .map(|(_, name)| name) - .unwrap_or_default(); - Error { - catalog_name, - scope: Some(err.scope.to_string()), - // use alternate to print chained contexts - detail: format!("{:#}", err.error), - } - } -} - -impl From for Error { - fn from(err: LockFailure) -> Self { - let detail = format!( - "the expectPubId of spec {:?} {:?} did not match that of the live spec {:?}", - err.catalog_name, err.expected, err.actual - ); - Error { - catalog_name: err.catalog_name, - detail, - scope: None, - } - } -} +use models::draft_error::Error; pub async fn load_draft( draft_id: Id, diff --git a/crates/models/src/lib.rs b/crates/models/src/lib.rs index 82736e8197..d59c8f6b4c 100644 --- a/crates/models/src/lib.rs +++ b/crates/models/src/lib.rs @@ -23,6 +23,7 @@ mod schemas; mod shards; mod source; mod source_capture; +pub(crate) mod sqlx_json; pub mod status; mod tests; diff --git a/crates/models/src/sqlx_json.rs b/crates/models/src/sqlx_json.rs new file mode 100644 index 0000000000..f803df37b9 --- /dev/null +++ b/crates/models/src/sqlx_json.rs @@ -0,0 +1,38 @@ +/// Implements `sqlx::{Type, Decode, Encode}` for a given type, allowing sqlx to +/// treat it as a plain `json` column (not `jsonb`). The type given to this +/// macro must implement `serde::Serialize` and `serde::Deserialize`. +macro_rules! sqlx_json { + ($rust_type:ty) => { + + #[cfg(feature = "sqlx-support")] + impl sqlx::Type for $rust_type { + fn type_info() -> sqlx::postgres::PgTypeInfo { + sqlx::postgres::PgTypeInfo::with_name("JSON") + } + // TODO: is `compatible` impl necessary? + fn compatible(ty: &sqlx::postgres::PgTypeInfo) -> bool { + *ty == Self::type_info() // Not compatible with JSONB. + } + } + + #[cfg(feature = "sqlx-support")] + impl<'a> sqlx::Decode<'a, sqlx::postgres::Postgres> for $rust_type { + fn decode(value: sqlx::postgres::PgValueRef<'a>) -> Result { + as sqlx::Decode<'a, sqlx::postgres::Postgres>>::decode(value) + .map(|t| t.0) + } + } + + #[cfg(feature = "sqlx-support")] + impl<'q> sqlx::Encode<'q, sqlx::postgres::Postgres> for $rust_type { + fn encode_by_ref(&self, buf: &mut sqlx::postgres::PgArgumentBuffer) -> sqlx::encode::IsNull { + as sqlx::Encode<'q, sqlx::postgres::Postgres>>::encode( + sqlx::types::Json(self), + buf, + ) + } + } + }; +} + +pub(crate) use sqlx_json; diff --git a/crates/models/src/status/mod.rs b/crates/models/src/status/mod.rs index 5ec832f996..9fad79bdd8 100644 --- a/crates/models/src/status/mod.rs +++ b/crates/models/src/status/mod.rs @@ -4,10 +4,51 @@ pub mod collection; pub mod materialization; pub mod publications; -use crate::CatalogType; +use crate::{datetime_schema, is_false, option_datetime_schema, CatalogType, Id}; +use chrono::{DateTime, Utc}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +/// Response type for the status endpoint +#[derive(Debug, serde::Serialize, serde::Deserialize, schemars::JsonSchema)] +pub struct StatusResponse { + /// The name of the live spec + pub catalog_name: String, + /// The id of the live spec + pub live_spec_id: Id, + /// The type of the live spec + pub spec_type: Option, + /// Whether the shards are disabled. Only pertinent to tasks. Omitted if false. + #[serde(default, skip_serializing_if = "is_false")] + pub disabled: bool, + /// The id of the last successful publication that modified the spec. + pub last_pub_id: Id, + /// The id of the last successful publication of the spec, regardless of + /// whether the spec was modified. This value can be compared against the + /// value of `/controller_status/activations/last_activated` in order to + /// determine whether the most recent build has been activated in the data + /// plane. + pub last_build_id: Id, + /// Time at which the controller is next scheduled to run. Or null if there + /// is no run scheduled. + #[schemars(schema_with = "option_datetime_schema")] + pub controller_next_run: Option>, + /// Time of the last publication that affected the live spec. + #[schemars(schema_with = "datetime_schema")] + pub live_spec_updated_at: DateTime, + /// Time of the last controller run for this spec. + #[schemars(schema_with = "datetime_schema")] + pub controller_updated_at: DateTime, + /// The controller status json. + pub status: Status, + /// Error from the most recent controller run, or `null` if the run was + /// successful. + pub controller_error: Option, + /// The number of consecutive failures of the controller. Resets to 0 after + /// any successful run. + pub controller_failures: i32, +} + /// Represents the internal state of a controller. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema)] #[serde(tag = "type")] @@ -16,20 +57,14 @@ pub enum Status { Collection(collection::CollectionStatus), Materialization(materialization::MaterializationStatus), Test(catalog_test::TestStatus), - #[schemars(skip)] #[serde(other, untagged)] Uninitialized, } -impl Status { - pub fn json_schema() -> schemars::schema::RootSchema { - let settings = schemars::gen::SchemaSettings::draft2019_09(); - //settings.option_add_null_type = false; - //settings.inline_subschemas = true; - let generator = schemars::gen::SchemaGenerator::new(settings); - generator.into_root_schema_for::() - } +// Status types are serialized as plain json columns. +crate::sqlx_json::sqlx_json!(Status); +impl Status { pub fn catalog_type(&self) -> Option { match self { Status::Capture(_) => Some(CatalogType::Capture), @@ -200,7 +235,10 @@ mod test { #[test] fn test_status_json_schema() { - let schema = serde_json::to_value(Status::json_schema()).unwrap(); + let settings = schemars::gen::SchemaSettings::draft2019_09(); + let generator = schemars::gen::SchemaGenerator::new(settings); + let schema_obj = generator.into_root_schema_for::(); + let schema = serde_json::to_value(&schema_obj).unwrap(); insta::assert_json_snapshot!(schema); } }