From 323ff96595d7b8e20dcaf7a08a128b9edff70c8b Mon Sep 17 00:00:00 2001 From: Thomas Heartman Date: Thu, 5 Dec 2024 13:58:50 +0100 Subject: [PATCH] fix: unbreak edge; add comments --- server/src/client_api.rs | 12 +++++++-- server/src/http/broadcaster.rs | 40 +++++++++++++++++++++------- server/src/http/feature_refresher.rs | 13 ++++++--- 3 files changed, 51 insertions(+), 14 deletions(-) diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 1e66dbf0..58d134db 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -10,9 +10,7 @@ use crate::types::{ }; use actix_web::web::{self, Data, Json, Query}; use actix_web::{get, post, HttpRequest, HttpResponse, Responder}; -use aws_sdk_s3::config::endpoint::ResolveEndpoint; use dashmap::DashMap; -use tracing_subscriber::filter; use unleash_types::client_features::{ClientFeature, ClientFeatures}; use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia}; @@ -86,6 +84,9 @@ pub async fn post_features( resolve_features(edge_token, features_cache, token_cache, filter_query, req).await } +/// this was extracted from resolve_features because it gives us the necessary +/// values to filter flags and construct client feature responses later. It's a +/// standalone function and can be moved to a different file if necessary. fn get_feature_filter( edge_token: &EdgeToken, token_cache: &Data>, @@ -119,6 +120,10 @@ fn get_feature_filter( Ok((validated_token, filter_set, query)) } +/// This is the second half of resolve_features. The idea was that you don't +/// need to do the extraction work twice. The broadcaster should be able to do +/// something like the Some arm of the match here, except we'll know that we +/// already have the refresher. async fn resolve_features_2( query: unleash_types::client_features::Query, validated_token: EdgeToken, @@ -144,6 +149,7 @@ async fn resolve_features_2( })) } +/// This is the same as it always was, except I extracted bits of it. 
async fn resolve_features( edge_token: EdgeToken, features_cache: Data>, @@ -323,6 +329,7 @@ pub fn configure_experimental_post_features( #[cfg(test)] mod tests { + use crate::http::broadcaster::Broadcaster; use crate::metrics::client_metrics::{ApplicationKey, MetricsBatch, MetricsKey}; use crate::types::{TokenType, TokenValidationStatus}; use std::collections::HashMap; @@ -1046,6 +1053,7 @@ mod tests { persistence: None, strict: false, app_name: "test-app".into(), + broadcaster: Broadcaster::create(), }); let token_validator = Arc::new(TokenValidator { unleash_client: unleash_client.clone(), diff --git a/server/src/http/broadcaster.rs b/server/src/http/broadcaster.rs index 50239408..19b68d5b 100644 --- a/server/src/http/broadcaster.rs +++ b/server/src/http/broadcaster.rs @@ -1,3 +1,4 @@ +/// copied from https://github.com/actix/examples/blob/master/server-sent-events/src/broadcast.rs use std::{sync::Arc, time::Duration}; use actix_web::{rt::time::interval, web::Json}; @@ -17,7 +18,21 @@ pub struct Broadcaster { inner: Mutex, } -// #[derive(Debug)] +// this doesn't work because filter_set isn't clone. However, we can probably +// find a way around that. For instance, we can create a hash map / dash map of +// some client identifier to each filter set, so that we don't need to clone the +// filter set. + +// I'd thought at first that we could map the token to the filter set, but I +// think that might not be enough, as the filter set may also contain query +// param information, which can vary between uses of the same token. + +// It might be that the easiest way is to create an ID per client and use that. +// Then, when we drop clients, also drop their corresponding entries from the +// map. + +// #[derive(Debug, Clone)] + struct StreamClient { stream: mpsc::Sender, token: EdgeToken, @@ -75,7 +90,11 @@ impl Broadcaster { } /// Registers client with broadcaster, returning an SSE response body. 
- /// should take the current feature set as input and send it to the client. + /// The current impl takes the feature set as input and sends it to the client as a connected event. + /// + /// The commented-out arguments are what we'll need to store per client so + /// that we can properly filter / format the feature response when they get + /// updates later. pub async fn new_client( &self, // token: EdgeToken, @@ -94,12 +113,7 @@ impl Broadcaster { .await .unwrap(); - self.inner.lock().clients.push(StreamClient { - stream: tx, - token, - filter_set, - query, - }); + self.inner.lock().clients.push(tx); Sse::from_infallible_receiver(rx) // we're already using remove_stale_clients to clean up disconnected @@ -107,7 +121,12 @@ impl Broadcaster { // .with_keep_alive(Duration::from_secs(30)) } - /// re-~roadcasts `data` to all clients. + /// broadcasts a pre-formatted `data` event to all clients. + /// + /// The final implementation will probably not use this. Instead, it will + /// probably use each client's filters to determine the features to send. + /// We'll need to pass in either the full set of features or a way to filter + /// them. Both might work. pub async fn rebroadcast(&self, data: Event) { let clients = self.inner.lock().clients.clone(); @@ -117,7 +136,10 @@ impl Broadcaster { // disconnected clients will get swept up by `remove_stale_clients` let _ = future::join_all(send_futures).await; } + /// Broadcasts `msg` to all clients. + /// + /// This is the example implementation of the broadcast function. It's not used anywhere today. pub async fn broadcast(&self, msg: &str) { let clients = self.inner.lock().clients.clone(); diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index b15dbb85..a64712d8 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -306,6 +306,7 @@ impl FeatureRefresher { } } + /// This is where we set up a listener per token. 
pub async fn start_streaming_features_background_task(&self) { let refreshes = self.get_tokens_due_for_refresh(); for refresh in refreshes { @@ -347,6 +348,7 @@ impl FeatureRefresher { let broadcaster = broadcaster.clone(); async move { match sse { + // The first time we're connecting to Unleash. Just store the data. eventsource_client::SSE::Event(event) if event.event_type == "unleash-connected" => { @@ -354,10 +356,12 @@ impl FeatureRefresher { "Connected to unleash! I should populate my flag cache now.", ); + // very rough handling of client features. let features: ClientFeatures = serde_json::from_str(&event.data).unwrap(); refresher.handle_client_features_updated(TokenRefresh::new(token, None), features); } + // Unleash has updated. This is where we send data to listeners. eventsource_client::SSE::Event(event) if event.event_type == "unleash-updated" => { @@ -365,14 +369,17 @@ impl FeatureRefresher { "Got an unleash updated event. I should update my cache and notify listeners.", ); + // store the data locally let features: ClientFeatures = serde_json::from_str(&event.data).unwrap(); refresher.handle_client_features_updated(TokenRefresh::new(token, None), features); - + // send the data to the broadcaster. This should probably just send the new + // feature set OR even just a "filter flags" + // function. The broadcaster will take care + // of filtering the flags per listener. let data = Data::new(event.data).event("unleash-updated"); broadcaster.rebroadcast(actix_web_lab::sse::Event::Data(data)).await; - // self.broadcaster.broadcast("got an update".clone).await; } eventsource_client::SSE::Event(event) => { debug!( @@ -421,7 +428,7 @@ impl FeatureRefresher { } } - // this is a copy of the handling in refresh_single. + // this is a copy of the handling in refresh_single. Extracting just so we can handle the new flags in the same way without fetching them first. 
fn handle_client_features_updated(&self, refresh: TokenRefresh, features: ClientFeatures) { debug!("Handling client features update."); let key = cache_key(&refresh.token);