Skip to content

Commit

Permalink
chore: make the test pass
Browse files Browse the repository at this point in the history
probably breaks everything else, but hey
  • Loading branch information
thomasheartman committed Dec 13, 2024
1 parent e906899 commit 6bc8637
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 24 deletions.
16 changes: 11 additions & 5 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,22 @@ pub async fn stream_features(
filter_query: Query<FeatureFilters>,
req: HttpRequest,
) -> EdgeResult<impl Responder> {
use crate::http::broadcaster::Broadcaster;

println!("{req:?}");
let (validated_token, _filter_set, query) =
get_feature_filter(&edge_token, &token_cache, filter_query.clone())?;
match req.app_data::<Data<FeatureRefresher>>() {
Some(refresher) => {
refresher
.broadcaster
println!("validated token!");
match req.app_data::<Data<Broadcaster>>() {
Some(broadcaster) => {
broadcaster
.connect(validated_token, filter_query, query)
.await
}
_ => Err(EdgeError::ClientCacheError),
_ => {
println!("No broadcaster found");
Err(EdgeError::ClientCacheError)
}
}
}

Expand Down
9 changes: 3 additions & 6 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,19 @@ pub mod tests {
use std::path::PathBuf;
use std::sync::Arc;

use crate::client_api::configure_client_api;
use crate::middleware;
use actix_http::HttpService;
use actix_http_test::{test_server, TestServer};
use actix_service::map_config;
use actix_web::dev::{AppConfig, Url};
use actix_web::web::Data;
use actix_web::{test, web, App};
use actix_web::dev::AppConfig;
use actix_web::{web, App};
use chrono::Duration;
use dashmap::DashMap;
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_metrics::ConnectVia;
use unleash_yggdrasil::EngineState;

use crate::auth::token_validator::TokenValidator;
use crate::http::broadcaster::{self, Broadcaster};
use crate::http::broadcaster::Broadcaster;
use crate::http::feature_refresher::FeatureRefresher;
use crate::http::unleash_client::UnleashClient;
use crate::metrics::client_metrics::MetricsCache;
Expand Down
2 changes: 1 addition & 1 deletion server/src/tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ fn clean_hash(hash: &str) -> String {
)
}

pub(crate) fn cache_key(token: &EdgeToken) -> String {
pub fn cache_key(token: &EdgeToken) -> String {
token
.environment
.clone()
Expand Down
81 changes: 69 additions & 12 deletions server/tests/streaming_test.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use dashmap::DashMap;
use eventsource_client::Client;
use futures::{StreamExt, TryStreamExt};
use futures::{future, StreamExt, TryStreamExt};
use reqwest::Url;
use std::{
fs,
io::BufReader,
path::PathBuf,
str::FromStr,
sync::{Arc, Mutex},
};
Expand All @@ -13,16 +16,23 @@ use unleash_edge::{
broadcaster::Broadcaster, feature_refresher::FeatureRefresher,
unleash_client::UnleashClient,
},
// tests::{edge_server, upstream_server},
tokens::cache_key,
types::{EdgeToken, TokenType, TokenValidationStatus},
};
use unleash_types::client_features::ClientFeatures;

pub fn features_from_disk(path: &str) -> ClientFeatures {
let path = PathBuf::from(path);
let file = fs::File::open(path).unwrap();
let reader = BufReader::new(file);
serde_json::from_reader(reader).unwrap()
}

#[actix_web::test]
async fn test_streaming() {
let unleash_broadcaster = Broadcaster::new(Arc::new(DashMap::default()));
let unleash_features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let unleash_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let unleash_broadcaster = Broadcaster::new(unleash_features_cache.clone());

let unleash_server = upstream_server(
unleash_token_cache.clone(),
Expand All @@ -32,8 +42,6 @@ async fn test_streaming() {
)
.await;

let edge = edge_server(&unleash_server.url("/")).await;

let mut upstream_known_token = EdgeToken::from_str("dx:development.secret123").unwrap();
upstream_known_token.status = TokenValidationStatus::Validated;
upstream_known_token.token_type = Some(TokenType::Client);
Expand All @@ -42,34 +50,70 @@ async fn test_streaming() {
upstream_known_token.clone(),
);

unleash_features_cache.insert(
cache_key(&upstream_known_token),
features_from_disk("../examples/features.json"),
);

// println!("upstream.features_cache: {:?}", unleash_features_cache);

let edge = edge_server(&unleash_server.url("/"), upstream_known_token.clone()).await;

let es_client = eventsource_client::ClientBuilder::for_url(&edge.url("/api/client/streaming"))
.unwrap()
.header("Authorization", &upstream_known_token.token)
.unwrap()
.build();
let num_events_to_collect = 5;
let num_events_to_collect = 3;
let events = Arc::new(Mutex::new(Vec::new()));
let events_clone = events.clone();

tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

let handle = tokio::spawn(async move {
let _ = es_client
.stream()
.take(num_events_to_collect)
.take(4)
.try_for_each(|sse| {
let events_clone = events.clone();
async move {
println!("{:?}", sse);
// match sse {}
events_clone.lock().unwrap().push(sse);
Ok(())
}
});
})
.await
.expect("Stream processing failed");
});

handle.await.unwrap();
let handle2 = tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

println!("Inserting new features");
unleash_features_cache.insert(
cache_key(&upstream_known_token),
features_from_disk("../examples/hostedexample.json"),
);

unleash_broadcaster.broadcast().await;
});

let _ = tokio::join!(handle, handle2);
// let _ = future::join_all(iter::once(handle, handle2)).await;
// handle.await.unwrap();

// unleash_features_cache.insert(
// cache_key(&upstream_known_token),
// features_from_disk("../examples/hostedexample.json"),
// );

// unleash_broadcaster.broadcast().await;

// println!()

// Now we can inspect the collected events
let collected_events = events_clone.lock().unwrap();
print!("Collected events: {collected_events:?}");
println!("Collected events: {:?}", collected_events.len());
for (i, event) in collected_events.iter().enumerate() {
println!("Event {}: {:?}", i, event);
}
Expand All @@ -88,7 +132,7 @@ use unleash_edge::auth::token_validator::TokenValidator;
use unleash_edge::http::unleash_client::new_reqwest_client;
use unleash_edge::metrics::client_metrics::MetricsCache;

async fn edge_server(upstream_url: &str) -> TestServer {
async fn edge_server(upstream_url: &str, token: EdgeToken) -> TestServer {
let unleash_client = Arc::new(UnleashClient::from_url(
Url::parse(upstream_url).unwrap(),
"Authorization".into(),
Expand All @@ -106,6 +150,7 @@ async fn edge_server(upstream_url: &str) -> TestServer {

let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
token_cache.insert(token.token.clone(), token.clone());
let engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let broadcaster = Broadcaster::new(features_cache.clone());
let feature_refresher = Arc::new(FeatureRefresher {
Expand All @@ -121,6 +166,18 @@ async fn edge_server(upstream_url: &str) -> TestServer {
token_cache: token_cache.clone(),
persistence: None,
});
feature_refresher
.register_token_for_refresh(token.clone(), None)
.await;
let refresher_for_background = feature_refresher.clone();

let handle = tokio::spawn(async move {
let _ = refresher_for_background
.start_streaming_features_background_task()
.await;
});

handle.await.unwrap();
test_server(move || {
let config =
serde_qs::actix::QsQueryConfig::default().qs_config(serde_qs::Config::new(5, false));
Expand Down

0 comments on commit 6bc8637

Please sign in to comment.