Skip to content

Commit

Permalink
feat: broadcast updates (#604)
Browse files Browse the repository at this point in the history
* Tokio broadcasting channel for updates of features

* Allow subscription from feature cache

Co-authored-by: Thomas Heartman <[email protected]>
  • Loading branch information
chriswk and thomasheartman authored Dec 17, 2024
1 parent 6fa82a9 commit 39b0a2e
Show file tree
Hide file tree
Showing 16 changed files with 507 additions and 498 deletions.
13 changes: 7 additions & 6 deletions .github/workflows/test-with-coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,26 @@ on:
branches:
- main
paths:
- '**.rs'
- '**.toml'
- "**.rs"
- "**.toml"
pull_request:
branches:
- main
paths:
- '**.rs'
- '**.toml'
- "**.rs"
- "**.toml"

jobs:
tarpaulin:
runs-on: ubuntu-latest
name: Run test coverage using Tarpaulin
env:
env:
CARGO_TERM_COLOR: always
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Install rust
run: |
rustup set auto-self-update disable
Expand All @@ -37,4 +39,3 @@ jobs:
- name: Run Tarpaulin (Reporting to coveralls)
run: |
cargo tarpaulin --all-features --coveralls ${{ secrets.COVERALLS_KEY }} --skip-clean
5 changes: 3 additions & 2 deletions server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use unleash_types::client_features::ClientFeatures;
use unleash_yggdrasil::EngineState;

use crate::cli::RedisMode;
use crate::feature_cache::FeatureCache;
use crate::http::unleash_client::new_reqwest_client;
use crate::offline::offline_hotload::{load_bootstrap, load_offline_engine_cache};
use crate::persistence::file::FilePersister;
Expand All @@ -27,7 +28,7 @@ use crate::{

type CacheContainer = (
Arc<DashMap<String, EdgeToken>>,
Arc<DashMap<String, ClientFeatures>>,
Arc<FeatureCache>,
Arc<DashMap<String, EngineState>>,
);
type EdgeInfo = (
Expand All @@ -43,7 +44,7 @@ fn build_caches() -> CacheContainer {
let engine_cache: DashMap<String, EngineState> = DashMap::default();
(
Arc::new(token_cache),
Arc::new(features_cache),
Arc::new(FeatureCache::new(features_cache)),
Arc::new(engine_cache),
)
}
Expand Down
56 changes: 25 additions & 31 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::error::EdgeError;
use crate::feature_cache::FeatureCache;
use crate::filters::{
filter_client_features, name_match_filter, name_prefix_filter, project_filter, FeatureFilterSet,
};
Expand Down Expand Up @@ -31,7 +32,7 @@ use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia
#[get("/features")]
pub async fn get_features(
edge_token: EdgeToken,
features_cache: Data<DashMap<String, ClientFeatures>>,
features_cache: Data<FeatureCache>,
token_cache: Data<DashMap<String, EdgeToken>>,
filter_query: Query<FeatureFilters>,
req: HttpRequest,
Expand Down Expand Up @@ -76,7 +77,7 @@ pub async fn stream_features(
#[post("/features")]
pub async fn post_features(
edge_token: EdgeToken,
features_cache: Data<DashMap<String, ClientFeatures>>,
features_cache: Data<FeatureCache>,
token_cache: Data<DashMap<String, EdgeToken>>,
filter_query: Query<FeatureFilters>,
req: HttpRequest,
Expand Down Expand Up @@ -119,7 +120,7 @@ fn get_feature_filter(

async fn resolve_features(
edge_token: EdgeToken,
features_cache: Data<DashMap<String, ClientFeatures>>,
features_cache: Data<FeatureCache>,
token_cache: Data<DashMap<String, EdgeToken>>,
filter_query: Query<FeatureFilters>,
req: HttpRequest,
Expand Down Expand Up @@ -160,7 +161,7 @@ async fn resolve_features(
#[get("/features/{feature_name}")]
pub async fn get_feature(
edge_token: EdgeToken,
features_cache: Data<DashMap<String, ClientFeatures>>,
features_cache: Data<FeatureCache>,
token_cache: Data<DashMap<String, EdgeToken>>,
feature_name: web::Path<String>,
req: HttpRequest,
Expand Down Expand Up @@ -298,8 +299,6 @@ pub fn configure_experimental_post_features(
#[cfg(test)]
mod tests {

#[cfg(feature = "streaming")]
use crate::http::broadcaster::Broadcaster;
use crate::metrics::client_metrics::{ApplicationKey, MetricsBatch, MetricsKey};
use crate::types::{TokenType, TokenValidationStatus};
use std::collections::HashMap;
Expand Down Expand Up @@ -583,7 +582,7 @@ mod tests {

#[tokio::test]
async fn response_includes_variant_stickiness_for_strategy_variants() {
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let app = test::init_service(
App::new()
Expand Down Expand Up @@ -687,7 +686,7 @@ mod tests {
token.status = TokenValidationStatus::Validated;
token.token_type = Some(TokenType::Client);
let upstream_token_cache = Arc::new(DashMap::default());
let upstream_features_cache = Arc::new(DashMap::default());
let upstream_features_cache = Arc::new(FeatureCache::default());
let upstream_engine_cache = Arc::new(DashMap::default());
upstream_token_cache.insert(token.token.clone(), token.clone());
let srv = upstream_server(
Expand Down Expand Up @@ -726,7 +725,7 @@ mod tests {
frontend_token.status = TokenValidationStatus::Validated;
frontend_token.token_type = Some(TokenType::Frontend);
let upstream_token_cache = Arc::new(DashMap::default());
let upstream_features_cache = Arc::new(DashMap::default());
let upstream_features_cache = Arc::new(FeatureCache::default());
let upstream_engine_cache = Arc::new(DashMap::default());
upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone());
let srv = upstream_server(
Expand Down Expand Up @@ -768,7 +767,7 @@ mod tests {

#[tokio::test]
async fn client_features_endpoint_correctly_returns_cached_features() {
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let app = test::init_service(
App::new()
Expand Down Expand Up @@ -805,7 +804,7 @@ mod tests {

#[tokio::test]
async fn post_request_to_client_features_does_the_same_as_get_when_mounted() {
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let app = test::init_service(
App::new()
Expand Down Expand Up @@ -855,7 +854,7 @@ mod tests {

#[tokio::test]
async fn client_features_endpoint_filters_on_project_access_in_token() {
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let app = test::init_service(
App::new()
Expand Down Expand Up @@ -884,7 +883,7 @@ mod tests {

#[tokio::test]
async fn client_features_endpoint_filters_when_multiple_projects_in_token() {
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let app = test::init_service(
App::new()
Expand Down Expand Up @@ -913,7 +912,7 @@ mod tests {
#[tokio::test]
async fn client_features_endpoint_filters_correctly_when_token_has_access_to_multiple_projects()
{
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let app = test::init_service(
App::new()
Expand Down Expand Up @@ -964,7 +963,7 @@ mod tests {

#[tokio::test]
async fn when_running_in_offline_mode_with_proxy_key_should_not_filter_features() {
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let app = test::init_service(
App::new()
Expand All @@ -991,8 +990,7 @@ mod tests {

#[tokio::test]
async fn calling_client_features_endpoint_with_new_token_hydrates_from_upstream_when_dynamic() {
let upstream_features_cache: Arc<DashMap<String, ClientFeatures>> =
Arc::new(DashMap::default());
let upstream_features_cache = Arc::new(FeatureCache::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let server = upstream_server(
Expand All @@ -1011,7 +1009,7 @@ mod tests {
);
upstream_features_cache.insert(cache_key(&upstream_known_token), upstream_features.clone());
let unleash_client = Arc::new(UnleashClient::new(server.url("/").as_str(), None).unwrap());
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache: Arc<FeatureCache> = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let feature_refresher = Arc::new(FeatureRefresher {
Expand All @@ -1023,8 +1021,6 @@ mod tests {
persistence: None,
strict: false,
app_name: "test-app".into(),
#[cfg(feature = "streaming")]
broadcaster: Broadcaster::new(features_cache.clone()),
});
let token_validator = Arc::new(TokenValidator {
unleash_client: unleash_client.clone(),
Expand Down Expand Up @@ -1055,8 +1051,7 @@ mod tests {

#[tokio::test]
async fn calling_client_features_endpoint_with_new_token_does_not_hydrate_when_strict() {
let upstream_features_cache: Arc<DashMap<String, ClientFeatures>> =
Arc::new(DashMap::default());
let upstream_features_cache = Arc::new(FeatureCache::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let server = upstream_server(
Expand All @@ -1075,7 +1070,7 @@ mod tests {
);
upstream_features_cache.insert(cache_key(&upstream_known_token), upstream_features.clone());
let unleash_client = Arc::new(UnleashClient::new(server.url("/").as_str(), None).unwrap());
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache: Arc<FeatureCache> = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let feature_refresher = Arc::new(FeatureRefresher {
Expand Down Expand Up @@ -1114,7 +1109,7 @@ mod tests {

#[tokio::test]
pub async fn gets_feature_by_name() {
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let features = features_from_disk("../examples/hostedexample.json");
Expand Down Expand Up @@ -1146,7 +1141,7 @@ mod tests {

#[tokio::test]
pub async fn token_with_no_access_to_named_feature_yields_404() {
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let features = features_from_disk("../examples/hostedexample.json");
Expand Down Expand Up @@ -1178,8 +1173,7 @@ mod tests {
#[tokio::test]
pub async fn still_subsumes_tokens_after_moving_registration_to_initial_hydration_when_dynamic()
{
let upstream_features_cache: Arc<DashMap<String, ClientFeatures>> =
Arc::new(DashMap::default());
let upstream_features_cache: Arc<FeatureCache> = Arc::new(FeatureCache::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let server = upstream_server(
Expand All @@ -1199,7 +1193,7 @@ mod tests {
upstream_token_cache.insert(upstream_eg_token.token.clone(), upstream_eg_token.clone());
upstream_features_cache.insert(cache_key(&upstream_dx_token), upstream_features.clone());
let unleash_client = Arc::new(UnleashClient::new(server.url("/").as_str(), None).unwrap());
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache: Arc<FeatureCache> = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let feature_refresher = Arc::new(FeatureRefresher {
Expand Down Expand Up @@ -1248,7 +1242,7 @@ mod tests {

#[tokio::test]
pub async fn can_filter_features_list_by_name_prefix() {
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let features = features_from_disk("../examples/hostedexample.json");
Expand Down Expand Up @@ -1280,7 +1274,7 @@ mod tests {

#[tokio::test]
pub async fn only_gets_correct_feature_by_name() {
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let features = ClientFeatures {
Expand Down Expand Up @@ -1356,7 +1350,7 @@ mod tests {

#[tokio::test]
async fn client_features_endpoint_works_with_overridden_token_header() {
let features_cache: Arc<DashMap<String, ClientFeatures>> = Arc::new(DashMap::default());
let features_cache = Arc::new(FeatureCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let token_header = TokenHeader::from_str("NeedsToBeTested").unwrap();
println!("token_header: {:?}", token_header);
Expand Down
Loading

0 comments on commit 39b0a2e

Please sign in to comment.