From 39b0a2e770b5038de80f9d72dbba095b27961b75 Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Tue, 17 Dec 2024 14:57:41 +0100 Subject: [PATCH] feat: broadcast updates (#604) * Tokio broadcasting channel for updates of features * Allow subscription from feature cache Co-authored-by: Thomas Heartman --- .github/workflows/test-with-coverage.yaml | 13 +- server/src/builder.rs | 5 +- server/src/client_api.rs | 56 +-- server/src/feature_cache.rs | 140 ++++++ server/src/frontend_api.rs | 6 +- server/src/http/broadcaster.rs | 26 +- server/src/http/feature_refresher.rs | 165 ++----- server/src/http/unleash_client.rs | 2 +- server/src/internal_backstage.rs | 25 +- server/src/lib.rs | 5 +- server/src/main.rs | 80 ++-- .../client_token_from_frontend_token.rs | 10 +- server/src/offline/offline_hotload.rs | 6 +- server/src/persistence/mod.rs | 13 +- server/src/ready_checker.rs | 3 +- server/tests/streaming_test.rs | 450 ++++++++---------- 16 files changed, 507 insertions(+), 498 deletions(-) create mode 100644 server/src/feature_cache.rs diff --git a/.github/workflows/test-with-coverage.yaml b/.github/workflows/test-with-coverage.yaml index 6884eb3a..b4af9348 100644 --- a/.github/workflows/test-with-coverage.yaml +++ b/.github/workflows/test-with-coverage.yaml @@ -6,24 +6,26 @@ on: branches: - main paths: - - '**.rs' - - '**.toml' + - "**.rs" + - "**.toml" pull_request: branches: - main paths: - - '**.rs' - - '**.toml' + - "**.rs" + - "**.toml" jobs: tarpaulin: runs-on: ubuntu-latest name: Run test coverage using Tarpaulin - env: + env: CARGO_TERM_COLOR: always steps: - name: Checkout code uses: actions/checkout@v4 + with: + fetch-depth: 0 - name: Install rust run: | rustup set auto-self-update disable @@ -37,4 +39,3 @@ jobs: - name: Run Tarpaulin (Reporting to coveralls) run: | cargo tarpaulin --all-features --coveralls ${{ secrets.COVERALLS_KEY }} --skip-clean - \ No newline at end of file diff --git a/server/src/builder.rs b/server/src/builder.rs index 46758328..38655eed 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -11,6 +11,7 @@ use unleash_types::client_features::ClientFeatures; use unleash_yggdrasil::EngineState; use crate::cli::RedisMode; +use crate::feature_cache::FeatureCache; use crate::http::unleash_client::new_reqwest_client; use crate::offline::offline_hotload::{load_bootstrap, load_offline_engine_cache}; use crate::persistence::file::FilePersister; @@ -27,7 +28,7 @@ use crate::{ type CacheContainer = ( Arc>, - Arc>, + Arc, Arc>, ); type EdgeInfo = ( @@ -43,7 +44,7 @@ fn build_caches() -> CacheContainer { let engine_cache: DashMap = DashMap::default(); ( Arc::new(token_cache), - Arc::new(features_cache), + Arc::new(FeatureCache::new(features_cache)), Arc::new(engine_cache), ) } diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 8adb4f88..b4252a44 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -1,4 +1,5 @@ use crate::error::EdgeError; +use crate::feature_cache::FeatureCache; use crate::filters::{ filter_client_features, name_match_filter, name_prefix_filter, project_filter, FeatureFilterSet, }; @@ -31,7 +32,7 @@ use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia #[get("/features")] pub async fn get_features( edge_token: EdgeToken, - features_cache: Data>, + features_cache: Data, token_cache: Data>, filter_query: Query, req: HttpRequest, @@ -76,7 +77,7 @@ pub async fn stream_features( #[post("/features")] pub async fn post_features( edge_token: EdgeToken, - features_cache: Data>, + features_cache: Data, token_cache: Data>, filter_query: Query, req: HttpRequest, @@ -119,7 +120,7 @@ fn get_feature_filter( async fn resolve_features( edge_token: EdgeToken, - features_cache: Data>, + features_cache: Data, token_cache: Data>, filter_query: Query, req: HttpRequest, @@ -160,7 +161,7 @@ async fn resolve_features( #[get("/features/{feature_name}")] pub async fn get_feature( edge_token: EdgeToken, - features_cache: Data>, + features_cache: Data, token_cache: Data>, feature_name: web::Path, req: HttpRequest, @@ -298,8 +299,6 @@ pub fn configure_experimental_post_features( #[cfg(test)] mod tests { - #[cfg(feature = "streaming")] - use crate::http::broadcaster::Broadcaster; use crate::metrics::client_metrics::{ApplicationKey, MetricsBatch, MetricsKey}; use crate::types::{TokenType, TokenValidationStatus}; use std::collections::HashMap; @@ -583,7 +582,7 @@ mod tests { #[tokio::test] async fn response_includes_variant_stickiness_for_strategy_variants() { - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let app = test::init_service( App::new() @@ -687,7 +686,7 @@ mod tests { token.status = TokenValidationStatus::Validated; token.token_type = Some(TokenType::Client); let upstream_token_cache = Arc::new(DashMap::default()); - let upstream_features_cache = Arc::new(DashMap::default()); + let upstream_features_cache = Arc::new(FeatureCache::default()); let upstream_engine_cache = Arc::new(DashMap::default()); upstream_token_cache.insert(token.token.clone(), token.clone()); let srv = upstream_server( @@ -726,7 +725,7 @@ mod tests { frontend_token.status = TokenValidationStatus::Validated; frontend_token.token_type = Some(TokenType::Frontend); let upstream_token_cache = Arc::new(DashMap::default()); - let upstream_features_cache = Arc::new(DashMap::default()); + let upstream_features_cache = Arc::new(FeatureCache::default()); let upstream_engine_cache = Arc::new(DashMap::default()); upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone()); let srv = upstream_server( @@ -768,7 +767,7 @@ mod tests { #[tokio::test] async fn client_features_endpoint_correctly_returns_cached_features() { - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let app = test::init_service( App::new() @@ -805,7 +804,7 @@ mod tests { #[tokio::test] async fn post_request_to_client_features_does_the_same_as_get_when_mounted() { - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let app = test::init_service( App::new() @@ -855,7 +854,7 @@ mod tests { #[tokio::test] async fn client_features_endpoint_filters_on_project_access_in_token() { - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let app = test::init_service( App::new() @@ -884,7 +883,7 @@ mod tests { #[tokio::test] async fn client_features_endpoint_filters_when_multiple_projects_in_token() { - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let app = test::init_service( App::new() @@ -913,7 +912,7 @@ mod tests { #[tokio::test] async fn client_features_endpoint_filters_correctly_when_token_has_access_to_multiple_projects() { - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let app = test::init_service( App::new() @@ -964,7 +963,7 @@ mod tests { #[tokio::test] async fn when_running_in_offline_mode_with_proxy_key_should_not_filter_features() { - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let app = test::init_service( App::new() @@ -991,8 +990,7 @@ mod tests { #[tokio::test] async fn calling_client_features_endpoint_with_new_token_hydrates_from_upstream_when_dynamic() { - let upstream_features_cache: Arc> = - Arc::new(DashMap::default()); + let upstream_features_cache = Arc::new(FeatureCache::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let server = upstream_server( @@ -1011,7 +1009,7 @@ mod tests { ); upstream_features_cache.insert(cache_key(&upstream_known_token), upstream_features.clone()); let unleash_client = Arc::new(UnleashClient::new(server.url("/").as_str(), None).unwrap()); - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache: Arc = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); let feature_refresher = Arc::new(FeatureRefresher { @@ -1023,8 +1021,6 @@ mod tests { persistence: None, strict: false, app_name: "test-app".into(), - #[cfg(feature = "streaming")] - broadcaster: Broadcaster::new(features_cache.clone()), }); let token_validator = Arc::new(TokenValidator { unleash_client: unleash_client.clone(), @@ -1055,8 +1051,7 @@ mod tests { #[tokio::test] async fn calling_client_features_endpoint_with_new_token_does_not_hydrate_when_strict() { - let upstream_features_cache: Arc> = - Arc::new(DashMap::default()); + let upstream_features_cache = Arc::new(FeatureCache::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let server = upstream_server( @@ -1075,7 +1070,7 @@ mod tests { ); upstream_features_cache.insert(cache_key(&upstream_known_token), upstream_features.clone()); let unleash_client = Arc::new(UnleashClient::new(server.url("/").as_str(), None).unwrap()); - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache: Arc = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); let feature_refresher = Arc::new(FeatureRefresher { @@ -1114,7 +1109,7 @@ mod tests { #[tokio::test] pub async fn gets_feature_by_name() { - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); let features = features_from_disk("../examples/hostedexample.json"); @@ -1146,7 +1141,7 @@ mod tests { #[tokio::test] pub async fn token_with_no_access_to_named_feature_yields_404() { - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); let features = features_from_disk("../examples/hostedexample.json"); @@ -1178,8 +1173,7 @@ mod tests { #[tokio::test] pub async fn still_subsumes_tokens_after_moving_registration_to_initial_hydration_when_dynamic() { - let upstream_features_cache: Arc> = - Arc::new(DashMap::default()); + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let server = upstream_server( @@ -1199,7 +1193,7 @@ mod tests { upstream_token_cache.insert(upstream_eg_token.token.clone(), upstream_eg_token.clone()); upstream_features_cache.insert(cache_key(&upstream_dx_token), upstream_features.clone()); let unleash_client = Arc::new(UnleashClient::new(server.url("/").as_str(), None).unwrap()); - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache: Arc = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); let feature_refresher = Arc::new(FeatureRefresher { @@ -1248,7 +1242,7 @@ mod tests { #[tokio::test] pub async fn can_filter_features_list_by_name_prefix() { - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); let features = features_from_disk("../examples/hostedexample.json"); @@ -1280,7 +1274,7 @@ mod tests { #[tokio::test] pub async fn only_gets_correct_feature_by_name() { - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); let features = ClientFeatures { @@ -1356,7 +1350,7 @@ mod tests { #[tokio::test] async fn client_features_endpoint_works_with_overridden_token_header() { - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let token_header = TokenHeader::from_str("NeedsToBeTested").unwrap(); println!("token_header: {:?}", token_header); diff --git a/server/src/feature_cache.rs b/server/src/feature_cache.rs new file mode 100644 index 00000000..dec92097 --- /dev/null +++ b/server/src/feature_cache.rs @@ -0,0 +1,140 @@ +use dashmap::DashMap; +use tokio::sync::broadcast; +use unleash_types::{ + client_features::{ClientFeature, ClientFeatures, Segment}, + Deduplicate, +}; + +use crate::types::EdgeToken; + +#[derive(Debug, Clone)] +pub enum UpdateType { + Full(String), + Update(String), + Deletion, +} + +#[derive(Debug, Clone)] +pub struct FeatureCache { + features: DashMap, + update_sender: broadcast::Sender, +} + +impl FeatureCache { + pub fn new(features: DashMap) -> Self { + let (tx, _rx) = tokio::sync::broadcast::channel::(16); + Self { + features, + update_sender: tx, + } + } + + pub fn len(&self) -> usize { + self.features.len() + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.update_sender.subscribe() + } + pub fn get(&self, key: &str) -> Option> { + self.features.get(key) + } + + pub fn insert(&self, key: String, features: ClientFeatures) -> Option { + let v = self.features.insert(key.clone(), features); + self.send_full_update(key); + v + } + + pub fn send_full_update(&self, cache_key: String) { + let _ = self.update_sender.send(UpdateType::Full(cache_key)); + } + + pub fn remove(&self, key: &str) -> Option<(String, ClientFeatures)> { + let v = self.features.remove(key); + self.send_full_update(key.to_string()); + v + } + + pub fn modify(&self, key: String, token: &EdgeToken, features: ClientFeatures) { + self.features + .entry(key.clone()) + .and_modify(|existing_features| { + let updated = update_client_features(token, existing_features, &features); + *existing_features = updated; + }) + .or_insert(features); + self.send_full_update(key); + } + + pub fn is_empty(&self) -> bool { + self.features.is_empty() + } + + pub fn iter(&self) -> dashmap::iter::Iter<'_, String, ClientFeatures> { + self.features.iter() + } +} + +impl Default for FeatureCache { + fn default() -> Self { + FeatureCache::new(DashMap::default()) + } +} + +fn update_client_features( + token: &EdgeToken, + old: &ClientFeatures, + update: &ClientFeatures, +) -> ClientFeatures { + let mut updated_features = + update_projects_from_feature_update(token, &old.features, &update.features); + updated_features.sort(); + let segments = merge_segments_update(old.segments.clone(), update.segments.clone()); + ClientFeatures { + version: old.version.max(update.version), + features: updated_features, + segments: segments.map(|mut s| { + s.sort(); + s + }), + query: old.query.clone().or(update.query.clone()), + } +} + +pub(crate) fn update_projects_from_feature_update( + token: &EdgeToken, + original: &[ClientFeature], + updated: &[ClientFeature], +) -> Vec { + let projects_to_update = &token.projects; + if projects_to_update.contains(&"*".into()) { + updated.into() + } else { + let mut to_keep: Vec = original + .iter() + .filter(|toggle| { + let p = toggle.project.clone().unwrap_or_else(|| "default".into()); + !projects_to_update.contains(&p) + }) + .cloned() + .collect(); + to_keep.extend(updated.iter().cloned()); + to_keep + } +} + +fn merge_segments_update( + segments: Option>, + updated_segments: Option>, +) -> Option> { + match (segments, updated_segments) { + (Some(s), Some(mut o)) => { + o.extend(s); + Some(o.deduplicate()) + } + (Some(s), None) => Some(s), + (None, Some(o)) => Some(o), + (None, None) => None, + } +} diff --git a/server/src/frontend_api.rs b/server/src/frontend_api.rs index 43427600..a08c98ea 100644 --- a/server/src/frontend_api.rs +++ b/server/src/frontend_api.rs @@ -796,12 +796,12 @@ mod tests { }; use unleash_yggdrasil::EngineState; - use crate::builder::build_offline_mode; use crate::cli::{EdgeMode, OfflineArgs, TrustProxy}; use crate::metrics::client_metrics::MetricsCache; use crate::metrics::client_metrics::MetricsKey; use crate::middleware; use crate::types::{EdgeToken, TokenType, TokenValidationStatus}; + use crate::{builder::build_offline_mode, feature_cache::FeatureCache}; async fn make_test_request() -> Request { make_test_request_to("/api/proxy/client/metrics").await @@ -1318,7 +1318,7 @@ mod tests { #[tokio::test] async fn frontend_token_without_matching_client_token_yields_511_when_trying_to_access_frontend_api( ) { - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let app = test::init_service( @@ -1351,7 +1351,7 @@ mod tests { #[tokio::test] async fn invalid_token_is_refused_with_403() { - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let app = test::init_service( diff --git a/server/src/http/broadcaster.rs b/server/src/http/broadcaster.rs index 0b4a4250..5ad7286d 100644 --- a/server/src/http/broadcaster.rs +++ b/server/src/http/broadcaster.rs @@ -17,11 +17,12 @@ use futures::future; use serde::Serialize; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use tracing::warn; +use tracing::{debug, warn}; use unleash_types::client_features::{ClientFeatures, Query as FlagQuery}; use crate::{ error::EdgeError, + feature_cache::FeatureCache, filters::{filter_client_features, name_prefix_filter, project_filter, FeatureFilterSet}, tokens::cache_key, types::{EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters}, @@ -47,20 +48,21 @@ struct ClientGroup { pub struct Broadcaster { active_connections: DashMap, - features_cache: Arc>, + features_cache: Arc, } impl Broadcaster { /// Constructs new broadcaster and spawns ping loop. - pub fn new(features: Arc>) -> Arc { - let this = Arc::new(Broadcaster { + pub fn new(features: Arc) -> Arc { + let broadcaster = Arc::new(Broadcaster { active_connections: DashMap::new(), - features_cache: features, + features_cache: features.clone(), }); - Broadcaster::spawn_heartbeat(Arc::clone(&this)); + Broadcaster::spawn_heartbeat(broadcaster.clone()); + Broadcaster::spawn_feature_cache_subscriber(broadcaster.clone()); - this + broadcaster } /// Pings clients every 30 seconds to see if they are alive and remove them from the broadcast @@ -76,6 +78,16 @@ impl Broadcaster { }); } + fn spawn_feature_cache_subscriber(this: Arc) { + let mut rx = this.features_cache.subscribe(); + tokio::spawn(async move { + while let Ok(key) = rx.recv().await { + debug!("Received update for key: {:?}", key); + this.broadcast().await; + } + }); + } + /// Removes all non-responsive clients from broadcast list. async fn heartbeat(&self) { for mut group in self.active_connections.iter_mut() { diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index 3d6e526e..b79c5b18 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -10,15 +10,12 @@ use eventsource_client::Client; use futures::TryStreamExt; use reqwest::StatusCode; use tracing::{debug, info, warn}; -use unleash_types::client_features::Segment; +use unleash_types::client_features::ClientFeatures; use unleash_types::client_metrics::{ClientApplication, MetricsMetadata}; -use unleash_types::{ - client_features::{ClientFeature, ClientFeatures}, - Deduplicate, -}; use unleash_yggdrasil::EngineState; use crate::error::{EdgeError, FeatureError}; +use crate::feature_cache::FeatureCache; use crate::filters::{filter_client_features, FeatureFilterSet}; use crate::types::{build, EdgeResult, TokenType, TokenValidationStatus}; use crate::{ @@ -27,8 +24,6 @@ use crate::{ types::{ClientFeaturesRequest, ClientFeaturesResponse, EdgeToken, TokenRefresh}, }; -#[cfg(feature = "streaming")] -use super::broadcaster::Broadcaster; use super::unleash_client::UnleashClient; fn frontend_token_is_covered_by_tokens( @@ -42,74 +37,16 @@ fn frontend_token_is_covered_by_tokens( }) } -fn update_client_features( - token: &EdgeToken, - old: &ClientFeatures, - update: &ClientFeatures, -) -> ClientFeatures { - let mut updated_features = - update_projects_from_feature_update(token, &old.features, &update.features); - updated_features.sort(); - let segments = merge_segments_update(old.segments.clone(), update.segments.clone()); - ClientFeatures { - version: old.version.max(update.version), - features: updated_features, - segments: segments.map(|mut s| { - s.sort(); - s - }), - query: old.query.clone().or(update.query.clone()), - } -} - -fn merge_segments_update( - segments: Option>, - updated_segments: Option>, -) -> Option> { - match (segments, updated_segments) { - (Some(s), Some(mut o)) => { - o.extend(s); - Some(o.deduplicate()) - } - (Some(s), None) => Some(s), - (None, Some(o)) => Some(o), - (None, None) => None, - } -} -pub(crate) fn update_projects_from_feature_update( - token: &EdgeToken, - original: &[ClientFeature], - updated: &[ClientFeature], -) -> Vec { - let projects_to_update = &token.projects; - if projects_to_update.contains(&"*".into()) { - updated.into() - } else { - let mut to_keep: Vec = original - .iter() - .filter(|toggle| { - let p = toggle.project.clone().unwrap_or_else(|| "default".into()); - !projects_to_update.contains(&p) - }) - .cloned() - .collect(); - to_keep.extend(updated.iter().cloned()); - to_keep - } -} - #[derive(Clone)] pub struct FeatureRefresher { pub unleash_client: Arc, pub tokens_to_refresh: Arc>, - pub features_cache: Arc>, + pub features_cache: Arc, pub engine_cache: Arc>, pub refresh_interval: chrono::Duration, pub persistence: Option>, pub strict: bool, pub app_name: String, - #[cfg(feature = "streaming")] - pub broadcaster: Arc, } impl Default for FeatureRefresher { @@ -118,13 +55,11 @@ impl Default for FeatureRefresher { refresh_interval: chrono::Duration::seconds(10), unleash_client: Default::default(), tokens_to_refresh: Arc::new(DashMap::default()), - features_cache: Default::default(), + features_cache: Arc::new(Default::default()), engine_cache: Default::default(), persistence: None, strict: true, app_name: "unleash_edge".into(), - #[cfg(feature = "streaming")] - broadcaster: Broadcaster::new(Default::default()), } } } @@ -154,7 +89,7 @@ fn client_application_from_token_and_name( impl FeatureRefresher { pub fn new( unleash_client: Arc, - features: Arc>, + features_cache: Arc, engines: Arc>, features_refresh_interval: chrono::Duration, persistence: Option>, @@ -164,9 +99,7 @@ impl FeatureRefresher { FeatureRefresher { unleash_client, tokens_to_refresh: Arc::new(DashMap::default()), - #[cfg(feature = "streaming")] - broadcaster: Broadcaster::new(features.clone()), - features_cache: features, + features_cache, engine_cache: engines, refresh_interval: features_refresh_interval, persistence, @@ -398,10 +331,16 @@ impl FeatureRefresher { } pub async fn start_refresh_features_background_task(&self) { - loop { - tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(5)) => { - self.refresh_features().await; + if cfg!(feature = "streaming") { + loop { + tokio::time::sleep(Duration::from_secs(3600)).await; + } + } else { + loop { + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(5)) => { + self.refresh_features().await; + } } } } @@ -430,12 +369,7 @@ impl FeatureRefresher { let key = cache_key(refresh_token); self.update_last_refresh(refresh_token, etag, features.features.len()); self.features_cache - .entry(key.clone()) - .and_modify(|existing_data| { - let updated_data = update_client_features(refresh_token, existing_data, &features); - *existing_data = updated_data; - }) - .or_insert_with(|| features.clone()); + .modify(key.clone(), refresh_token, features.clone()); self.engine_cache .entry(key.clone()) .and_modify(|engine| { @@ -458,9 +392,6 @@ impl FeatureRefresher { }; new_state }); - - #[cfg(feature = "streaming")] - self.broadcaster.broadcast().await; } pub async fn refresh_single(&self, refresh: TokenRefresh) { @@ -569,9 +500,10 @@ mod tests { use chrono::{Duration, Utc}; use dashmap::DashMap; use reqwest::Url; - use unleash_types::client_features::{ClientFeature, ClientFeatures}; + use unleash_types::client_features::ClientFeature; use unleash_yggdrasil::EngineState; + use crate::feature_cache::{update_projects_from_feature_update, FeatureCache}; use crate::filters::{project_filter, FeatureFilterSet}; use crate::http::unleash_client::new_reqwest_client; use crate::tests::features_from_disk; @@ -616,7 +548,7 @@ mod tests { #[tokio::test] pub async fn registering_token_for_refresh_works() { let unleash_client = create_test_client(); - let features_cache = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let engine_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); @@ -640,7 +572,7 @@ mod tests { pub async fn registering_multiple_tokens_with_same_environment_reduces_tokens_to_valid_minimal_set( ) { let unleash_client = create_test_client(); - let features_cache = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let engine_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); @@ -668,7 +600,7 @@ mod tests { #[tokio::test] pub async fn registering_multiple_non_overlapping_tokens_will_keep_all() { let unleash_client = create_test_client(); - let features_cache = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let engine_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); let feature_refresher = FeatureRefresher { @@ -703,7 +635,7 @@ mod tests { #[tokio::test] pub async fn registering_wildcard_project_token_only_keeps_the_wildcard() { let unleash_client = create_test_client(); - let features_cache = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let engine_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); let feature_refresher = FeatureRefresher { @@ -747,7 +679,7 @@ mod tests { #[tokio::test] pub async fn registering_tokens_with_multiple_projects_overwrites_single_tokens() { let unleash_client = create_test_client(); - let features_cache = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let engine_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); let feature_refresher = FeatureRefresher { @@ -795,7 +727,7 @@ mod tests { #[tokio::test] pub async fn registering_a_token_that_is_already_subsumed_does_nothing() { let unleash_client = create_test_client(); - let features_cache = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let engine_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); @@ -828,7 +760,7 @@ mod tests { #[tokio::test] pub async fn simplification_only_happens_in_same_environment() { let unleash_client = create_test_client(); - let features_cache = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let engine_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); @@ -856,7 +788,7 @@ mod tests { #[tokio::test] pub async fn is_able_to_only_fetch_for_tokens_due_to_refresh() { let unleash_client = create_test_client(); - let features_cache = Arc::new(DashMap::default()); + let features_cache = Arc::new(FeatureCache::default()); let engine_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); @@ -927,7 +859,7 @@ mod tests { async fn client_api_test_server( upstream_token_cache: Arc>, - upstream_features_cache: Arc>, + upstream_features_cache: Arc, upstream_engine_cache: Arc>, ) -> TestServer { test_server(move || { @@ -945,8 +877,7 @@ mod tests { } #[tokio::test] pub async fn getting_403_when_refreshing_features_will_remove_token() { - let upstream_features_cache: Arc> = - Arc::new(DashMap::default()); + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let server = client_api_test_server( @@ -956,7 +887,7 @@ mod tests { ) .await; let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache: Arc = Arc::new(FeatureCache::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); let feature_refresher = FeatureRefresher { unleash_client: Arc::new(unleash_client), @@ -985,8 +916,7 @@ mod tests { token.token_type = Some(TokenType::Client); let token_cache = DashMap::default(); token_cache.insert(token.token.clone(), token.clone()); - let upstream_features_cache: Arc> = - Arc::new(DashMap::default()); + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(token_cache); let example_features = features_from_disk("../examples/features.json"); @@ -1002,7 +932,7 @@ mod tests { ) .await; let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache: Arc = Arc::new(FeatureCache::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); let feature_refresher = FeatureRefresher { unleash_client: Arc::new(unleash_client), @@ -1037,8 +967,7 @@ mod tests { #[tokio::test] pub async fn when_we_have_a_cache_and_token_gets_removed_caches_are_emptied() { - let upstream_features_cache: Arc> = - Arc::new(DashMap::default()); + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let token_cache_to_modify = upstream_token_cache.clone(); @@ -1079,8 +1008,7 @@ mod tests { #[tokio::test] pub async fn removing_one_of_multiple_keys_from_same_environment_does_not_remove_feature_and_engine_caches( ) { - let upstream_features_cache: Arc> = - Arc::new(DashMap::default()); + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let token_cache_to_modify = upstream_token_cache.clone(); @@ -1130,8 +1058,7 @@ mod tests { #[tokio::test] pub async fn fetching_two_projects_from_same_environment_should_get_features_for_both_when_dynamic( ) { - let upstream_features_cache: Arc> = - Arc::new(DashMap::default()); + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let mut dx_token = EdgeToken::try_from("dx:development.secret123".to_string()).unwrap(); @@ -1188,8 +1115,7 @@ mod tests { #[tokio::test] pub async fn should_get_data_for_multi_project_token_even_if_we_have_data_for_one_of_the_projects_when_dynamic( ) { - let upstream_features_cache: Arc> = - Arc::new(DashMap::default()); + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let mut dx_token = EdgeToken::from_str("dx:development.secret123").unwrap(); @@ -1306,8 +1232,7 @@ mod tests { #[tokio::test] async fn refetching_data_when_feature_is_archived_should_remove_archived_feature() { - let upstream_features_cache: Arc> = - Arc::new(DashMap::default()); + let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let mut eg_token = EdgeToken::from_str("eg:development.devsecret").unwrap(); @@ -1326,7 +1251,7 @@ mod tests { upstream_engine_cache, ) .await; - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache: Arc = Arc::new(FeatureCache::default()); let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); let feature_refresher = FeatureRefresher { unleash_client: Arc::new(unleash_client), @@ -1369,7 +1294,7 @@ mod tests { token.status = TokenValidationStatus::Validated; token.projects = vec![String::from("dx")]; - let updated = super::update_projects_from_feature_update(&token, &features, &dx_data); + let updated = update_projects_from_feature_update(&token, &features, &dx_data); assert_ne!( features .iter() @@ -1415,7 +1340,7 @@ mod tests { projects: vec![String::from("dx"), String::from("eg")], status: TokenValidationStatus::Validated, }; - let update = super::update_projects_from_feature_update(&edge_token, &features, &dx_data); + let update = update_projects_from_feature_update(&edge_token, &features, &dx_data); assert_eq!(features.len() - update.len(), 2); // We've removed two elements } @@ -1435,7 +1360,7 @@ mod tests { .filter(|f| f.project == Some("eg".into())) .cloned() .collect(); - let update = super::update_projects_from_feature_update(&edge_token, &features, &eg_data); + let update = update_projects_from_feature_update(&edge_token, &features, &eg_data); assert!(!update.iter().any(|p| p.project == Some(String::from("dx")))); } #[test] @@ -1453,7 +1378,7 @@ mod tests { .filter(|f| f.project == Some("eg".into())) .cloned() .collect(); - let update = super::update_projects_from_feature_update(&edge_token, &features, &eg_data); + let update = update_projects_from_feature_update(&edge_token, &features, &eg_data); assert_eq!( update .iter() @@ -1508,7 +1433,7 @@ mod tests { .filter(|t| t.project == Some("default".into())) .cloned() .collect(); - let updated = super::update_projects_from_feature_update(&edge_token, &features, &update); + let updated = update_projects_from_feature_update(&edge_token, &features, &update); assert_eq!(updated.len(), 1); assert!(updated.iter().all(|f| f.project == Some("default".into()))) } @@ -1555,7 +1480,7 @@ mod tests { projects: vec![String::from("someother")], status: TokenValidationStatus::Validated, }; - let updated = super::update_projects_from_feature_update( + let updated = update_projects_from_feature_update( &unrelated_token_to_existing_features, &features, &empty_features, @@ -1604,7 +1529,7 @@ mod tests { projects: vec![String::from("testproject"), String::from("someother")], status: TokenValidationStatus::Validated, }; - let updated = super::update_projects_from_feature_update( + let updated = update_projects_from_feature_update( &token_with_access_to_both_empty_and_full_project, &features, &empty_features, diff --git a/server/src/http/unleash_client.rs b/server/src/http/unleash_client.rs index 6397e842..b648e45e 100644 --- a/server/src/http/unleash_client.rs +++ b/server/src/http/unleash_client.rs @@ -575,7 +575,7 @@ mod tests { ) .service( web::resource("/api/edge/validate") - .route(web::post().to(|| HttpResponse::Forbidden())), + .route(web::post().to(HttpResponse::Forbidden)), ), |_| AppConfig::default(), )) diff --git a/server/src/internal_backstage.rs b/server/src/internal_backstage.rs index dc8bb76a..23c76f35 100644 --- a/server/src/internal_backstage.rs +++ b/server/src/internal_backstage.rs @@ -10,13 +10,13 @@ use serde::{Deserialize, Serialize}; use unleash_types::client_features::ClientFeatures; use unleash_types::client_metrics::ClientApplication; -use crate::error::EdgeError; use crate::http::feature_refresher::FeatureRefresher; use crate::metrics::actix_web_metrics::PrometheusMetricsHandler; use crate::metrics::client_metrics::MetricsCache; use crate::types::{BuildInfo, EdgeJsonResult, EdgeToken, TokenInfo, TokenRefresh}; use crate::types::{ClientMetric, MetricsInfo, Status}; use crate::{auth::token_validator::TokenValidator, cli::InternalBackstageArgs}; +use crate::{error::EdgeError, feature_cache::FeatureCache}; #[derive(Debug, Serialize, Deserialize)] pub struct EdgeStatus { @@ -54,7 +54,7 @@ pub async fn info() -> EdgeJsonResult { #[get("/ready")] pub async fn ready( token_cache: web::Data>, - features_cache: web::Data>, + features_cache: web::Data, ) -> EdgeJsonResult { if !token_cache.is_empty() && features_cache.is_empty() { Err(EdgeError::NotReady) @@ -137,7 +137,7 @@ pub async fn metrics_batch(metrics_cache: web::Data) -> EdgeJsonRe #[get("/features")] pub async fn features( - features_cache: web::Data>, + features_cache: web::Data, ) -> EdgeJsonResult> { let features = features_cache .iter() @@ -181,6 +181,7 @@ mod tests { use unleash_yggdrasil::EngineState; use crate::auth::token_validator::TokenValidator; + use crate::feature_cache::FeatureCache; use crate::http::feature_refresher::FeatureRefresher; use crate::http::unleash_client::UnleashClient; use crate::internal_backstage::EdgeStatus; @@ -222,7 +223,7 @@ mod tests { #[actix_web::test] async fn test_ready_endpoint_with_tokens_without_toggles() { - let client_features: DashMap = DashMap::default(); + let client_features = FeatureCache::default(); let client_features_arc = Arc::new(client_features); let token_cache: DashMap = DashMap::default(); let token = EdgeToken::from_str("[]:fancyenvironment.somerandomsecretstring").unwrap(); @@ -256,7 +257,7 @@ mod tests { segments: None, version: 2, }; - let client_features: DashMap = DashMap::default(); + let client_features = FeatureCache::default(); client_features.insert( "testproject:testenvironment.testtoken".into(), features.clone(), @@ -285,7 +286,7 @@ mod tests { #[actix_web::test] async fn test_ready_endpoint_without_tokens_and_toggles() { - let client_features: DashMap = DashMap::default(); + let client_features = FeatureCache::default(); let client_features_arc = Arc::new(client_features); let token_cache: DashMap = DashMap::default(); let token_cache_arc = Arc::new(token_cache); @@ -310,7 +311,7 @@ mod tests { async fn if_no_tokens_has_been_received_returns_empty_lists() { let upstream_server = upstream_server( Arc::new(DashMap::default()), - Arc::new(DashMap::default()), + Arc::new(FeatureCache::default()), Arc::new(DashMap::default()), ) .await; @@ -349,8 +350,7 @@ mod tests { #[actix_web::test] async fn returns_validated_tokens_when_dynamic() { - let upstream_features_cache: Arc> = - Arc::new(DashMap::default()); + let upstream_features_cache = Arc::new(FeatureCache::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let server = upstream_server( @@ -369,7 +369,7 @@ mod tests { ); upstream_features_cache.insert(cache_key(&upstream_known_token), upstream_features.clone()); let unleash_client = Arc::new(UnleashClient::new(server.url("/").as_str(), None).unwrap()); - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache: Arc = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); let feature_refresher = Arc::new(FeatureRefresher { @@ -421,8 +421,7 @@ mod tests { #[actix_web::test] async fn returns_validated_tokens_when_strict() { - let upstream_features_cache: Arc> = - Arc::new(DashMap::default()); + let upstream_features_cache = Arc::new(FeatureCache::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let server = upstream_server( @@ -441,7 +440,7 @@ mod tests { ); upstream_features_cache.insert(cache_key(&upstream_known_token), upstream_features.clone()); let unleash_client = Arc::new(UnleashClient::new(server.url("/").as_str(), None).unwrap()); - let features_cache: Arc> = Arc::new(DashMap::default()); + let features_cache: Arc = Arc::new(FeatureCache::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); let feature_refresher = Arc::new(FeatureRefresher { diff --git a/server/src/lib.rs b/server/src/lib.rs index d4f4e631..337f0e2f 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -7,6 +7,7 @@ pub mod client_api; pub mod edge_api; #[cfg(not(tarpaulin_include))] pub mod error; +pub mod feature_cache; pub mod filters; pub mod frontend_api; pub mod health_checker; @@ -20,7 +21,6 @@ pub mod openapi; pub mod persistence; #[cfg(not(tarpaulin_include))] pub mod prom_metrics; - pub mod ready_checker; #[cfg(not(tarpaulin_include))] pub mod tls; @@ -45,6 +45,7 @@ mod tests { use unleash_yggdrasil::EngineState; use crate::auth::token_validator::TokenValidator; + use crate::feature_cache::FeatureCache; use crate::metrics::client_metrics::MetricsCache; use crate::types::EdgeToken; @@ -57,7 +58,7 @@ mod tests { pub async fn upstream_server( upstream_token_cache: Arc>, - upstream_features_cache: Arc>, + upstream_features_cache: Arc, upstream_engine_cache: Arc>, ) -> TestServer { let token_validator = Arc::new(TokenValidator { diff --git a/server/src/main.rs b/server/src/main.rs index 811f8bbe..cad32a2d 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -15,6 +15,7 @@ use utoipa_swagger_ui::SwaggerUi; use tracing::info; use unleash_edge::builder::build_caches_and_refreshers; use unleash_edge::cli::{CliArgs, EdgeMode}; +use unleash_edge::feature_cache::FeatureCache; use unleash_edge::http::background_send_metrics::send_metrics_one_shot; use unleash_edge::http::feature_refresher::FeatureRefresher; use unleash_edge::metrics::client_metrics::MetricsCache; @@ -28,6 +29,8 @@ use unleash_edge::{internal_backstage, tls}; #[cfg(not(tarpaulin_include))] #[actix_web::main] async fn main() -> Result<(), anyhow::Error> { + #[cfg(feature = "streaming")] + use unleash_edge::http::broadcaster::Broadcaster; use unleash_edge::metrics::metrics_pusher; let args = CliArgs::parse(); @@ -97,6 +100,12 @@ async fn main() -> Result<(), anyhow::Error> { .app_data(web::Data::from(token_cache.clone())) .app_data(web::Data::from(features_cache.clone())) .app_data(web::Data::from(engine_cache.clone())); + + #[cfg(feature = "streaming")] + { + let broadcaster = Broadcaster::new(features_cache.clone()); + app = app.app_data(web::Data::from(broadcaster.clone())); + } app = match token_validator.clone() { Some(v) => app.app_data(web::Data::from(v)), None => app, @@ -163,54 +172,29 @@ async fn main() -> Result<(), anyhow::Error> { let validator = token_validator_schedule.clone().unwrap(); - if cfg!(feature = "streaming") { - tokio::select! { - _ = server.run() => { - tracing::info!("Actix is shutting down. Persisting data"); - clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await; - tracing::info!("Actix was shutdown properly"); - }, - _ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => { - tracing::info!("Metrics poster unexpectedly shut down"); - } - _ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone()) => { - tracing::info!("Persister was unexpectedly shut down"); - } - _ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => { - tracing::info!("Token validator validation of known tokens was unexpectedly shut down"); - } - _ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => { - tracing::info!("Token validator validation of startup tokens was unexpectedly shut down"); - } - _ = metrics_pusher::prometheus_remote_write(prom_registry_for_write, edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password, app_name) => { - tracing::info!("Prometheus push unexpectedly shut down"); - } + tokio::select! { + _ = server.run() => { + tracing::info!("Actix is shutting down. Persisting data"); + clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await; + tracing::info!("Actix was shutdown properly"); + }, + _ = refresher.start_refresh_features_background_task() => { + tracing::info!("Feature refresher unexpectedly shut down"); + } + _ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => { + tracing::info!("Metrics poster unexpectedly shut down"); + } + _ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone()) => { + tracing::info!("Persister was unexpectedly shut down"); + } + _ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => { + tracing::info!("Token validator validation of known tokens was unexpectedly shut down"); + } + _ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => { + tracing::info!("Token validator validation of startup tokens was unexpectedly shut down"); } - } else { - tokio::select! { - _ = server.run() => { - tracing::info!("Actix is shutting down. Persisting data"); - clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await; - tracing::info!("Actix was shutdown properly"); - }, - _ = refresher.start_refresh_features_background_task() => { - tracing::info!("Feature refresher unexpectedly shut down"); - } - _ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => { - tracing::info!("Metrics poster unexpectedly shut down"); - } - _ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone()) => { - tracing::info!("Persister was unexpectedly shut down"); - } - _ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => { - tracing::info!("Token validator validation of known tokens was unexpectedly shut down"); - } - _ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => { - tracing::info!("Token validator validation of startup tokens was unexpectedly shut down"); - } - _ = metrics_pusher::prometheus_remote_write(prom_registry_for_write, edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password, app_name) => { - tracing::info!("Prometheus push unexpectedly shut down"); - } + _ = metrics_pusher::prometheus_remote_write(prom_registry_for_write, edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password, app_name) => { + tracing::info!("Prometheus push unexpectedly shut down"); } } } @@ -240,7 +224,7 @@ async fn main() -> Result<(), anyhow::Error> { #[cfg(not(tarpaulin_include))] async fn clean_shutdown( persistence: Option>, - feature_cache: Arc>, + feature_cache: Arc, token_cache: Arc>, metrics_cache: Arc, feature_refresher: Option>, diff --git a/server/src/middleware/client_token_from_frontend_token.rs b/server/src/middleware/client_token_from_frontend_token.rs index 6e354ba8..b33eb17c 100644 --- a/server/src/middleware/client_token_from_frontend_token.rs +++ b/server/src/middleware/client_token_from_frontend_token.rs @@ -60,10 +60,10 @@ mod tests { use chrono::Duration; use dashmap::DashMap; use reqwest::{StatusCode, Url}; - use unleash_types::client_features::ClientFeatures; use unleash_yggdrasil::EngineState; use crate::auth::token_validator::TokenValidator; + use crate::feature_cache::FeatureCache; use crate::http::feature_refresher::FeatureRefresher; use crate::http::unleash_client::{new_reqwest_client, UnleashClient}; use crate::tests::upstream_server; @@ -72,7 +72,7 @@ mod tests { pub async fn local_server( unleash_client: Arc, local_token_cache: Arc>, - local_features_cache: Arc>, + local_features_cache: Arc, local_engine_cache: Arc>, ) -> TestServer { let token_validator = Arc::new(TokenValidator { @@ -120,8 +120,7 @@ mod tests { let mut frontend_token = EdgeToken::from_str("*:development.frontendtoken").unwrap(); frontend_token.status = TokenValidationStatus::Validated; frontend_token.token_type = Some(TokenType::Frontend); - let upstream_features_cache: Arc> = - Arc::new(DashMap::default()); + let upstream_features_cache = Arc::new(FeatureCache::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); @@ -148,8 +147,7 @@ mod tests { "test-client".into(), http_client, ); - let local_features_cache: Arc> = - Arc::new(DashMap::default()); + let local_features_cache: Arc = Arc::new(FeatureCache::default()); let local_token_cache: Arc> = Arc::new(DashMap::default()); let local_engine_cache: Arc> = Arc::new(DashMap::default()); diff --git a/server/src/offline/offline_hotload.rs b/server/src/offline/offline_hotload.rs index 45ce9a69..d51e1994 100644 --- a/server/src/offline/offline_hotload.rs +++ b/server/src/offline/offline_hotload.rs @@ -16,10 +16,10 @@ use unleash_types::client_features::{ }; use unleash_yggdrasil::EngineState; -use crate::{cli::OfflineArgs, error::EdgeError, types::EdgeToken}; +use crate::{cli::OfflineArgs, error::EdgeError, feature_cache::FeatureCache, types::EdgeToken}; pub async fn start_hotload_loop( - features_cache: Arc>, + features_cache: Arc, engine_cache: Arc>, offline_args: OfflineArgs, ) { @@ -60,7 +60,7 @@ pub async fn start_hotload_loop( pub(crate) fn load_offline_engine_cache( edge_token: &EdgeToken, - features_cache: Arc>, + features_cache: Arc, engine_cache: Arc>, client_features: ClientFeatures, ) { diff --git a/server/src/persistence/mod.rs b/server/src/persistence/mod.rs index 122f3d95..8561de19 100644 --- a/server/src/persistence/mod.rs +++ b/server/src/persistence/mod.rs @@ -1,12 +1,12 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; +use crate::feature_cache::FeatureCache; +use crate::types::{EdgeResult, EdgeToken, TokenValidationStatus}; use async_trait::async_trait; use dashmap::DashMap; use tracing::{debug, warn}; use unleash_types::client_features::ClientFeatures; -use crate::types::{EdgeResult, EdgeToken, TokenValidationStatus}; - pub mod file; pub mod redis; pub mod s3; @@ -23,7 +23,7 @@ pub trait EdgePersistence: Send + Sync { pub async fn persist_data( persistence: Option>, token_cache: Arc>, - features_cache: Arc>, + features_cache: Arc, ) { loop { tokio::select! { @@ -63,10 +63,7 @@ async fn save_known_tokens( } } -async fn save_features( - features_cache: &Arc>, - persister: &Arc, -) { +async fn save_features(features_cache: &FeatureCache, persister: &Arc) { if !features_cache.is_empty() { match persister .save_features( @@ -119,7 +116,7 @@ pub mod tests { let cache: DashMap = DashMap::new(); let persister = build_mock_persistence(); - save_features(&Arc::new(cache), &persister.clone()).await; + save_features(&Arc::new(FeatureCache::new(cache)), &persister.clone()).await; } #[tokio::test] diff --git a/server/src/ready_checker.rs b/server/src/ready_checker.rs index 39f6c75e..8328a849 100644 --- a/server/src/ready_checker.rs +++ b/server/src/ready_checker.rs @@ -74,6 +74,7 @@ mod tests { use unleash_types::client_features::{ClientFeature, ClientFeatures}; use crate::cli::ReadyCheckArgs; + use crate::feature_cache::FeatureCache; use crate::internal_backstage::ready; use crate::ready_checker::check_ready; use crate::types::EdgeToken; @@ -89,7 +90,7 @@ mod tests { segments: None, version: 2, }; - let client_features: DashMap = DashMap::default(); + let client_features: FeatureCache = FeatureCache::default(); client_features.insert( "testproject:testenvironment.testtoken".into(), features.clone(), diff --git a/server/tests/streaming_test.rs b/server/tests/streaming_test.rs index a88578c0..4593b0ab 100644 --- a/server/tests/streaming_test.rs +++ b/server/tests/streaming_test.rs @@ -1,265 +1,221 @@ -use dashmap::DashMap; -use eventsource_client::Client; -use futures::StreamExt; -use reqwest::Url; -use std::{fs, io::BufReader, path::PathBuf, str::FromStr, sync::Arc}; -use unleash_edge::{ - http::{ - broadcaster::Broadcaster, feature_refresher::FeatureRefresher, - unleash_client::UnleashClient, - }, - tokens::cache_key, - types::{EdgeToken, TokenType, TokenValidationStatus}, -}; -use unleash_types::client_features::{ClientFeatures, Query}; - -pub fn features_from_disk(path: &str) -> ClientFeatures { - let path = PathBuf::from(path); - let file = fs::File::open(path).unwrap(); - let reader = BufReader::new(file); - serde_json::from_reader(reader).unwrap() -} - -#[actix_web::test] -async fn test_streaming() { - let unleash_features_cache: Arc> = Arc::new(DashMap::default()); - let unleash_token_cache: Arc> = Arc::new(DashMap::default()); - let unleash_broadcaster = Broadcaster::new(unleash_features_cache.clone()); - - let unleash_server = upstream_server( - unleash_token_cache.clone(), - unleash_features_cache.clone(), - Arc::new(DashMap::default()), - unleash_broadcaster.clone(), - ) - .await; - - let mut upstream_known_token = EdgeToken::from_str("dx:development.secret123").unwrap(); - upstream_known_token.status = TokenValidationStatus::Validated; - upstream_known_token.token_type = Some(TokenType::Client); - unleash_token_cache.insert( - upstream_known_token.token.clone(), - upstream_known_token.clone(), - ); - - unleash_features_cache.insert( - cache_key(&upstream_known_token), - features_from_disk("../examples/features.json"), - ); - - let edge = edge_server(&unleash_server.url("/"), upstream_known_token.clone()).await; - - // Allow edge to establish a connection with upstream and populate the cache - tokio::time::sleep(std::time::Duration::from_secs(1)).await; +#[cfg(feature = "streaming")] +mod streaming_test { + use dashmap::DashMap; + use eventsource_client::Client; + use futures::StreamExt; + use std::{ + fs, + io::BufReader, + path::PathBuf, + process::{Command, Stdio}, + str::FromStr, + sync::Arc, + }; + use unleash_edge::{ + feature_cache::FeatureCache, + http::broadcaster::Broadcaster, + tokens::cache_key, + types::{EdgeToken, TokenType, TokenValidationStatus}, + }; + use unleash_types::client_features::{ClientFeatures, Query}; + + pub fn features_from_disk(path: &str) -> ClientFeatures { + let path = PathBuf::from(path); + let file = fs::File::open(path).unwrap(); + let reader = BufReader::new(file); + serde_json::from_reader(reader).unwrap() + } + + #[actix_web::test] + async fn test_streaming() { + let unleash_features_cache: Arc = + Arc::new(FeatureCache::new(DashMap::default())); + let unleash_token_cache: Arc> = Arc::new(DashMap::default()); + let unleash_broadcaster = Broadcaster::new(unleash_features_cache.clone()); + + let unleash_server = upstream_server( + unleash_token_cache.clone(), + unleash_features_cache.clone(), + Arc::new(DashMap::default()), + unleash_broadcaster.clone(), + ) + .await; - let es_client = eventsource_client::ClientBuilder::for_url(&edge.url("/api/client/streaming")) + let mut upstream_known_token = EdgeToken::from_str("dx:development.secret123").unwrap(); + upstream_known_token.status = TokenValidationStatus::Validated; + upstream_known_token.token_type = Some(TokenType::Client); + unleash_token_cache.insert( + upstream_known_token.token.clone(), + upstream_known_token.clone(), + ); + + unleash_features_cache.insert( + cache_key(&upstream_known_token), + features_from_disk("../examples/features.json"), + ); + + let mut edge = Command::new("./../target/debug/unleash-edge") + .arg("edge") + .arg("--upstream-url") + .arg(&unleash_server.url("/")) + .arg("--strict") + .arg("-t") + .arg(&upstream_known_token.token) + .stdout(Stdio::null()) // Suppress stdout + .stderr(Stdio::null()) // Suppress stderr + .spawn() + .expect("Failed to start the app"); + + // Allow edge to establish a connection with upstream and populate the cache + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // let es_client = eventsource_client::ClientBuilder::for_url(&edge.url("/api/client/streaming")) + let es_client = eventsource_client::ClientBuilder::for_url( + "http://localhost:3063/api/client/streaming", + ) .unwrap() .header("Authorization", &upstream_known_token.token) .unwrap() .build(); - let initial_features = ClientFeatures { - features: vec![], - version: 2, - segments: None, - query: Some(Query { - tags: None, - projects: Some(vec!["dx".into()]), - name_prefix: None, - environment: Some("development".into()), - inline_segment_constraints: Some(false), - }), - }; - - let mut stream = es_client.stream(); + let initial_features = ClientFeatures { + features: vec![], + version: 2, + segments: None, + query: Some(Query { + tags: None, + projects: Some(vec!["dx".into()]), + name_prefix: None, + environment: Some("development".into()), + inline_segment_constraints: Some(false), + }), + }; - tokio::time::timeout(std::time::Duration::from_secs(2), async { - loop { - if let Some(Ok(event)) = stream.next().await { - match event { - eventsource_client::SSE::Event(event) - if event.event_type == "unleash-connected" => - { - assert_eq!( - serde_json::from_str::(&event.data).unwrap(), - initial_features - ); - break; - } - _ => { - // ignore other events + let mut stream = es_client.stream(); + + if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), async { + loop { + if let Some(Ok(event)) = stream.next().await { + match event { + eventsource_client::SSE::Event(event) + if event.event_type == "unleash-connected" => + { + assert_eq!( + serde_json::from_str::(&event.data).unwrap(), + initial_features + ); + println!("Connected event received; features match expected"); + break; + } + _ => { + // ignore other events + } } } } + }) + .await + { + // If the test times out, kill the app process and fail the test + edge.kill().expect("Failed to kill the app process"); + edge.wait().expect("Failed to wait for the app process"); + panic!("Test timed out waiting for connected event"); } - }) - .await - .expect("Test timed out waiting for connected event"); - - unleash_features_cache.insert( - cache_key(&upstream_known_token), - features_from_disk("../examples/hostedexample.json"), - ); - unleash_broadcaster.broadcast().await; - tokio::time::timeout(std::time::Duration::from_secs(2), async { - loop { - if let Some(Ok(event)) = stream.next().await { - match event { - eventsource_client::SSE::Event(event) - if event.event_type == "unleash-updated" => - { - let update = serde_json::from_str::(&event.data).unwrap(); - assert_eq!(initial_features.query, update.query); - assert_eq!(initial_features.version, update.version); - assert_ne!(initial_features.features, update.features); - break; - } - _ => { - // panic!("Unexpected event: {:?}", event); - // ignore other events + unleash_features_cache.insert( + cache_key(&upstream_known_token), + features_from_disk("../examples/hostedexample.json"), + ); + + if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), async { + loop { + if let Some(Ok(event)) = stream.next().await { + match event { + eventsource_client::SSE::Event(event) + if event.event_type == "unleash-updated" => + { + let update = + serde_json::from_str::(&event.data).unwrap(); + assert_eq!(initial_features.query, update.query); + assert_eq!(initial_features.version, update.version); + assert_ne!(initial_features.features, update.features); + println!("Updated event received; features match expected"); + break; + } + _ => { + // ignore other events + } } } } + }) + .await + { + // If the test times out, kill the app process and fail the test + edge.kill().expect("Failed to kill the app process"); + edge.wait().expect("Failed to wait for the app process"); + panic!("Test timed out waiting for update event"); } - }) - .await - .expect("Test timed out waiting for update event"); -} - -use actix_http::HttpService; -use actix_http_test::{test_server, TestServer}; -use actix_service::map_config; -use actix_web::dev::AppConfig; -use actix_web::{web, App}; -use chrono::Duration; -use unleash_types::client_metrics::ConnectVia; -use unleash_yggdrasil::EngineState; - -use unleash_edge::auth::token_validator::TokenValidator; -use unleash_edge::http::unleash_client::new_reqwest_client; -use unleash_edge::metrics::client_metrics::MetricsCache; - -async fn edge_server(upstream_url: &str, token: EdgeToken) -> TestServer { - let unleash_client = Arc::new(UnleashClient::from_url( - Url::parse(upstream_url).unwrap(), - "Authorization".into(), - new_reqwest_client( - "something".into(), - false, - None, - None, - Duration::seconds(5), - Duration::seconds(5), - "test-client".into(), - ) - .unwrap(), - )); - let features_cache: Arc> = Arc::new(DashMap::default()); - let token_cache: Arc> = Arc::new(DashMap::default()); - token_cache.insert(token.token.clone(), token.clone()); - let engine_cache: Arc> = Arc::new(DashMap::default()); - let broadcaster = Broadcaster::new(features_cache.clone()); - let feature_refresher = Arc::new(FeatureRefresher { - unleash_client: unleash_client.clone(), - features_cache: features_cache.clone(), - engine_cache: engine_cache.clone(), - refresh_interval: Duration::seconds(6000), - broadcaster: broadcaster.clone(), - ..Default::default() - }); - let token_validator = Arc::new(TokenValidator { - unleash_client: unleash_client.clone(), - token_cache: token_cache.clone(), - persistence: None, - }); - feature_refresher - .register_token_for_refresh(token.clone(), None) - .await; - let refresher_for_background = feature_refresher.clone(); - - let handle = tokio::spawn(async move { - let _ = refresher_for_background - .start_streaming_features_background_task() - .await; - }); - - handle.await.unwrap(); - test_server(move || { - let config = - serde_qs::actix::QsQueryConfig::default().qs_config(serde_qs::Config::new(5, false)); - let metrics_cache = MetricsCache::default(); - let connect_via = ConnectVia { - app_name: "edge".into(), - instance_id: "testinstance".into(), - }; - HttpService::new(map_config( - App::new() - .app_data(config) - .app_data(web::Data::from(token_validator.clone())) - .app_data(web::Data::from(features_cache.clone())) - .app_data(web::Data::from(broadcaster.clone())) - .app_data(web::Data::from(engine_cache.clone())) - .app_data(web::Data::from(token_cache.clone())) - .app_data(web::Data::new(metrics_cache)) - .app_data(web::Data::new(connect_via)) - .app_data(web::Data::from(feature_refresher.clone())) - .service( - web::scope("/api") - .configure(unleash_edge::client_api::configure_client_api) - .configure(|cfg| { - unleash_edge::frontend_api::configure_frontend_api(cfg, false) - }), - ) - .service(web::scope("/edge").configure(unleash_edge::edge_api::configure_edge_api)), - |_| AppConfig::default(), - )) - .tcp() - }) - .await -} -async fn upstream_server( - upstream_token_cache: Arc>, - upstream_features_cache: Arc>, - upstream_engine_cache: Arc>, - upstream_broadcaster: Arc, -) -> TestServer { - let token_validator = Arc::new(TokenValidator { - unleash_client: Arc::new(Default::default()), - token_cache: upstream_token_cache.clone(), - persistence: None, - }); - - test_server(move || { - let config = - serde_qs::actix::QsQueryConfig::default().qs_config(serde_qs::Config::new(5, false)); - let metrics_cache = MetricsCache::default(); - let connect_via = ConnectVia { - app_name: "edge".into(), - instance_id: "testinstance".into(), - }; - HttpService::new(map_config( - App::new() - .app_data(config) - .app_data(web::Data::from(token_validator.clone())) - .app_data(web::Data::from(upstream_features_cache.clone())) - .app_data(web::Data::from(upstream_broadcaster.clone())) - .app_data(web::Data::from(upstream_engine_cache.clone())) - .app_data(web::Data::from(upstream_token_cache.clone())) - .app_data(web::Data::new(metrics_cache)) - .app_data(web::Data::new(connect_via)) - .service( - web::scope("/api") - .configure(unleash_edge::client_api::configure_client_api) - .configure(|cfg| { - unleash_edge::frontend_api::configure_frontend_api(cfg, false) - }), - ) - .service(web::scope("/edge").configure(unleash_edge::edge_api::configure_edge_api)), - |_| AppConfig::default(), - )) - .tcp() - }) - .await + edge.kill().expect("Failed to kill the app process"); + edge.wait().expect("Failed to wait for the app process"); + } + + use actix_http::HttpService; + use actix_http_test::{test_server, TestServer}; + use actix_service::map_config; + use actix_web::dev::AppConfig; + use actix_web::{web, App}; + use unleash_types::client_metrics::ConnectVia; + use unleash_yggdrasil::EngineState; + + use unleash_edge::auth::token_validator::TokenValidator; + use unleash_edge::metrics::client_metrics::MetricsCache; + + async fn upstream_server( + upstream_token_cache: Arc>, + upstream_features_cache: Arc, + upstream_engine_cache: Arc>, + upstream_broadcaster: Arc, + ) -> TestServer { + let token_validator = Arc::new(TokenValidator { + unleash_client: Arc::new(Default::default()), + token_cache: upstream_token_cache.clone(), + persistence: None, + }); + + test_server(move || { + let config = serde_qs::actix::QsQueryConfig::default() + .qs_config(serde_qs::Config::new(5, false)); + let metrics_cache = MetricsCache::default(); + let connect_via = ConnectVia { + app_name: "edge".into(), + instance_id: "testinstance".into(), + }; + HttpService::new(map_config( + App::new() + .app_data(config) + .app_data(web::Data::from(token_validator.clone())) + .app_data(web::Data::from(upstream_features_cache.clone())) + .app_data(web::Data::from(upstream_broadcaster.clone())) + .app_data(web::Data::from(upstream_engine_cache.clone())) + .app_data(web::Data::from(upstream_token_cache.clone())) + .app_data(web::Data::new(metrics_cache)) + .app_data(web::Data::new(connect_via)) + .service( + web::scope("/api") + .configure(unleash_edge::client_api::configure_client_api) + .configure(|cfg| { + unleash_edge::frontend_api::configure_frontend_api(cfg, false) + }), + ) + .service( + web::scope("/edge").configure(unleash_edge::edge_api::configure_edge_api), + ), + |_| AppConfig::default(), + )) + .tcp() + }) + .await + } }