From b9a6770171d69677d1671b456a9245ac67d9d01f Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Mon, 8 Jan 2024 12:31:33 +0100 Subject: [PATCH 1/3] feat: added client authenticated bulk metrics --- server/src/client_api.rs | 160 ++++++++++++++++++++++++--- server/src/http/unleash_client.rs | 33 +++++- server/src/lib.rs | 10 +- server/src/metrics/client_metrics.rs | 19 +++- server/src/types.rs | 26 ++++- server/src/urls.rs | 7 ++ 6 files changed, 235 insertions(+), 20 deletions(-) diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 31cf9411..aaf8245c 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -5,7 +5,9 @@ use crate::filters::{ use crate::http::feature_refresher::FeatureRefresher; use crate::metrics::client_metrics::MetricsCache; use crate::tokens::cache_key; -use crate::types::{EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters}; +use crate::types::{ + BatchMetricsRequestBody, EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters, +}; use actix_web::web::{self, Data, Json, Query}; use actix_web::{get, post, HttpRequest, HttpResponse}; use dashmap::DashMap; @@ -199,6 +201,31 @@ pub async fn metrics( Ok(HttpResponse::Accepted().finish()) } +#[utoipa::path( +context_path = "/api/client", +responses( +(status = 202, description = "Accepted bulk metrics"), +(status = 403, description = "Was not allowed to post bulk metrics") +), +request_body = BatchMetricsRequestBody, +security( +("Authorization" = []) +) +)] +#[post("/metrics/bulk")] +pub async fn post_bulk_metrics( + _edge_token: EdgeToken, + bulk_metrics: Json, + connect_via: Data, + metrics_cache: Data, +) -> EdgeResult { + crate::metrics::client_metrics::register_bulk_metrics( + metrics_cache, + connect_via.get_ref(), + bulk_metrics.into_inner(), + ); + Ok(HttpResponse::Accepted().finish()) +} pub fn configure_client_api(cfg: &mut web::ServiceConfig) { cfg.service( web::scope("/client") @@ -208,7 +235,8 @@ pub fn configure_client_api(cfg: &mut web::ServiceConfig) { .service(get_features) .service(get_feature) .service(register) - .service(metrics), + .service(metrics) + .service(post_bulk_metrics), ); } @@ -224,12 +252,12 @@ pub fn configure_experimental_post_features( #[cfg(test)] mod tests { + use crate::metrics::client_metrics::{ApplicationKey, MetricsBatch, MetricsKey}; + use crate::types::{TokenType, TokenValidationStatus}; + use std::collections::HashMap; use std::path::PathBuf; use std::str::FromStr; - use std::{collections::HashMap, sync::Arc}; - - use crate::metrics::client_metrics::{ApplicationKey, MetricsKey}; - use crate::types::{TokenType, TokenValidationStatus}; + use std::sync::Arc; use super::*; @@ -243,14 +271,16 @@ mod tests { http::header::ContentType, test, web::{self, Data}, - App, + App, ResponseError, }; use chrono::{DateTime, Duration, TimeZone, Utc}; use maplit::hashmap; use reqwest::StatusCode; use ulid::Ulid; use unleash_types::client_features::{ClientFeature, Constraint, Operator, Strategy}; - use unleash_types::client_metrics::{ClientMetricsEnv, MetricBucket, ToggleStats}; + use unleash_types::client_metrics::{ + ClientMetricsEnv, ConnectViaBuilder, MetricBucket, ToggleStats, + }; use unleash_yggdrasil::EngineState; async fn make_metrics_post_request() -> Request { @@ -280,6 +310,38 @@ mod tests { .to_request() } + async fn make_bulk_metrics_post_request(authorization: Option) -> Request { + let mut req = test::TestRequest::post() + .uri("/api/client/metrics/bulk") + .insert_header(ContentType::json()); + req = match authorization { + Some(auth) => req.insert_header(("Authorization", auth)), + None => req, + }; + req.set_json(Json(BatchMetricsRequestBody { + applications: vec![ClientApplication { + app_name: "test_app".to_string(), + connect_via: None, + environment: None, + instance_id: None, + interval: 10, + sdk_version: None, + started: Default::default(), + strategies: vec![], + }], + metrics: vec![ClientMetricsEnv { + feature_name: "".to_string(), + app_name: "".to_string(), + environment: "".to_string(), + timestamp: Default::default(), + yes: 0, + no: 0, + variants: Default::default(), + }], + })) + .to_request() + } + async fn make_register_post_request(application: ClientApplication) -> Request { test::TestRequest::post() .uri("/api/client/register") @@ -476,6 +538,76 @@ mod tests { assert_eq!(saved_app.connect_via, Some(vec![our_app])); } + #[tokio::test] + async fn bulk_metrics_endpoint_correctly_accepts_data() { + let metrics_cache = MetricsCache::default(); + let connect_via = ConnectViaBuilder::default() + .app_name("unleash-edge".into()) + .instance_id("test".into()) + .build() + .unwrap(); + let app = test::init_service( + App::new() + .app_data(Data::new(connect_via)) + .app_data(web::Data::new(metrics_cache)) + .service(web::scope("/api/client").service(post_bulk_metrics)), + ) + .await; + let token = EdgeToken::from_str("*:development.somestring").unwrap(); + let req = make_bulk_metrics_post_request(Some(token.token.clone())).await; + let call = test::call_service(&app, req).await; + assert_eq!(call.status(), StatusCode::ACCEPTED); + } + #[tokio::test] + async fn bulk_metrics_endpoint_correctly_refuses_metrics_without_auth_header() { + let mut token = EdgeToken::from_str("*:development.somestring").unwrap(); + token.status = TokenValidationStatus::Validated; + token.token_type = Some(TokenType::Client); + let upstream_token_cache = Arc::new(DashMap::default()); + let upstream_features_cache = Arc::new(DashMap::default()); + let upstream_engine_cache = Arc::new(DashMap::default()); + upstream_token_cache.insert(token.token.clone(), token.clone()); + let srv = upstream_server( + upstream_token_cache, + upstream_features_cache, + upstream_engine_cache, + ) + .await; + let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap(); + let status = client + .send_bulk_metrics_to_client_endpoint(MetricsBatch::default(), None) + .await; + assert_eq!(status.expect_err("").status_code(), StatusCode::FORBIDDEN); + let successful = client + .send_bulk_metrics_to_client_endpoint(MetricsBatch::default(), Some(token.clone())) + .await; + assert!(successful.is_ok()); + } + + #[tokio::test] + async fn bulk_metrics_endpoint_correctly_refuses_metrics_with_frontend_token() { + let mut frontend_token = EdgeToken::from_str("*:development.frontend").unwrap(); + frontend_token.status = TokenValidationStatus::Validated; + frontend_token.token_type = Some(TokenType::Frontend); + let upstream_token_cache = Arc::new(DashMap::default()); + let upstream_features_cache = Arc::new(DashMap::default()); + let upstream_engine_cache = Arc::new(DashMap::default()); + upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone()); + let srv = upstream_server( + upstream_token_cache, + upstream_features_cache, + upstream_engine_cache, + ) + .await; + let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap(); + let status = client + .send_bulk_metrics_to_client_endpoint( + MetricsBatch::default(), + Some(frontend_token.clone()), + ) + .await; + assert_eq!(status.expect_err("").status_code(), StatusCode::FORBIDDEN); + } #[tokio::test] async fn client_features_endpoint_correctly_returns_cached_features() { let features_cache: Arc> = Arc::new(DashMap::default()); @@ -1031,11 +1163,11 @@ mod tests { let res = test::call_service(&app, request).await; assert_eq!(res.status(), StatusCode::OK); let request = test::TestRequest::get() - .uri("/api/client/features") - .insert_header(ContentType::json()) - .insert_header(("ShouldNotWork", production_token.token.clone())) - .to_request(); - let res = test::call_service(&app, request).await; - assert_eq!(res.status(), StatusCode::FORBIDDEN); + .uri("/api/client/features") + .insert_header(ContentType::json()) + .insert_header(("ShouldNotWork", production_token.token.clone())) + .to_request(); + let res = test::call_service(&app, request).await; + assert_eq!(res.status(), StatusCode::FORBIDDEN); } } diff --git a/server/src/http/unleash_client.rs b/server/src/http/unleash_client.rs index 4121b72a..11cf1271 100644 --- a/server/src/http/unleash_client.rs +++ b/server/src/http/unleash_client.rs @@ -203,7 +203,7 @@ impl UnleashClient { service_account_token: String, connect_timeout: Duration, socket_timeout: Duration, - token_header: String + token_header: String, ) -> Self { Self { urls: UnleashUrls::from_base_url(server_url), @@ -300,7 +300,7 @@ impl UnleashClient { fn header_map(&self, api_key: Option) -> HeaderMap { let mut header_map = HeaderMap::new(); - let token_header: HeaderName= HeaderName::from_str(self.token_header.as_str()).unwrap(); + let token_header: HeaderName = HeaderName::from_str(self.token_header.as_str()).unwrap(); if let Some(key) = api_key { header_map.insert(token_header, key.parse().unwrap()); } @@ -450,6 +450,35 @@ impl UnleashClient { } } + pub async fn send_bulk_metrics_to_client_endpoint( + &self, + request: MetricsBatch, + token: Option, + ) -> EdgeResult<()> { + let result = self + .backing_client + .post(self.urls.client_bulk_metrics_url.to_string()) + .headers(self.header_map(token.map(|t| t.token))) + .json(&request) + .send() + .await + .map_err(|e| { + info!("Failed to send metrics to /api/client/metrics/bulk endpoint {e:?}"); + EdgeError::EdgeMetricsError + })?; + if result.status().is_success() { + Ok(()) + } else { + match result.status() { + StatusCode::BAD_REQUEST => Err(EdgeMetricsRequestError( + result.status(), + result.json().await.ok(), + )), + _ => Err(EdgeMetricsRequestError(result.status(), None)), + } + } + } + pub async fn forward_request_for_client_token( &self, client_token_request: ClientTokenRequest, diff --git a/server/src/lib.rs b/server/src/lib.rs index 042dec2f..07376787 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -42,9 +42,11 @@ mod tests { use actix_web::{web, App}; use dashmap::DashMap; use unleash_types::client_features::ClientFeatures; + use unleash_types::client_metrics::ConnectVia; use unleash_yggdrasil::EngineState; use crate::auth::token_validator::TokenValidator; + use crate::metrics::client_metrics::MetricsCache; use crate::types::EdgeToken; pub fn features_from_disk(path: &str) -> ClientFeatures { @@ -68,7 +70,11 @@ mod tests { test_server(move || { let config = serde_qs::actix::QsQueryConfig::default() .qs_config(serde_qs::Config::new(5, false)); - + let metrics_cache = MetricsCache::default(); + let connect_via = ConnectVia { + app_name: "edge".into(), + instance_id: "testinstance".into(), + }; HttpService::new(map_config( App::new() .app_data(config) @@ -76,6 +82,8 @@ mod tests { .app_data(web::Data::from(upstream_features_cache.clone())) .app_data(web::Data::from(upstream_engine_cache.clone())) .app_data(web::Data::from(upstream_token_cache.clone())) + .app_data(web::Data::new(metrics_cache)) + .app_data(web::Data::new(connect_via)) .service( web::scope("/api") .configure(crate::client_api::configure_client_api) diff --git a/server/src/metrics/client_metrics.rs b/server/src/metrics/client_metrics.rs index 06a7d8f6..5d82556e 100644 --- a/server/src/metrics/client_metrics.rs +++ b/server/src/metrics/client_metrics.rs @@ -1,4 +1,4 @@ -use crate::types::EdgeToken; +use crate::types::{BatchMetricsRequestBody, EdgeToken}; use actix_web::web::Data; use chrono::{DateTime, Utc}; use dashmap::DashMap; @@ -142,6 +142,14 @@ pub(crate) fn register_client_metrics( metrics_cache.sink_metrics(&metrics); } +pub(crate) fn register_bulk_metrics( + metrics_cache: Data, + connect_via: &ConnectVia, + metrics: BatchMetricsRequestBody, +) { + metrics_cache.sink_bulk_metrics(metrics, connect_via); +} + pub(crate) fn sendable(batch: &MetricsBatch) -> bool { size_of_batch(batch) < UPSTREAM_MAX_BODY_SIZE } @@ -230,6 +238,15 @@ impl MetricsCache { self.sink_metrics(&batch.metrics); } + pub fn sink_bulk_metrics(&self, metrics: BatchMetricsRequestBody, connect_via: &ConnectVia) { + for application in metrics.applications { + self.register_application( + application.connect_via(&connect_via.app_name, &connect_via.instance_id), + ) + } + self.sink_metrics(&metrics.metrics) + } + pub fn reset_metrics(&self) { self.applications.clear(); self.metrics.clear(); diff --git a/server/src/types.rs b/server/src/types.rs index 46858f50..8651438f 100644 --- a/server/src/types.rs +++ b/server/src/types.rs @@ -1,6 +1,6 @@ use std::cmp::min; use std::fmt; -use std::fmt::{Display, Formatter}; +use std::fmt::{Debug, Display, Formatter}; use std::net::IpAddr; use std::sync::Arc; @@ -69,7 +69,7 @@ pub struct ValidateTokensRequest { pub tokens: Vec, } -#[derive(Debug, Clone, Serialize, Deserialize, Eq, ToSchema)] +#[derive(Clone, Serialize, Deserialize, Eq, ToSchema)] #[cfg_attr(test, derive(Default))] #[serde(rename_all = "camelCase")] pub struct EdgeToken { @@ -82,6 +82,28 @@ pub struct EdgeToken { pub status: TokenValidationStatus, } +impl Debug for EdgeToken { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("EdgeToken") + .field( + "token", + &format!( + "{}.[redacted]", + &self + .token + .chars() + .take_while(|p| p != &'.') + .collect::() + ), + ) + .field("token_type", &self.token_type) + .field("environment", &self.environment) + .field("projects", &self.projects) + .field("status", &self.status) + .finish() + } +} + #[derive(Debug, Clone)] pub struct ServiceAccountToken { pub token: String, diff --git a/server/src/urls.rs b/server/src/urls.rs index e7244fdc..65b065da 100644 --- a/server/src/urls.rs +++ b/server/src/urls.rs @@ -12,6 +12,7 @@ pub struct UnleashUrls { pub client_features_url: Url, pub client_register_app_url: Url, pub client_metrics_url: Url, + pub client_bulk_metrics_url: Url, pub edge_api_url: Url, pub edge_validate_url: Url, pub edge_metrics_url: Url, @@ -82,12 +83,18 @@ impl UnleashUrls { .push("admin") .push("api-tokens"); + let mut client_bulk_metrics_url = client_metrics_url.clone(); + client_bulk_metrics_url + .path_segments_mut() + .expect("Could not create /api/client/metrics/bulk") + .push("bulk"); UnleashUrls { base_url, api_url, client_api_url, client_features_url, client_register_app_url, + client_bulk_metrics_url, client_metrics_url, edge_api_url, edge_validate_url, From f9116f7730b5c4aaabe27da678eddb375068ce69 Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Mon, 8 Jan 2024 13:05:03 +0100 Subject: [PATCH 2/3] filter metrics posted by environment accessible by token --- server/src/client_api.rs | 5 +- server/src/metrics/client_metrics.rs | 71 ++++++++++++++++++++++++++-- 2 files changed, 71 insertions(+), 5 deletions(-) diff --git a/server/src/client_api.rs b/server/src/client_api.rs index aaf8245c..562a51c9 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -214,14 +214,15 @@ security( )] #[post("/metrics/bulk")] pub async fn post_bulk_metrics( - _edge_token: EdgeToken, + edge_token: EdgeToken, bulk_metrics: Json, connect_via: Data, metrics_cache: Data, ) -> EdgeResult { crate::metrics::client_metrics::register_bulk_metrics( - metrics_cache, + metrics_cache.get_ref(), connect_via.get_ref(), + &edge_token, bulk_metrics.into_inner(), ); Ok(HttpResponse::Accepted().finish()) diff --git a/server/src/metrics/client_metrics.rs b/server/src/metrics/client_metrics.rs index 5d82556e..555a4036 100644 --- a/server/src/metrics/client_metrics.rs +++ b/server/src/metrics/client_metrics.rs @@ -142,12 +142,31 @@ pub(crate) fn register_client_metrics( metrics_cache.sink_metrics(&metrics); } +/*** + Will filter out metrics that do not belong to the environment that edge_token has access to +*/ pub(crate) fn register_bulk_metrics( - metrics_cache: Data, + metrics_cache: &MetricsCache, connect_via: &ConnectVia, + edge_token: &EdgeToken, metrics: BatchMetricsRequestBody, ) { - metrics_cache.sink_bulk_metrics(metrics, connect_via); + let updated: BatchMetricsRequestBody = BatchMetricsRequestBody { + applications: metrics.applications.clone(), + metrics: metrics + .metrics + .iter() + .filter(|m| { + edge_token + .environment + .clone() + .map(|e| e == m.environment) + .unwrap_or(false) + }) + .cloned() + .collect(), + }; + metrics_cache.sink_bulk_metrics(updated, connect_via); } pub(crate) fn sendable(batch: &MetricsBatch) -> bool { @@ -288,10 +307,12 @@ impl MetricsCache { #[cfg(test)] mod test { use super::*; + use crate::types::{TokenType, TokenValidationStatus}; use chrono::{DateTime, Utc}; use std::collections::HashMap; + use std::str::FromStr; use test_case::test_case; - use unleash_types::client_metrics::{ClientMetricsEnv, ConnectVia}; + use unleash_types::client_metrics::{ClientMetricsEnv, ConnectVia, ConnectViaBuilder}; #[test] fn cache_aggregates_data_correctly() { @@ -590,4 +611,48 @@ mod test { assert_eq!(metrics_batch.len(), 1); assert!(metrics_batch.get(0).unwrap().metrics.is_empty()); } + + #[test] + pub fn register_bulk_metrics_filters_metrics_based_on_environment_in_token() { + let metrics_cache = MetricsCache::default(); + let connect_via = ConnectViaBuilder::default() + .app_name("edge_bulk_metrics".into()) + .instance_id("sometest".into()) + .build() + .unwrap(); + let mut edge_token_with_development = + EdgeToken::from_str("*:development.randomstring").unwrap(); + edge_token_with_development.status = TokenValidationStatus::Validated; + edge_token_with_development.token_type = Some(TokenType::Client); + let metrics = BatchMetricsRequestBody { + applications: vec![], + metrics: vec![ + ClientMetricsEnv { + feature_name: "feature_one".into(), + app_name: "my_app".into(), + environment: "development".into(), + timestamp: Utc::now(), + yes: 50, + no: 10, + variants: Default::default(), + }, + ClientMetricsEnv { + feature_name: "feature_two".to_string(), + app_name: "other_app".to_string(), + environment: "production".to_string(), + timestamp: Default::default(), + yes: 50, + no: 10, + variants: Default::default(), + }, + ], + }; + register_bulk_metrics( + &metrics_cache, + &connect_via, + &edge_token_with_development, + metrics, + ); + assert_eq!(metrics_cache.metrics.len(), 1); + } } From 68c1ea4e35c7dac4ddaacc03f84359278884ed4f Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Mon, 8 Jan 2024 13:21:15 +0100 Subject: [PATCH 3/3] Cargo fmt + fix --- server/src/builder.rs | 2 +- server/src/cli.rs | 1 - server/src/http/feature_refresher.rs | 9 ++++++++- server/src/tokens.rs | 2 +- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/server/src/builder.rs b/server/src/builder.rs index dd944c74..ca1c0688 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -144,7 +144,7 @@ async fn build_edge(args: &EdgeArgs) -> EdgeResult { args.upstream_certificate_file.clone(), Duration::seconds(args.upstream_request_timeout), Duration::seconds(args.upstream_socket_timeout), - args.token_header.token_header.clone() + args.token_header.token_header.clone(), ) }) .map(|c| c.with_custom_client_headers(args.custom_client_headers.clone())) diff --git a/server/src/cli.rs b/server/src/cli.rs index e20af4e5..5b72c219 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -216,7 +216,6 @@ pub struct TokenHeader { /// Token header to use for edge authorization. #[clap(long, env, global = true, default_value = "Authorization")] pub token_header: String, - } impl FromStr for TokenHeader { diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index 1a008ed2..8bebfb91 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -934,7 +934,14 @@ mod tests { server.stop().await; tokio::time::sleep(std::time::Duration::from_millis(5)).await; // To ensure our refresh is due feature_refresher.refresh_features().await; - assert_eq!(feature_refresher.tokens_to_refresh.get("*:development.secret123").unwrap().failure_count, 1); + assert_eq!( + feature_refresher + .tokens_to_refresh + .get("*:development.secret123") + .unwrap() + .failure_count, + 1 + ); assert!(!feature_refresher.features_cache.is_empty()); assert!(!feature_refresher.engine_cache.is_empty()); } diff --git a/server/src/tokens.rs b/server/src/tokens.rs index 8d71ef72..0f0be1b6 100644 --- a/server/src/tokens.rs +++ b/server/src/tokens.rs @@ -7,8 +7,8 @@ use std::collections::HashSet; use std::future::{ready, Ready}; use std::str::FromStr; -use crate::cli::TokenHeader; use crate::cli::EdgeMode; +use crate::cli::TokenHeader; use crate::error::EdgeError; use crate::types::EdgeResult; use crate::types::EdgeToken;