Skip to content

Commit

Permalink
initial streaming spike
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasheartman committed Dec 12, 2024
1 parent 0481ebf commit 925fc0c
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 3 deletions.
10 changes: 10 additions & 0 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,10 +689,12 @@ mod tests {
let upstream_features_cache = Arc::new(DashMap::default());
let upstream_engine_cache = Arc::new(DashMap::default());
upstream_token_cache.insert(token.token.clone(), token.clone());
let broadcaster = Broadcaster::new(upstream_features_cache.clone());
let srv = upstream_server(
upstream_token_cache,
upstream_features_cache,
upstream_engine_cache,
broadcaster.clone(),
)
.await;
let req = reqwest::Client::new();
Expand Down Expand Up @@ -728,10 +730,12 @@ mod tests {
let upstream_features_cache = Arc::new(DashMap::default());
let upstream_engine_cache = Arc::new(DashMap::default());
upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone());
let broadcaster = Broadcaster::new(upstream_features_cache.clone());
let srv = upstream_server(
upstream_token_cache,
upstream_features_cache,
upstream_engine_cache,
broadcaster.clone(),
)
.await;
let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
Expand Down Expand Up @@ -994,10 +998,12 @@ mod tests {
Arc::new(DashMap::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let broadcaster = Broadcaster::new(upstream_features_cache.clone());
let server = upstream_server(
upstream_token_cache.clone(),
upstream_features_cache.clone(),
upstream_engine_cache.clone(),
broadcaster.clone(),
)
.await;
let upstream_features = features_from_disk("../examples/hostedexample.json");
Expand Down Expand Up @@ -1058,10 +1064,12 @@ mod tests {
Arc::new(DashMap::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let broadcaster = Broadcaster::new(upstream_features_cache.clone());
let server = upstream_server(
upstream_token_cache.clone(),
upstream_features_cache.clone(),
upstream_engine_cache.clone(),
broadcaster.clone(),
)
.await;
let upstream_features = features_from_disk("../examples/hostedexample.json");
Expand Down Expand Up @@ -1181,10 +1189,12 @@ mod tests {
Arc::new(DashMap::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let broadcaster = Broadcaster::new(upstream_features_cache.clone());
let server = upstream_server(
upstream_token_cache.clone(),
upstream_features_cache.clone(),
upstream_engine_cache.clone(),
broadcaster.clone(),
)
.await;
let upstream_features = features_from_disk("../examples/hostedexample.json");
Expand Down
2 changes: 2 additions & 0 deletions server/src/http/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,5 @@ impl Broadcaster {
let _ = future::join_all(send_events).await;
}
}

// e2e test? -> test module in the test folder (makes it a separate compilation unit; will increase test compliation time)
6 changes: 6 additions & 0 deletions server/src/internal_backstage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ mod tests {
use unleash_yggdrasil::EngineState;

use crate::auth::token_validator::TokenValidator;
use crate::http::broadcaster::Broadcaster;
use crate::http::feature_refresher::FeatureRefresher;
use crate::http::unleash_client::UnleashClient;
use crate::internal_backstage::EdgeStatus;
Expand Down Expand Up @@ -312,6 +313,7 @@ mod tests {
Arc::new(DashMap::default()),
Arc::new(DashMap::default()),
Arc::new(DashMap::default()),
Broadcaster::new(Arc::new(DashMap::default())),
)
.await;
let unleash_client =
Expand Down Expand Up @@ -353,10 +355,12 @@ mod tests {
Arc::new(DashMap::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let broadcaster = Broadcaster::new(upstream_features_cache.clone());
let server = upstream_server(
upstream_token_cache.clone(),
upstream_features_cache.clone(),
upstream_engine_cache.clone(),
broadcaster.clone(),
)
.await;
let upstream_features = crate::tests::features_from_disk("../examples/hostedexample.json");
Expand Down Expand Up @@ -425,10 +429,12 @@ mod tests {
Arc::new(DashMap::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let broadcaster = Broadcaster::new(upstream_features_cache.clone());
let server = upstream_server(
upstream_token_cache.clone(),
upstream_features_cache.clone(),
upstream_engine_cache.clone(),
broadcaster.clone(),
)
.await;
let upstream_features = crate::tests::features_from_disk("../examples/hostedexample.json");
Expand Down
69 changes: 66 additions & 3 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,30 @@ pub mod tokens;
pub mod types;
pub mod urls;
#[cfg(test)]
mod tests {
pub mod tests {
use std::fs;
use std::io::BufReader;
use std::path::PathBuf;
use std::sync::Arc;

use crate::client_api::configure_client_api;
use crate::middleware;
use actix_http::HttpService;
use actix_http_test::{test_server, TestServer};
use actix_service::map_config;
use actix_web::dev::AppConfig;
use actix_web::{web, App};
use actix_web::dev::{AppConfig, Url};
use actix_web::web::Data;
use actix_web::{test, web, App};
use chrono::Duration;
use dashmap::DashMap;
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_metrics::ConnectVia;
use unleash_yggdrasil::EngineState;

use crate::auth::token_validator::TokenValidator;
use crate::http::broadcaster::{self, Broadcaster};
use crate::http::feature_refresher::FeatureRefresher;
use crate::http::unleash_client::UnleashClient;
use crate::metrics::client_metrics::MetricsCache;
use crate::types::EdgeToken;

Expand All @@ -55,10 +62,65 @@ mod tests {
serde_json::from_reader(reader).unwrap()
}

pub async fn edge_server(upstream_url: &str) -> TestServer {
let unleash_client = Arc::new(UnleashClient::new(upstream_url, None).unwrap());

let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let broadcaster = Broadcaster::new(features_cache.clone());
let feature_refresher = Arc::new(FeatureRefresher {
unleash_client: unleash_client.clone(),
features_cache: features_cache.clone(),
engine_cache: engine_cache.clone(),
refresh_interval: Duration::seconds(6000),
broadcaster: broadcaster.clone(),
..Default::default()
});
let token_validator = Arc::new(TokenValidator {
unleash_client: unleash_client.clone(),
token_cache: token_cache.clone(),
persistence: None,
});
test_server(move || {
let config = serde_qs::actix::QsQueryConfig::default()
.qs_config(serde_qs::Config::new(5, false));
let metrics_cache = MetricsCache::default();
let connect_via = ConnectVia {
app_name: "edge".into(),
instance_id: "testinstance".into(),
};
HttpService::new(map_config(
App::new()
.app_data(config)
.app_data(web::Data::from(token_validator.clone()))
.app_data(web::Data::from(features_cache.clone()))
.app_data(web::Data::from(broadcaster.clone()))
.app_data(web::Data::from(engine_cache.clone()))
.app_data(web::Data::from(token_cache.clone()))
.app_data(web::Data::new(metrics_cache))
.app_data(web::Data::new(connect_via))
.app_data(web::Data::from(feature_refresher.clone()))
.service(
web::scope("/api")
.configure(crate::client_api::configure_client_api)
.configure(|cfg| {
crate::frontend_api::configure_frontend_api(cfg, false)
}),
)
.service(web::scope("/edge").configure(crate::edge_api::configure_edge_api)),
|_| AppConfig::default(),
))
.tcp()
})
.await
}

pub async fn upstream_server(
upstream_token_cache: Arc<DashMap<String, EdgeToken>>,
upstream_features_cache: Arc<DashMap<String, ClientFeatures>>,
upstream_engine_cache: Arc<DashMap<String, EngineState>>,
upstream_broadcaster: Arc<Broadcaster>,
) -> TestServer {
let token_validator = Arc::new(TokenValidator {
unleash_client: Arc::new(Default::default()),
Expand All @@ -79,6 +141,7 @@ mod tests {
.app_data(config)
.app_data(web::Data::from(token_validator.clone()))
.app_data(web::Data::from(upstream_features_cache.clone()))
.app_data(web::Data::from(upstream_broadcaster.clone()))
.app_data(web::Data::from(upstream_engine_cache.clone()))
.app_data(web::Data::from(upstream_token_cache.clone()))
.app_data(web::Data::new(metrics_cache))
Expand Down
65 changes: 65 additions & 0 deletions server/tests/streaming_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#[cfg(test)]
mod streaming_tests {
use actix_web::App;
use chrono::Duration;
use dashmap::DashMap;
use eventsource_client::Client;
use futures::TryStreamExt;
use reqwest::Client;
use std::{
process::{Command, Stdio},
str::FromStr,
sync::Arc,
};
use unleash_edge::{
http::{
broadcaster::Broadcaster, feature_refresher::FeatureRefresher,
unleash_client::UnleashClient,
},
tests::{edge_server, upstream_server},
types::{BuildInfo, EdgeToken, TokenType, TokenValidationStatus},
};
use unleash_types::client_features::ClientFeatures;

#[actix_web::test]
async fn test_streaming() {
let unleash_broadcaster = Broadcaster::new(Arc::new(DashMap::default()));
let unleash_features_cache: Arc<DashMap<String, ClientFeatures>> =
Arc::new(DashMap::default());
let unleash_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());

let unleash_server = upstream_server(
unleash_token_cache.clone(),
unleash_features_cache.clone(),
Arc::new(DashMap::default()),
unleash_broadcaster.clone(),
)
.await;

let edge = edge_server(&unleash_server.url("/")).await;

let mut upstream_known_token = EdgeToken::from_str("dx:development.secret123").unwrap();
upstream_known_token.status = TokenValidationStatus::Validated;
upstream_known_token.token_type = Some(TokenType::Client);
unleash_token_cache.insert(
upstream_known_token.token.clone(),
upstream_known_token.clone(),
);

let es_client =
eventsource_client::ClientBuilder::for_url(&edge.url("/api/client/streaming"))
.unwrap()
.header("Authorization", &upstream_known_token.token)
.unwrap()
.build();

let events = vec![];
tokio::spawn(async move {
let mut stream = es_client
.stream()
.map_ok(move |sse| async move { events.push(sse) });
});

print!("{events:?}")
}
}

0 comments on commit 925fc0c

Please sign in to comment.