Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added client authenticated bulk metrics #373

Merged
merged 4 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async fn build_edge(args: &EdgeArgs) -> EdgeResult<EdgeInfo> {
args.upstream_certificate_file.clone(),
Duration::seconds(args.upstream_request_timeout),
Duration::seconds(args.upstream_socket_timeout),
args.token_header.token_header.clone()
args.token_header.token_header.clone(),
)
})
.map(|c| c.with_custom_client_headers(args.custom_client_headers.clone()))
Expand Down
1 change: 0 additions & 1 deletion server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ pub struct TokenHeader {
/// Token header to use for edge authorization.
#[clap(long, env, global = true, default_value = "Authorization")]
pub token_header: String,

}

impl FromStr for TokenHeader {
Expand Down
149 changes: 141 additions & 8 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use crate::filters::{
use crate::http::feature_refresher::FeatureRefresher;
use crate::metrics::client_metrics::MetricsCache;
use crate::tokens::cache_key;
use crate::types::{self, EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters};
use crate::types::{
self, BatchMetricsRequestBody, EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters,
};
use actix_web::web::{self, Data, Json, Query};
use actix_web::{get, post, HttpRequest, HttpResponse};
use dashmap::DashMap;
Expand Down Expand Up @@ -201,6 +203,32 @@ pub async fn metrics(
Ok(HttpResponse::Accepted().finish())
}

#[utoipa::path(
context_path = "/api/client",
responses(
(status = 202, description = "Accepted bulk metrics"),
(status = 403, description = "Was not allowed to post bulk metrics")
),
request_body = BatchMetricsRequestBody,
security(
("Authorization" = [])
)
)]
#[post("/metrics/bulk")]
pub async fn post_bulk_metrics(
edge_token: EdgeToken,
bulk_metrics: Json<BatchMetricsRequestBody>,
connect_via: Data<ConnectVia>,
metrics_cache: Data<MetricsCache>,
) -> EdgeResult<HttpResponse> {
crate::metrics::client_metrics::register_bulk_metrics(
metrics_cache.get_ref(),
connect_via.get_ref(),
&edge_token,
bulk_metrics.into_inner(),
);
Ok(HttpResponse::Accepted().finish())
}
pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("/client")
Expand All @@ -210,7 +238,8 @@ pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
.service(get_features)
.service(get_feature)
.service(register)
.service(metrics),
.service(metrics)
.service(post_bulk_metrics),
);
}

