diff --git a/server/src/client_api.rs b/server/src/client_api.rs index d324e075..8347a242 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -47,16 +47,22 @@ pub async fn stream_features( filter_query: Query, req: HttpRequest, ) -> EdgeResult { + use crate::http::broadcaster::Broadcaster; + + println!("{req:?}"); let (validated_token, _filter_set, query) = get_feature_filter(&edge_token, &token_cache, filter_query.clone())?; - match req.app_data::>() { - Some(refresher) => { - refresher - .broadcaster + println!("validated token!"); + match req.app_data::>() { + Some(broadcaster) => { + broadcaster .connect(validated_token, filter_query, query) .await } - _ => Err(EdgeError::ClientCacheError), + _ => { + println!("No broadcaster found"); + Err(EdgeError::ClientCacheError) + } } } diff --git a/server/src/lib.rs b/server/src/lib.rs index f8ee3da2..31dc001d 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -34,14 +34,11 @@ pub mod tests { use std::path::PathBuf; use std::sync::Arc; - use crate::client_api::configure_client_api; - use crate::middleware; use actix_http::HttpService; use actix_http_test::{test_server, TestServer}; use actix_service::map_config; - use actix_web::dev::{AppConfig, Url}; - use actix_web::web::Data; - use actix_web::{test, web, App}; + use actix_web::dev::AppConfig; + use actix_web::{web, App}; use chrono::Duration; use dashmap::DashMap; use unleash_types::client_features::ClientFeatures; @@ -49,7 +46,7 @@ pub mod tests { use unleash_yggdrasil::EngineState; use crate::auth::token_validator::TokenValidator; - use crate::http::broadcaster::{self, Broadcaster}; + use crate::http::broadcaster::Broadcaster; use crate::http::feature_refresher::FeatureRefresher; use crate::http::unleash_client::UnleashClient; use crate::metrics::client_metrics::MetricsCache; diff --git a/server/src/tokens.rs b/server/src/tokens.rs index 4816471f..ef74d88d 100644 --- a/server/src/tokens.rs +++ b/server/src/tokens.rs @@ -74,7 +74,7 @@ fn clean_hash(hash: &str) -> String { ) } -pub(crate) fn cache_key(token: &EdgeToken) -> String { +pub fn cache_key(token: &EdgeToken) -> String { token .environment .clone() diff --git a/server/tests/streaming_test.rs b/server/tests/streaming_test.rs index 883b25f4..f95a0d04 100644 --- a/server/tests/streaming_test.rs +++ b/server/tests/streaming_test.rs @@ -1,8 +1,11 @@ use dashmap::DashMap; use eventsource_client::Client; -use futures::{StreamExt, TryStreamExt}; +use futures::{future, StreamExt, TryStreamExt}; use reqwest::Url; use std::{ + fs, + io::BufReader, + path::PathBuf, str::FromStr, sync::{Arc, Mutex}, }; @@ -13,16 +16,23 @@ use unleash_edge::{ broadcaster::Broadcaster, feature_refresher::FeatureRefresher, unleash_client::UnleashClient, }, - // tests::{edge_server, upstream_server}, + tokens::cache_key, types::{EdgeToken, TokenType, TokenValidationStatus}, }; use unleash_types::client_features::ClientFeatures; +pub fn features_from_disk(path: &str) -> ClientFeatures { + let path = PathBuf::from(path); + let file = fs::File::open(path).unwrap(); + let reader = BufReader::new(file); + serde_json::from_reader(reader).unwrap() +} + #[actix_web::test] async fn test_streaming() { - let unleash_broadcaster = Broadcaster::new(Arc::new(DashMap::default())); let unleash_features_cache: Arc> = Arc::new(DashMap::default()); let unleash_token_cache: Arc> = Arc::new(DashMap::default()); + let unleash_broadcaster = Broadcaster::new(unleash_features_cache.clone()); let unleash_server = upstream_server( unleash_token_cache.clone(), @@ -32,8 +42,6 @@ async fn test_streaming() { ) .await; - let edge = edge_server(&unleash_server.url("/")).await; - let mut upstream_known_token = EdgeToken::from_str("dx:development.secret123").unwrap(); upstream_known_token.status = TokenValidationStatus::Validated; upstream_known_token.token_type = Some(TokenType::Client); @@ -42,34 +50,70 @@ async fn test_streaming() { upstream_known_token.clone(), ); + unleash_features_cache.insert( + cache_key(&upstream_known_token), + features_from_disk("../examples/features.json"), + ); + + // println!("upstream.features_cache: {:?}", unleash_features_cache); + + let edge = edge_server(&unleash_server.url("/"), upstream_known_token.clone()).await; + let es_client = eventsource_client::ClientBuilder::for_url(&edge.url("/api/client/streaming")) .unwrap() .header("Authorization", &upstream_known_token.token) .unwrap() .build(); - let num_events_to_collect = 5; + let num_events_to_collect = 3; let events = Arc::new(Mutex::new(Vec::new())); let events_clone = events.clone(); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + let handle = tokio::spawn(async move { let _ = es_client .stream() - .take(num_events_to_collect) + .take(4) .try_for_each(|sse| { let events_clone = events.clone(); async move { - println!("{:?}", sse); + // match sse {} events_clone.lock().unwrap().push(sse); Ok(()) } - }); + }) + .await + .expect("Stream processing failed"); }); - handle.await.unwrap(); + let handle2 = tokio::spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + println!("Inserting new features"); + unleash_features_cache.insert( + cache_key(&upstream_known_token), + features_from_disk("../examples/hostedexample.json"), + ); + + unleash_broadcaster.broadcast().await; + }); + + let _ = tokio::join!(handle, handle2); + // let _ = future::join_all(iter::once(handle, handle2)).await; + // handle.await.unwrap(); + + // unleash_features_cache.insert( + // cache_key(&upstream_known_token), + // features_from_disk("../examples/hostedexample.json"), + // ); + + // unleash_broadcaster.broadcast().await; + + // println!() // Now we can inspect the collected events let collected_events = events_clone.lock().unwrap(); - print!("Collected events: {collected_events:?}"); + println!("Collected events: {:?}", collected_events.len()); for (i, event) in collected_events.iter().enumerate() { println!("Event {}: {:?}", i, event); } @@ -88,7 +132,7 @@ use unleash_edge::auth::token_validator::TokenValidator; use unleash_edge::http::unleash_client::new_reqwest_client; use unleash_edge::metrics::client_metrics::MetricsCache; -async fn edge_server(upstream_url: &str) -> TestServer { +async fn edge_server(upstream_url: &str, token: EdgeToken) -> TestServer { let unleash_client = Arc::new(UnleashClient::from_url( Url::parse(upstream_url).unwrap(), "Authorization".into(), @@ -106,6 +150,7 @@ async fn edge_server(upstream_url: &str) -> TestServer { let features_cache: Arc> = Arc::new(DashMap::default()); let token_cache: Arc> = Arc::new(DashMap::default()); + token_cache.insert(token.token.clone(), token.clone()); let engine_cache: Arc> = Arc::new(DashMap::default()); let broadcaster = Broadcaster::new(features_cache.clone()); let feature_refresher = Arc::new(FeatureRefresher { @@ -121,6 +166,18 @@ async fn edge_server(upstream_url: &str) -> TestServer { token_cache: token_cache.clone(), persistence: None, }); + feature_refresher + .register_token_for_refresh(token.clone(), None) + .await; + let refresher_for_background = feature_refresher.clone(); + + let handle = tokio::spawn(async move { + let _ = refresher_for_background + .start_streaming_features_background_task() + .await; + }); + + handle.await.unwrap(); test_server(move || { let config = serde_qs::actix::QsQueryConfig::default().qs_config(serde_qs::Config::new(5, false));