From 925fc0c2ec03415ae21fb8a35b294eb4b2d704db Mon Sep 17 00:00:00 2001 From: Thomas Heartman Date: Thu, 12 Dec 2024 15:32:10 +0100 Subject: [PATCH] initial streaming spike --- server/src/client_api.rs | 10 +++++ server/src/http/broadcaster.rs | 2 + server/src/internal_backstage.rs | 6 +++ server/src/lib.rs | 69 ++++++++++++++++++++++++++++++-- server/tests/streaming_test.rs | 65 ++++++++++++++++++++++++++++++ 5 files changed, 149 insertions(+), 3 deletions(-) create mode 100644 server/tests/streaming_test.rs diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 9806e846..d324e075 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -689,10 +689,12 @@ mod tests { let upstream_features_cache = Arc::new(DashMap::default()); let upstream_engine_cache = Arc::new(DashMap::default()); upstream_token_cache.insert(token.token.clone(), token.clone()); + let broadcaster = Broadcaster::new(upstream_features_cache.clone()); let srv = upstream_server( upstream_token_cache, upstream_features_cache, upstream_engine_cache, + broadcaster.clone(), ) .await; let req = reqwest::Client::new(); @@ -728,10 +730,12 @@ mod tests { let upstream_features_cache = Arc::new(DashMap::default()); let upstream_engine_cache = Arc::new(DashMap::default()); upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone()); + let broadcaster = Broadcaster::new(upstream_features_cache.clone()); let srv = upstream_server( upstream_token_cache, upstream_features_cache, upstream_engine_cache, + broadcaster.clone(), ) .await; let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap(); @@ -994,10 +998,12 @@ mod tests { Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let broadcaster = Broadcaster::new(upstream_features_cache.clone()); let server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), upstream_engine_cache.clone(), + broadcaster.clone(), ) .await; let upstream_features = features_from_disk("../examples/hostedexample.json"); @@ -1058,10 +1064,12 @@ mod tests { Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let broadcaster = Broadcaster::new(upstream_features_cache.clone()); let server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), upstream_engine_cache.clone(), + broadcaster.clone(), ) .await; let upstream_features = features_from_disk("../examples/hostedexample.json"); @@ -1181,10 +1189,12 @@ mod tests { Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let broadcaster = Broadcaster::new(upstream_features_cache.clone()); let server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), upstream_engine_cache.clone(), + broadcaster.clone(), ) .await; let upstream_features = features_from_disk("../examples/hostedexample.json"); diff --git a/server/src/http/broadcaster.rs b/server/src/http/broadcaster.rs index 0b4a4250..2bf9146f 100644 --- a/server/src/http/broadcaster.rs +++ b/server/src/http/broadcaster.rs @@ -206,3 +206,5 @@ impl Broadcaster { let _ = future::join_all(send_events).await; } } + +// e2e test? -> test module in the test folder (makes it a separate compilation unit; will increase test compliation time) diff --git a/server/src/internal_backstage.rs b/server/src/internal_backstage.rs index dc8bb76a..82cdd9d9 100644 --- a/server/src/internal_backstage.rs +++ b/server/src/internal_backstage.rs @@ -181,6 +181,7 @@ mod tests { use unleash_yggdrasil::EngineState; use crate::auth::token_validator::TokenValidator; + use crate::http::broadcaster::Broadcaster; use crate::http::feature_refresher::FeatureRefresher; use crate::http::unleash_client::UnleashClient; use crate::internal_backstage::EdgeStatus; @@ -312,6 +313,7 @@ mod tests { Arc::new(DashMap::default()), Arc::new(DashMap::default()), Arc::new(DashMap::default()), + Broadcaster::new(Arc::new(DashMap::default())), ) .await; let unleash_client = @@ -353,10 +355,12 @@ mod tests { Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let broadcaster = Broadcaster::new(upstream_features_cache.clone()); let server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), upstream_engine_cache.clone(), + broadcaster.clone(), ) .await; let upstream_features = crate::tests::features_from_disk("../examples/hostedexample.json"); @@ -425,10 +429,12 @@ mod tests { Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let broadcaster = Broadcaster::new(upstream_features_cache.clone()); let server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), upstream_engine_cache.clone(), + broadcaster.clone(), ) .await; let upstream_features = crate::tests::features_from_disk("../examples/hostedexample.json"); diff --git a/server/src/lib.rs b/server/src/lib.rs index d4f4e631..f8ee3da2 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -28,23 +28,30 @@ pub mod tokens; pub mod types; pub mod urls; #[cfg(test)] -mod tests { +pub mod tests { use std::fs; use std::io::BufReader; use std::path::PathBuf; use std::sync::Arc; + use crate::client_api::configure_client_api; + use crate::middleware; use actix_http::HttpService; use actix_http_test::{test_server, TestServer}; use actix_service::map_config; - use actix_web::dev::AppConfig; - use actix_web::{web, App}; + use actix_web::dev::{AppConfig, Url}; + use actix_web::web::Data; + use actix_web::{test, web, App}; + use chrono::Duration; use dashmap::DashMap; use unleash_types::client_features::ClientFeatures; use unleash_types::client_metrics::ConnectVia; use unleash_yggdrasil::EngineState; use crate::auth::token_validator::TokenValidator; + use crate::http::broadcaster::{self, Broadcaster}; + use crate::http::feature_refresher::FeatureRefresher; + use crate::http::unleash_client::UnleashClient; use crate::metrics::client_metrics::MetricsCache; use crate::types::EdgeToken; @@ -55,10 +62,65 @@ mod tests { serde_json::from_reader(reader).unwrap() } + pub async fn edge_server(upstream_url: &str) -> TestServer { + let unleash_client = Arc::new(UnleashClient::new(upstream_url, None).unwrap()); + + let features_cache: Arc> = Arc::new(DashMap::default()); + let token_cache: Arc> = Arc::new(DashMap::default()); + let engine_cache: Arc> = Arc::new(DashMap::default()); + let broadcaster = Broadcaster::new(features_cache.clone()); + let feature_refresher = Arc::new(FeatureRefresher { + unleash_client: unleash_client.clone(), + features_cache: features_cache.clone(), + engine_cache: engine_cache.clone(), + refresh_interval: Duration::seconds(6000), + broadcaster: broadcaster.clone(), + ..Default::default() + }); + let token_validator = Arc::new(TokenValidator { + unleash_client: unleash_client.clone(), + token_cache: token_cache.clone(), + persistence: None, + }); + test_server(move || { + let config = serde_qs::actix::QsQueryConfig::default() + .qs_config(serde_qs::Config::new(5, false)); + let metrics_cache = MetricsCache::default(); + let connect_via = ConnectVia { + app_name: "edge".into(), + instance_id: "testinstance".into(), + }; + HttpService::new(map_config( + App::new() + .app_data(config) + .app_data(web::Data::from(token_validator.clone())) + .app_data(web::Data::from(features_cache.clone())) + .app_data(web::Data::from(broadcaster.clone())) + .app_data(web::Data::from(engine_cache.clone())) + .app_data(web::Data::from(token_cache.clone())) + .app_data(web::Data::new(metrics_cache)) + .app_data(web::Data::new(connect_via)) + .app_data(web::Data::from(feature_refresher.clone())) + .service( + web::scope("/api") + .configure(crate::client_api::configure_client_api) + .configure(|cfg| { + crate::frontend_api::configure_frontend_api(cfg, false) + }), + ) + .service(web::scope("/edge").configure(crate::edge_api::configure_edge_api)), + |_| AppConfig::default(), + )) + .tcp() + }) + .await + } + pub async fn upstream_server( upstream_token_cache: Arc>, upstream_features_cache: Arc>, upstream_engine_cache: Arc>, + upstream_broadcaster: Arc, ) -> TestServer { let token_validator = Arc::new(TokenValidator { unleash_client: Arc::new(Default::default()), @@ -79,6 +141,7 @@ mod tests { .app_data(config) .app_data(web::Data::from(token_validator.clone())) .app_data(web::Data::from(upstream_features_cache.clone())) + .app_data(web::Data::from(upstream_broadcaster.clone())) .app_data(web::Data::from(upstream_engine_cache.clone())) .app_data(web::Data::from(upstream_token_cache.clone())) .app_data(web::Data::new(metrics_cache)) diff --git a/server/tests/streaming_test.rs b/server/tests/streaming_test.rs new file mode 100644 index 00000000..0aa981ad --- /dev/null +++ b/server/tests/streaming_test.rs @@ -0,0 +1,65 @@ +#[cfg(test)] +mod streaming_tests { + use actix_web::App; + use chrono::Duration; + use dashmap::DashMap; + use eventsource_client::Client; + use futures::TryStreamExt; + use reqwest::Client; + use std::{ + process::{Command, Stdio}, + str::FromStr, + sync::Arc, + }; + use unleash_edge::{ + http::{ + broadcaster::Broadcaster, feature_refresher::FeatureRefresher, + unleash_client::UnleashClient, + }, + tests::{edge_server, upstream_server}, + types::{BuildInfo, EdgeToken, TokenType, TokenValidationStatus}, + }; + use unleash_types::client_features::ClientFeatures; + + #[actix_web::test] + async fn test_streaming() { + let unleash_broadcaster = Broadcaster::new(Arc::new(DashMap::default())); + let unleash_features_cache: Arc> = + Arc::new(DashMap::default()); + let unleash_token_cache: Arc> = Arc::new(DashMap::default()); + + let unleash_server = upstream_server( + unleash_token_cache.clone(), + unleash_features_cache.clone(), + Arc::new(DashMap::default()), + unleash_broadcaster.clone(), + ) + .await; + + let edge = edge_server(&unleash_server.url("/")).await; + + let mut upstream_known_token = EdgeToken::from_str("dx:development.secret123").unwrap(); + upstream_known_token.status = TokenValidationStatus::Validated; + upstream_known_token.token_type = Some(TokenType::Client); + unleash_token_cache.insert( + upstream_known_token.token.clone(), + upstream_known_token.clone(), + ); + + let es_client = + eventsource_client::ClientBuilder::for_url(&edge.url("/api/client/streaming")) + .unwrap() + .header("Authorization", &upstream_known_token.token) + .unwrap() + .build(); + + let events = vec![]; + tokio::spawn(async move { + let mut stream = es_client + .stream() + .map_ok(move |sse| async move { events.push(sse) }); + }); + + print!("{events:?}") + } +}