Expand All @@ -226,12 +255,12 @@ pub fn configure_experimental_post_features(
#[cfg(test)]
mod tests {

use crate::metrics::client_metrics::{ApplicationKey, MetricsBatch, MetricsKey};
use crate::types::{TokenType, TokenValidationStatus};
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};

use crate::metrics::client_metrics::{ApplicationKey, MetricsKey};
use crate::types::{TokenType, TokenValidationStatus};
use std::sync::Arc;

use super::*;

Expand All @@ -245,14 +274,16 @@ mod tests {
http::header::ContentType,
test,
web::{self, Data},
App,
App, ResponseError,
};
use chrono::{DateTime, Duration, TimeZone, Utc};
use maplit::hashmap;
use reqwest::StatusCode;
use ulid::Ulid;
use unleash_types::client_features::{ClientFeature, Constraint, Operator, Strategy};
use unleash_types::client_metrics::{ClientMetricsEnv, MetricBucket, ToggleStats};
use unleash_types::client_metrics::{
ClientMetricsEnv, ConnectViaBuilder, MetricBucket, ToggleStats,
};
use unleash_yggdrasil::EngineState;

async fn make_metrics_post_request() -> Request {
Expand Down Expand Up @@ -282,6 +313,38 @@ mod tests {
.to_request()
}

async fn make_bulk_metrics_post_request(authorization: Option<String>) -> Request {
let mut req = test::TestRequest::post()
.uri("/api/client/metrics/bulk")
.insert_header(ContentType::json());
req = match authorization {
Some(auth) => req.insert_header(("Authorization", auth)),
None => req,
};
req.set_json(Json(BatchMetricsRequestBody {
applications: vec![ClientApplication {
app_name: "test_app".to_string(),
connect_via: None,
environment: None,
instance_id: None,
interval: 10,
sdk_version: None,
started: Default::default(),
strategies: vec![],
}],
metrics: vec![ClientMetricsEnv {
feature_name: "".to_string(),
app_name: "".to_string(),
environment: "".to_string(),
timestamp: Default::default(),
yes: 0,
no: 0,
variants: Default::default(),
}],
}))
.to_request()
}

async fn make_register_post_request(application: ClientApplication) -> Request {
test::TestRequest::post()
.uri("/api/client/register")
Expand Down Expand Up @@ -478,6 +541,76 @@ mod tests {
assert_eq!(saved_app.connect_via, Some(vec![our_app]));
}

#[tokio::test]
async fn bulk_metrics_endpoint_correctly_accepts_data() {
let metrics_cache = MetricsCache::default();
let connect_via = ConnectViaBuilder::default()
.app_name("unleash-edge".into())
.instance_id("test".into())
.build()
.unwrap();
let app = test::init_service(
App::new()
.app_data(Data::new(connect_via))
.app_data(web::Data::new(metrics_cache))
.service(web::scope("/api/client").service(post_bulk_metrics)),
)
.await;
let token = EdgeToken::from_str("*:development.somestring").unwrap();
let req = make_bulk_metrics_post_request(Some(token.token.clone())).await;
let call = test::call_service(&app, req).await;
assert_eq!(call.status(), StatusCode::ACCEPTED);
}
#[tokio::test]
async fn bulk_metrics_endpoint_correctly_refuses_metrics_without_auth_header() {
let mut token = EdgeToken::from_str("*:development.somestring").unwrap();
token.status = TokenValidationStatus::Validated;
token.token_type = Some(TokenType::Client);
let upstream_token_cache = Arc::new(DashMap::default());
let upstream_features_cache = Arc::new(DashMap::default());
let upstream_engine_cache = Arc::new(DashMap::default());
upstream_token_cache.insert(token.token.clone(), token.clone());
let srv = upstream_server(
upstream_token_cache,
upstream_features_cache,
upstream_engine_cache,
)
.await;
let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
let status = client
.send_bulk_metrics_to_client_endpoint(MetricsBatch::default(), None)
.await;
assert_eq!(status.expect_err("").status_code(), StatusCode::FORBIDDEN);
let successful = client
.send_bulk_metrics_to_client_endpoint(MetricsBatch::default(), Some(token.clone()))
.await;
assert!(successful.is_ok());
}

#[tokio::test]
async fn bulk_metrics_endpoint_correctly_refuses_metrics_with_frontend_token() {
let mut frontend_token = EdgeToken::from_str("*:development.frontend").unwrap();
frontend_token.status = TokenValidationStatus::Validated;
frontend_token.token_type = Some(TokenType::Frontend);
let upstream_token_cache = Arc::new(DashMap::default());
let upstream_features_cache = Arc::new(DashMap::default());
let upstream_engine_cache = Arc::new(DashMap::default());
upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone());
let srv = upstream_server(
upstream_token_cache,
upstream_features_cache,
upstream_engine_cache,
)
.await;
let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
let status = client
.send_bulk_metrics_to_client_endpoint(
MetricsBatch::default(),
Some(frontend_token.clone()),
)
.await;
assert_eq!(status.expect_err("").status_code(), StatusCode::FORBIDDEN);
}
#[tokio::test]
async fn register_endpoint_returns_version_header() {
let metrics_cache = Arc::new(MetricsCache::default());
Expand Down
9 changes: 8 additions & 1 deletion server/src/http/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,14 @@ mod tests {
server.stop().await;
tokio::time::sleep(std::time::Duration::from_millis(5)).await; // To ensure our refresh is due
feature_refresher.refresh_features().await;
assert_eq!(feature_refresher.tokens_to_refresh.get("*:development.secret123").unwrap().failure_count, 1);
assert_eq!(
feature_refresher
.tokens_to_refresh
.get("*:development.secret123")
.unwrap()
.failure_count,
1
);
assert!(!feature_refresher.features_cache.is_empty());
assert!(!feature_refresher.engine_cache.is_empty());
}
Expand Down
33 changes: 31 additions & 2 deletions server/src/http/unleash_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl UnleashClient {
service_account_token: String,
connect_timeout: Duration,
socket_timeout: Duration,
token_header: String
token_header: String,
) -> Self {
Self {
urls: UnleashUrls::from_base_url(server_url),
Expand Down Expand Up @@ -300,7 +300,7 @@ impl UnleashClient {

fn header_map(&self, api_key: Option<String>) -> HeaderMap {
let mut header_map = HeaderMap::new();
let token_header: HeaderName= HeaderName::from_str(self.token_header.as_str()).unwrap();
let token_header: HeaderName = HeaderName::from_str(self.token_header.as_str()).unwrap();
if let Some(key) = api_key {
header_map.insert(token_header, key.parse().unwrap());
}
Expand Down Expand Up @@ -450,6 +450,35 @@ impl UnleashClient {
}
}

pub async fn send_bulk_metrics_to_client_endpoint(
&self,
request: MetricsBatch,
token: Option<EdgeToken>,
) -> EdgeResult<()> {
let result = self
.backing_client
.post(self.urls.client_bulk_metrics_url.to_string())
.headers(self.header_map(token.map(|t| t.token)))
.json(&request)
.send()
.await
.map_err(|e| {
info!("Failed to send metrics to /api/client/metrics/bulk endpoint {e:?}");
EdgeError::EdgeMetricsError
})?;
if result.status().is_success() {
Ok(())
} else {
match result.status() {
StatusCode::BAD_REQUEST => Err(EdgeMetricsRequestError(
result.status(),
result.json().await.ok(),
)),
_ => Err(EdgeMetricsRequestError(result.status(), None)),
}
}
}

pub async fn forward_request_for_client_token(
&self,
client_token_request: ClientTokenRequest,
Expand Down
10 changes: 9 additions & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ mod tests {
use actix_web::{web, App};
use dashmap::DashMap;
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_metrics::ConnectVia;
use unleash_yggdrasil::EngineState;

use crate::auth::token_validator::TokenValidator;
use crate::metrics::client_metrics::MetricsCache;
use crate::types::EdgeToken;

pub fn features_from_disk(path: &str) -> ClientFeatures {
Expand All @@ -68,14 +70,20 @@ mod tests {
test_server(move || {
let config = serde_qs::actix::QsQueryConfig::default()
.qs_config(serde_qs::Config::new(5, false));

let metrics_cache = MetricsCache::default();
let connect_via = ConnectVia {
app_name: "edge".into(),
instance_id: "testinstance".into(),
};
HttpService::new(map_config(
App::new()
.app_data(config)
.app_data(web::Data::from(token_validator.clone()))
.app_data(web::Data::from(upstream_features_cache.clone()))
.app_data(web::Data::from(upstream_engine_cache.clone()))
.app_data(web::Data::from(upstream_token_cache.clone()))
.app_data(web::Data::new(metrics_cache))
.app_data(web::Data::new(connect_via))
.service(
web::scope("/api")
.configure(crate::client_api::configure_client_api)
Expand Down
Loading