Skip to content

Commit

Permalink
fix: remove feature refresher (#457)
Browse files Browse the repository at this point in the history
  • Loading branch information
sighphyre authored May 7, 2024
1 parent f567cce commit dc5e88d
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 218 deletions.
16 changes: 0 additions & 16 deletions server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ fn build_caches() -> CacheContainer {

async fn hydrate_from_persistent_storage(
cache: CacheContainer,
feature_refresher: Arc<FeatureRefresher>,
storage: Arc<dyn EdgePersistence>,
) {
let (token_cache, features_cache, engine_cache) = cache;
Expand All @@ -60,13 +59,6 @@ async fn hydrate_from_persistent_storage(
warn!("Failed to load features from cache {error:?}");
Default::default()
});
let refresh_targets = storage
.load_refresh_targets()
.await
.unwrap_or_else(|error| {
warn!("Failed to load refresh targets from cache {error:?}");
vec![]
});
for token in tokens {
tracing::debug!("Hydrating tokens {token:?}");
token_cache.insert(token.token.clone(), token);
Expand All @@ -83,13 +75,6 @@ async fn hydrate_from_persistent_storage(
}
engine_cache.insert(key.clone(), engine_state);
}

for target in refresh_targets {
tracing::debug!("Hydrating refresh target for {target:?}");
feature_refresher
.tokens_to_refresh
.insert(target.token.token.clone(), target);
}
}

pub(crate) fn build_offline_mode(
Expand Down Expand Up @@ -207,7 +192,6 @@ async fn build_edge(args: &EdgeArgs) -> EdgeResult<EdgeInfo> {
feature_cache.clone(),
engine_cache.clone(),
),
feature_refresher.clone(),
persistence,
)
.await;
Expand Down
17 changes: 4 additions & 13 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use unleash_edge::metrics::client_metrics::MetricsCache;
use unleash_edge::middleware::request_tracing::RequestTracing;
use unleash_edge::offline::offline_hotload;
use unleash_edge::persistence::{persist_data, EdgePersistence};
use unleash_edge::types::{EdgeToken, TokenRefresh, TokenValidationStatus};
use unleash_edge::types::{EdgeToken, TokenValidationStatus};
use unleash_edge::{cli, client_api, frontend_api, health_checker, openapi, ready_checker};
use unleash_edge::{edge_api, prom_metrics};
use unleash_edge::{internal_backstage, tls};
Expand Down Expand Up @@ -147,7 +147,7 @@ async fn main() -> Result<(), anyhow::Error> {
tokio::select! {
_ = server.run() => {
tracing::info!("Actix is shutting down. Persisting data");
clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone(), refresher.tokens_to_refresh.clone()).await;
clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone()).await;
tracing::info!("Actix was shutdown properly");
},
_ = refresher.start_refresh_features_background_task() => {
Expand All @@ -156,7 +156,7 @@ async fn main() -> Result<(), anyhow::Error> {
_ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => {
tracing::info!("Metrics poster unexpectedly shut down");
}
_ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone(), refresher.tokens_to_refresh.clone()) => {
_ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone()) => {
tracing::info!("Persister was unexpectedly shut down");
}
_ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => {
Expand All @@ -180,9 +180,7 @@ async fn main() -> Result<(), anyhow::Error> {
_ => tokio::select! {
_ = server.run() => {
tracing::info!("Actix is shutting down. Persisting data");
if let Some(refresher) = feature_refresher.clone() {
clean_shutdown(persistence, lazy_feature_cache.clone(), lazy_token_cache.clone(), refresher.tokens_to_refresh.clone()).await;
}
clean_shutdown(persistence, lazy_feature_cache.clone(), lazy_token_cache.clone()).await;
tracing::info!("Actix was shutdown properly");

}
Expand All @@ -197,19 +195,13 @@ async fn clean_shutdown(
persistence: Option<Arc<dyn EdgePersistence>>,
feature_cache: Arc<DashMap<String, ClientFeatures>>,
token_cache: Arc<DashMap<String, EdgeToken>>,
refresh_target_cache: Arc<DashMap<String, TokenRefresh>>,
) {
let tokens: Vec<EdgeToken> = token_cache
.iter()
.filter(|e| e.value().status == TokenValidationStatus::Validated)
.map(|entry| entry.value().clone())
.collect();

let refresh_targets: Vec<TokenRefresh> = refresh_target_cache
.iter()
.map(|entry| entry.value().clone())
.collect();

let features: Vec<(String, ClientFeatures)> = feature_cache
.iter()
.map(|entry| (entry.key().clone(), entry.value().clone()))
Expand All @@ -219,7 +211,6 @@ async fn clean_shutdown(
let res = join_all(vec![
persistence.save_tokens(tokens),
persistence.save_features(features),
persistence.save_refresh_targets(refresh_targets),
])
.await;
if res.iter().all(|save| save.is_ok()) {
Expand Down
94 changes: 4 additions & 90 deletions server/src/persistence/file.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
use std::{path::PathBuf, str::FromStr};
use std::collections::HashMap;
use std::path::Path;
use std::{path::PathBuf, str::FromStr};

use async_trait::async_trait;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use unleash_types::client_features::ClientFeatures;

use crate::{
error::EdgeError,
types::{EdgeResult, TokenRefresh},
};
use crate::types::EdgeToken;
use crate::{error::EdgeError, types::EdgeResult};

use super::EdgePersistence;

Expand Down Expand Up @@ -145,59 +142,17 @@ impl EdgePersistence for FilePersister {
.map_err(|_| EdgeError::PersistenceError("Could not serialize tokens to disc".to_string()))
.map(|_| ())
}

async fn load_refresh_targets(&self) -> EdgeResult<Vec<TokenRefresh>> {
let mut file = tokio::fs::File::open(self.refresh_target_path())
.await
.map_err(|_| {
EdgeError::PersistenceError(
"Cannot load tokens from backup, opening backup file failed".to_string(),
)
})?;

let mut contents = vec![];

file.read_to_end(&mut contents).await.map_err(|_| {
EdgeError::PersistenceError(
"Cannot load tokens from backup, reading backup file failed".to_string(),
)
})?;
serde_json::from_slice(&contents).map_err(|_| {
EdgeError::PersistenceError(
"Cannot load tokens from backup, parsing backup file failed".to_string(),
)
})
}
async fn save_refresh_targets(&self, refresh_targets: Vec<TokenRefresh>) -> EdgeResult<()> {
let mut file = tokio::fs::File::create(self.refresh_target_path())
.await
.map_err(|_| {
EdgeError::PersistenceError(
"Cannot write tokens to backup. Opening backup file for writing failed"
.to_string(),
)
})?;

file.write_all(&serde_json::to_vec(&refresh_targets).map_err(|_| {
EdgeError::PersistenceError("Failed to serialize refresh tokens".to_string())
})?)
.await
.map_err(|_| EdgeError::PersistenceError("Could not serialize tokens to disc".to_string()))
.map(|_| ())
}
}

#[cfg(test)]
mod tests {
use std::env::temp_dir;

use actix_web::http::header::EntityTag;
use chrono::Utc;
use unleash_types::client_features::{ClientFeature, ClientFeatures};

use crate::persistence::EdgePersistence;
use crate::persistence::file::FilePersister;
use crate::types::{EdgeToken, TokenRefresh, TokenType, TokenValidationStatus};
use crate::persistence::EdgePersistence;
use crate::types::{EdgeToken, TokenType, TokenValidationStatus};

#[tokio::test]
async fn file_persister_can_save_and_load_features() {
Expand Down Expand Up @@ -238,47 +193,6 @@ mod tests {
assert_eq!(reloaded, formatted_data.into_iter().collect());
}

#[tokio::test]
async fn file_persister_can_save_and_load_refresh_targets() {
let persister = FilePersister::try_from(temp_dir().to_str().unwrap()).unwrap();
let tokens = vec![
TokenRefresh {
token: EdgeToken {
token: "default:development:ajsdkajnsdlsan".into(),
token_type: Some(TokenType::Client),
environment: Some("development".into()),
projects: vec!["default".into()],
status: TokenValidationStatus::Validated,
},
etag: Some(EntityTag::new_weak("1234".into())),
next_refresh: None,
last_refreshed: Some(Utc::now()),
last_check: Some(Utc::now()),
failure_count: 0,
},
TokenRefresh {
token: EdgeToken {
token: "otherthing:otherthing:aljjsdnasd".into(),
..EdgeToken::default()
},
etag: None,
next_refresh: None,
last_refreshed: None,
last_check: None,
failure_count: 0,
},
];

persister
.save_refresh_targets(tokens.clone())
.await
.unwrap();

let reloaded = persister.load_refresh_targets().await.unwrap();

assert_eq!(reloaded, tokens);
}

#[tokio::test]
async fn file_persister_can_save_and_load_tokens() {
let persister = FilePersister::try_from(temp_dir().to_str().unwrap()).unwrap();
Expand Down
44 changes: 1 addition & 43 deletions server/src/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use dashmap::DashMap;
use tracing::{debug, warn};
use unleash_types::client_features::ClientFeatures;

use crate::types::{EdgeResult, EdgeToken, TokenRefresh, TokenValidationStatus};
use crate::types::{EdgeResult, EdgeToken, TokenValidationStatus};

pub mod file;
pub mod redis;
Expand All @@ -14,8 +14,6 @@ pub mod redis;
pub trait EdgePersistence: Send + Sync {
async fn load_tokens(&self) -> EdgeResult<Vec<EdgeToken>>;
async fn save_tokens(&self, tokens: Vec<EdgeToken>) -> EdgeResult<()>;
async fn load_refresh_targets(&self) -> EdgeResult<Vec<TokenRefresh>>;
async fn save_refresh_targets(&self, refresh_targets: Vec<TokenRefresh>) -> EdgeResult<()>;
async fn load_features(&self) -> EdgeResult<HashMap<String, ClientFeatures>>;
async fn save_features(&self, features: Vec<(String, ClientFeatures)>) -> EdgeResult<()>;
}
Expand All @@ -25,7 +23,6 @@ pub async fn persist_data(
persistence: Option<Arc<dyn EdgePersistence>>,
token_cache: Arc<DashMap<String, EdgeToken>>,
features_cache: Arc<DashMap<String, ClientFeatures>>,
refresh_targets_cache: Arc<DashMap<String, TokenRefresh>>,
) {
loop {
tokio::select! {
Expand All @@ -34,7 +31,6 @@ pub async fn persist_data(

save_known_tokens(&token_cache, &persister).await;
save_features(&features_cache, &persister).await;
save_refresh_targets(&refresh_targets_cache, &persister).await;
} else {
debug!("No persistence configured, skipping persistence");
}
Expand Down Expand Up @@ -88,28 +84,6 @@ async fn save_features(
}
}

async fn save_refresh_targets(
refresh_targets_cache: &Arc<DashMap<String, TokenRefresh>>,
persister: &Arc<dyn EdgePersistence>,
) {
if !refresh_targets_cache.is_empty() {
match persister
.save_refresh_targets(
refresh_targets_cache
.iter()
.map(|e| e.value().clone())
.collect(),
)
.await
{
Ok(()) => debug!("Persisted validated tokens"),
Err(save_error) => warn!("Could not persist refresh targets: {save_error:?}"),
}
} else {
debug!("No refresh targets found, skipping refresh targets persistence");
}
}

#[cfg(test)]
pub mod tests {
use super::*;
Expand All @@ -130,14 +104,6 @@ pub mod tests {
panic!("Not expected to be called");
}

async fn load_refresh_targets(&self) -> EdgeResult<Vec<TokenRefresh>> {
panic!("Not expected to be called");
}

async fn save_refresh_targets(&self, _: Vec<TokenRefresh>) -> EdgeResult<()> {
panic!("Not expected to be called");
}

async fn load_features(&self) -> EdgeResult<HashMap<String, ClientFeatures>> {
panic!("Not expected to be called");
}
Expand All @@ -162,12 +128,4 @@ pub mod tests {

save_known_tokens(&Arc::new(cache), &persister.clone()).await;
}

#[tokio::test]
async fn persistence_ignores_empty_refresh_target_sets() {
let cache: DashMap<String, TokenRefresh> = DashMap::new();
let persister = build_mock_persistence();

save_refresh_targets(&Arc::new(cache), &persister.clone()).await;
}
}
32 changes: 1 addition & 31 deletions server/src/persistence/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::{debug, info};
use unleash_types::client_features::ClientFeatures;

use crate::persistence::redis::RedisClientOptions::{Cluster, Single};
use crate::types::{EdgeToken, TokenRefresh};
use crate::types::EdgeToken;
use crate::{error::EdgeError, types::EdgeResult};

use super::EdgePersistence;
Expand Down Expand Up @@ -80,36 +80,6 @@ impl EdgePersistence for RedisPersister {
Ok(())
}

async fn load_refresh_targets(&self) -> EdgeResult<Vec<TokenRefresh>> {
debug!("Loading refresh targets");
let mut client = self.redis_client.write().await;
let refresh_targets: String = match &mut *client {
Single(client) => client.get(REFRESH_TARGETS_KEY)?,
Cluster(client) => {
let mut conn = client.get_connection()?;
conn.get(REFRESH_TARGETS_KEY)?
}
};
serde_json::from_str::<Vec<TokenRefresh>>(&refresh_targets).map_err(|_| {
EdgeError::TokenParseError("Failed to load refresh targets from redis".into())
})
}

async fn save_refresh_targets(&self, refresh_targets: Vec<TokenRefresh>) -> EdgeResult<()> {
debug!("Saving refresh targets: {}", refresh_targets.len());
let mut client = self.redis_client.write().await;
let refresh_targets = serde_json::to_string(&refresh_targets)?;
match &mut *client {
Single(client) => client.set(REFRESH_TARGETS_KEY, refresh_targets)?,
Cluster(client) => {
let mut conn = client.get_connection()?;
conn.set(REFRESH_TARGETS_KEY, refresh_targets)?
}
};
debug!("Done saving refresh target");
Ok(())
}

async fn load_features(&self) -> EdgeResult<HashMap<String, ClientFeatures>> {
debug!("Loading features from persistence");
let mut client = self.redis_client.write().await;
Expand Down
Loading

0 comments on commit dc5e88d

Please sign in to comment.