From dc5e88d9e703b5f07c84f22985909d2e1ab9781e Mon Sep 17 00:00:00 2001 From: Simon Hornby Date: Tue, 7 May 2024 16:37:49 +0200 Subject: [PATCH] fix: remove feature refresher (#457) --- server/src/builder.rs | 16 ------ server/src/main.rs | 17 ++---- server/src/persistence/file.rs | 94 ++------------------------------- server/src/persistence/mod.rs | 44 +-------------- server/src/persistence/redis.rs | 32 +---------- server/tests/redis_test.rs | 28 ++-------- 6 files changed, 13 insertions(+), 218 deletions(-) diff --git a/server/src/builder.rs b/server/src/builder.rs index 389d8959..f8701f47 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -48,7 +48,6 @@ fn build_caches() -> CacheContainer { async fn hydrate_from_persistent_storage( cache: CacheContainer, - feature_refresher: Arc, storage: Arc, ) { let (token_cache, features_cache, engine_cache) = cache; @@ -60,13 +59,6 @@ async fn hydrate_from_persistent_storage( warn!("Failed to load features from cache {error:?}"); Default::default() }); - let refresh_targets = storage - .load_refresh_targets() - .await - .unwrap_or_else(|error| { - warn!("Failed to load refresh targets from cache {error:?}"); - vec![] - }); for token in tokens { tracing::debug!("Hydrating tokens {token:?}"); token_cache.insert(token.token.clone(), token); @@ -83,13 +75,6 @@ async fn hydrate_from_persistent_storage( } engine_cache.insert(key.clone(), engine_state); } - - for target in refresh_targets { - tracing::debug!("Hydrating refresh target for {target:?}"); - feature_refresher - .tokens_to_refresh - .insert(target.token.token.clone(), target); - } } pub(crate) fn build_offline_mode( @@ -207,7 +192,6 @@ async fn build_edge(args: &EdgeArgs) -> EdgeResult { feature_cache.clone(), engine_cache.clone(), ), - feature_refresher.clone(), persistence, ) .await; diff --git a/server/src/main.rs b/server/src/main.rs index 9e0de24a..7022634c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -18,7 +18,7 @@ use unleash_edge::metrics::client_metrics::MetricsCache; use unleash_edge::middleware::request_tracing::RequestTracing; use unleash_edge::offline::offline_hotload; use unleash_edge::persistence::{persist_data, EdgePersistence}; -use unleash_edge::types::{EdgeToken, TokenRefresh, TokenValidationStatus}; +use unleash_edge::types::{EdgeToken, TokenValidationStatus}; use unleash_edge::{cli, client_api, frontend_api, health_checker, openapi, ready_checker}; use unleash_edge::{edge_api, prom_metrics}; use unleash_edge::{internal_backstage, tls}; @@ -147,7 +147,7 @@ async fn main() -> Result<(), anyhow::Error> { tokio::select! { _ = server.run() => { tracing::info!("Actix is shutting down. Persisting data"); - clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone(), refresher.tokens_to_refresh.clone()).await; + clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone()).await; tracing::info!("Actix was shutdown properly"); }, _ = refresher.start_refresh_features_background_task() => { @@ -156,7 +156,7 @@ async fn main() -> Result<(), anyhow::Error> { _ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => { tracing::info!("Metrics poster unexpectedly shut down"); } - _ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone(), refresher.tokens_to_refresh.clone()) => { + _ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone()) => { tracing::info!("Persister was unexpectedly shut down"); } _ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => { @@ -180,9 +180,7 @@ async fn main() -> Result<(), anyhow::Error> { _ => tokio::select! { _ = server.run() => { tracing::info!("Actix is shutting down. Persisting data"); - if let Some(refresher) = feature_refresher.clone() { - clean_shutdown(persistence, lazy_feature_cache.clone(), lazy_token_cache.clone(), refresher.tokens_to_refresh.clone()).await; - } + clean_shutdown(persistence, lazy_feature_cache.clone(), lazy_token_cache.clone()).await; tracing::info!("Actix was shutdown properly"); } @@ -197,7 +195,6 @@ async fn clean_shutdown( persistence: Option>, feature_cache: Arc>, token_cache: Arc>, - refresh_target_cache: Arc>, ) { let tokens: Vec = token_cache .iter() @@ -205,11 +202,6 @@ async fn clean_shutdown( .map(|entry| entry.value().clone()) .collect(); - let refresh_targets: Vec = refresh_target_cache - .iter() - .map(|entry| entry.value().clone()) - .collect(); - let features: Vec<(String, ClientFeatures)> = feature_cache .iter() .map(|entry| (entry.key().clone(), entry.value().clone())) @@ -219,7 +211,6 @@ async fn clean_shutdown( let res = join_all(vec![ persistence.save_tokens(tokens), persistence.save_features(features), - persistence.save_refresh_targets(refresh_targets), ]) .await; if res.iter().all(|save| save.is_ok()) { diff --git a/server/src/persistence/file.rs b/server/src/persistence/file.rs index c6c910b0..eb1a5c38 100644 --- a/server/src/persistence/file.rs +++ b/server/src/persistence/file.rs @@ -1,17 +1,14 @@ -use std::{path::PathBuf, str::FromStr}; use std::collections::HashMap; use std::path::Path; +use std::{path::PathBuf, str::FromStr}; use async_trait::async_trait; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; use unleash_types::client_features::ClientFeatures; -use crate::{ - error::EdgeError, - types::{EdgeResult, TokenRefresh}, -}; use crate::types::EdgeToken; +use crate::{error::EdgeError, types::EdgeResult}; use super::EdgePersistence; @@ -145,59 +142,17 @@ impl EdgePersistence for FilePersister { .map_err(|_| EdgeError::PersistenceError("Could not serialize tokens to disc".to_string())) .map(|_| ()) } - - async fn load_refresh_targets(&self) -> EdgeResult> { - let mut file = tokio::fs::File::open(self.refresh_target_path()) - .await - .map_err(|_| { - EdgeError::PersistenceError( - "Cannot load tokens from backup, opening backup file failed".to_string(), - ) - })?; - - let mut contents = vec![]; - - file.read_to_end(&mut contents).await.map_err(|_| { - EdgeError::PersistenceError( - "Cannot load tokens from backup, reading backup file failed".to_string(), - ) - })?; - serde_json::from_slice(&contents).map_err(|_| { - EdgeError::PersistenceError( - "Cannot load tokens from backup, parsing backup file failed".to_string(), - ) - }) - } - async fn save_refresh_targets(&self, refresh_targets: Vec) -> EdgeResult<()> { - let mut file = tokio::fs::File::create(self.refresh_target_path()) - .await - .map_err(|_| { - EdgeError::PersistenceError( - "Cannot write tokens to backup. Opening backup file for writing failed" - .to_string(), - ) - })?; - - file.write_all(&serde_json::to_vec(&refresh_targets).map_err(|_| { - EdgeError::PersistenceError("Failed to serialize refresh tokens".to_string()) - })?) - .await - .map_err(|_| EdgeError::PersistenceError("Could not serialize tokens to disc".to_string())) - .map(|_| ()) - } } #[cfg(test)] mod tests { use std::env::temp_dir; - use actix_web::http::header::EntityTag; - use chrono::Utc; use unleash_types::client_features::{ClientFeature, ClientFeatures}; - use crate::persistence::EdgePersistence; use crate::persistence::file::FilePersister; - use crate::types::{EdgeToken, TokenRefresh, TokenType, TokenValidationStatus}; + use crate::persistence::EdgePersistence; + use crate::types::{EdgeToken, TokenType, TokenValidationStatus}; #[tokio::test] async fn file_persister_can_save_and_load_features() { @@ -238,47 +193,6 @@ mod tests { assert_eq!(reloaded, formatted_data.into_iter().collect()); } - #[tokio::test] - async fn file_persister_can_save_and_load_refresh_targets() { - let persister = FilePersister::try_from(temp_dir().to_str().unwrap()).unwrap(); - let tokens = vec![ - TokenRefresh { - token: EdgeToken { - token: "default:development:ajsdkajnsdlsan".into(), - token_type: Some(TokenType::Client), - environment: Some("development".into()), - projects: vec!["default".into()], - status: TokenValidationStatus::Validated, - }, - etag: Some(EntityTag::new_weak("1234".into())), - next_refresh: None, - last_refreshed: Some(Utc::now()), - last_check: Some(Utc::now()), - failure_count: 0, - }, - TokenRefresh { - token: EdgeToken { - token: "otherthing:otherthing:aljjsdnasd".into(), - ..EdgeToken::default() - }, - etag: None, - next_refresh: None, - last_refreshed: None, - last_check: None, - failure_count: 0, - }, - ]; - - persister - .save_refresh_targets(tokens.clone()) - .await - .unwrap(); - - let reloaded = persister.load_refresh_targets().await.unwrap(); - - assert_eq!(reloaded, tokens); - } - #[tokio::test] async fn file_persister_can_save_and_load_tokens() { let persister = FilePersister::try_from(temp_dir().to_str().unwrap()).unwrap(); diff --git a/server/src/persistence/mod.rs b/server/src/persistence/mod.rs index 0d080fd3..bd091b5b 100644 --- a/server/src/persistence/mod.rs +++ b/server/src/persistence/mod.rs @@ -5,7 +5,7 @@ use dashmap::DashMap; use tracing::{debug, warn}; use unleash_types::client_features::ClientFeatures; -use crate::types::{EdgeResult, EdgeToken, TokenRefresh, TokenValidationStatus}; +use crate::types::{EdgeResult, EdgeToken, TokenValidationStatus}; pub mod file; pub mod redis; @@ -14,8 +14,6 @@ pub mod redis; pub trait EdgePersistence: Send + Sync { async fn load_tokens(&self) -> EdgeResult>; async fn save_tokens(&self, tokens: Vec) -> EdgeResult<()>; - async fn load_refresh_targets(&self) -> EdgeResult>; - async fn save_refresh_targets(&self, refresh_targets: Vec) -> EdgeResult<()>; async fn load_features(&self) -> EdgeResult>; async fn save_features(&self, features: Vec<(String, ClientFeatures)>) -> EdgeResult<()>; } @@ -25,7 +23,6 @@ pub async fn persist_data( persistence: Option>, token_cache: Arc>, features_cache: Arc>, - refresh_targets_cache: Arc>, ) { loop { tokio::select! { @@ -34,7 +31,6 @@ pub async fn persist_data( save_known_tokens(&token_cache, &persister).await; save_features(&features_cache, &persister).await; - save_refresh_targets(&refresh_targets_cache, &persister).await; } else { debug!("No persistence configured, skipping persistence"); } @@ -88,28 +84,6 @@ async fn save_features( } } -async fn save_refresh_targets( - refresh_targets_cache: &Arc>, - persister: &Arc, -) { - if !refresh_targets_cache.is_empty() { - match persister - .save_refresh_targets( - refresh_targets_cache - .iter() - .map(|e| e.value().clone()) - .collect(), - ) - .await - { - Ok(()) => debug!("Persisted validated tokens"), - Err(save_error) => warn!("Could not persist refresh targets: {save_error:?}"), - } - } else { - debug!("No refresh targets found, skipping refresh targets persistence"); - } -} - #[cfg(test)] pub mod tests { use super::*; @@ -130,14 +104,6 @@ pub mod tests { panic!("Not expected to be called"); } - async fn load_refresh_targets(&self) -> EdgeResult> { - panic!("Not expected to be called"); - } - - async fn save_refresh_targets(&self, _: Vec) -> EdgeResult<()> { - panic!("Not expected to be called"); - } - async fn load_features(&self) -> EdgeResult> { panic!("Not expected to be called"); } @@ -162,12 +128,4 @@ pub mod tests { save_known_tokens(&Arc::new(cache), &persister.clone()).await; } - - #[tokio::test] - async fn persistence_ignores_empty_refresh_target_sets() { - let cache: DashMap = DashMap::new(); - let persister = build_mock_persistence(); - - save_refresh_targets(&Arc::new(cache), &persister.clone()).await; - } } diff --git a/server/src/persistence/redis.rs b/server/src/persistence/redis.rs index bcaa8a85..3ce707ba 100644 --- a/server/src/persistence/redis.rs +++ b/server/src/persistence/redis.rs @@ -9,7 +9,7 @@ use tracing::{debug, info}; use unleash_types::client_features::ClientFeatures; use crate::persistence::redis::RedisClientOptions::{Cluster, Single}; -use crate::types::{EdgeToken, TokenRefresh}; +use crate::types::EdgeToken; use crate::{error::EdgeError, types::EdgeResult}; use super::EdgePersistence; @@ -80,36 +80,6 @@ impl EdgePersistence for RedisPersister { Ok(()) } - async fn load_refresh_targets(&self) -> EdgeResult> { - debug!("Loading refresh targets"); - let mut client = self.redis_client.write().await; - let refresh_targets: String = match &mut *client { - Single(client) => client.get(REFRESH_TARGETS_KEY)?, - Cluster(client) => { - let mut conn = client.get_connection()?; - conn.get(REFRESH_TARGETS_KEY)? - } - }; - serde_json::from_str::>(&refresh_targets).map_err(|_| { - EdgeError::TokenParseError("Failed to load refresh targets from redis".into()) - }) - } - - async fn save_refresh_targets(&self, refresh_targets: Vec) -> EdgeResult<()> { - debug!("Saving refresh targets: {}", refresh_targets.len()); - let mut client = self.redis_client.write().await; - let refresh_targets = serde_json::to_string(&refresh_targets)?; - match &mut *client { - Single(client) => client.set(REFRESH_TARGETS_KEY, refresh_targets)?, - Cluster(client) => { - let mut conn = client.get_connection()?; - conn.set(REFRESH_TARGETS_KEY, refresh_targets)? - } - }; - debug!("Done saving refresh target"); - Ok(()) - } - async fn load_features(&self) -> EdgeResult> { debug!("Loading features from persistence"); let mut client = self.redis_client.write().await; diff --git a/server/tests/redis_test.rs b/server/tests/redis_test.rs index 99253a20..0b0ed398 100644 --- a/server/tests/redis_test.rs +++ b/server/tests/redis_test.rs @@ -1,16 +1,14 @@ use std::str::FromStr; -use actix_web::http::header::EntityTag; -use chrono::Utc; use redis::Client; -use testcontainers::ContainerAsync; use testcontainers::runners::AsyncRunner; +use testcontainers::ContainerAsync; use testcontainers_modules::redis::Redis; use unleash_types::client_features::{ClientFeature, ClientFeatures}; use unleash_edge::{ - persistence::{EdgePersistence, redis::RedisPersister}, - types::{EdgeToken, TokenRefresh, TokenType}, + persistence::{redis::RedisPersister, EdgePersistence}, + types::{EdgeToken, TokenType}, }; async fn setup_redis() -> (Client, String, ContainerAsync) { @@ -60,23 +58,3 @@ async fn redis_saves_and_restores_edge_tokens_correctly() { let saved_tokens = redis_persister.load_tokens().await.unwrap(); assert_eq!(saved_tokens.len(), 2); } - -#[tokio::test] -async fn redis_saves_and_restores_token_refreshes_correctly() { - let (_client, url, _node) = setup_redis().await; - let redis_persister = RedisPersister::new(&url).unwrap(); - let edge_token = EdgeToken::from_str("someproject:development.abcdefghijklmnopqr").unwrap(); - - let mut token_refresh = TokenRefresh::new(edge_token.clone(), None); - let now = Utc::now(); - token_refresh.last_check = Some(now); - token_refresh.last_refreshed = Some(now); - token_refresh.etag = Some(EntityTag::new_weak("abcdefghijl".into())); - redis_persister - .save_refresh_targets(vec![token_refresh]) - .await - .unwrap(); - let saved_refreshes = redis_persister.load_refresh_targets().await.unwrap(); - assert_eq!(saved_refreshes.len(), 1); - assert_eq!(saved_refreshes.first().unwrap().token, edge_token); -}