Skip to content

Commit

Permalink
fix: unbreak edge; add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasheartman committed Dec 5, 2024
1 parent f19be5a commit 323ff96
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 14 deletions.
12 changes: 10 additions & 2 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use crate::types::{
};
use actix_web::web::{self, Data, Json, Query};
use actix_web::{get, post, HttpRequest, HttpResponse, Responder};
use aws_sdk_s3::config::endpoint::ResolveEndpoint;
use dashmap::DashMap;
use tracing_subscriber::filter;
use unleash_types::client_features::{ClientFeature, ClientFeatures};
use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia};

Expand Down Expand Up @@ -86,6 +84,9 @@ pub async fn post_features(
resolve_features(edge_token, features_cache, token_cache, filter_query, req).await
}

/// this was extracted from resolve_features because it gives us the necessary
/// values to filter flags and construct client feature responses later. It's a
/// standalone function and can be moved to a different file if necessary.
fn get_feature_filter(
edge_token: &EdgeToken,
token_cache: &Data<DashMap<String, EdgeToken>>,
Expand Down Expand Up @@ -119,6 +120,10 @@ fn get_feature_filter(
Ok((validated_token, filter_set, query))
}

/// This is the second half of resolve_features. The idea was that you don't
/// need to do the extraction work twice. The broadcaster should be able to do
/// something like the Some arm of the match here, except we'll know that we
/// already have the refresher.
async fn resolve_features_2(
query: unleash_types::client_features::Query,
validated_token: EdgeToken,
Expand All @@ -144,6 +149,7 @@ async fn resolve_features_2(
}))
}

/// This is the same as it always was, except I extracted bits of it.
async fn resolve_features(
edge_token: EdgeToken,
features_cache: Data<DashMap<String, ClientFeatures>>,
Expand Down Expand Up @@ -323,6 +329,7 @@ pub fn configure_experimental_post_features(
#[cfg(test)]
mod tests {

use crate::http::broadcaster::Broadcaster;
use crate::metrics::client_metrics::{ApplicationKey, MetricsBatch, MetricsKey};
use crate::types::{TokenType, TokenValidationStatus};
use std::collections::HashMap;
Expand Down Expand Up @@ -1046,6 +1053,7 @@ mod tests {
persistence: None,
strict: false,
app_name: "test-app".into(),
broadcaster: Broadcaster::create(),
});
let token_validator = Arc::new(TokenValidator {
unleash_client: unleash_client.clone(),
Expand Down
40 changes: 31 additions & 9 deletions server/src/http/broadcaster.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/// copied from https://github.com/actix/examples/blob/master/server-sent-events/src/broadcast.rs
use std::{sync::Arc, time::Duration};

use actix_web::{rt::time::interval, web::Json};
Expand All @@ -17,7 +18,21 @@ pub struct Broadcaster {
inner: Mutex<BroadcasterInner>,
}

// #[derive(Debug)]
// this doesn't work because filter_set isn't clone. However, we can probably
// find a way around that. For instance, we can create a hash map / dash map of
// some client identifier to each filter set, so that we don't need to clone the
// filter set.

// I'd thought at first that we could map the token to the filter set, but I
// think that might not be enough, as the filter set may also contain query
// param information, which can vary between uses of the same token.

// It might be that the easiest way is to create an ID per client and use that.
// Then, when we drop clients, also drop their corresponding entries from the
// map.

// #[derive(Debug, Clone)]

struct StreamClient {
stream: mpsc::Sender<sse::Event>,
token: EdgeToken,
Expand Down Expand Up @@ -75,7 +90,11 @@ impl Broadcaster {
}

/// Registers client with broadcaster, returning an SSE response body.
/// should take the current feature set as input and send it to the client.
/// The current impl takes the feature set as input and sends it to the client as a connected event.
///
/// The commented-out arguments are what we'll need to store per client so
/// that we can properly filter / format the feature response when they get
/// updates later.
pub async fn new_client(
&self,
// token: EdgeToken,
Expand All @@ -94,20 +113,20 @@ impl Broadcaster {
.await
.unwrap();

self.inner.lock().clients.push(StreamClient {
stream: tx,
token,
filter_set,
query,
});
self.inner.lock().clients.push(tx);

Sse::from_infallible_receiver(rx)
// we're already using remove_stale_clients to clean up disconnected
// clients and send heartbeats. we probably don't need this.
// .with_keep_alive(Duration::from_secs(30))
}

/// re-~roadcasts `data` to all clients.
/// broadcasts a pre-formatted `data` event to all clients.
///
/// The final implementation will probably not use this. Instead, it will
/// probably use each client's filters to determine the features to send.
/// We'll need to pass in either the full set of features or a way to filter
/// them. Both might work.
pub async fn rebroadcast(&self, data: Event) {
let clients = self.inner.lock().clients.clone();

Expand All @@ -117,7 +136,10 @@ impl Broadcaster {
// disconnected clients will get swept up by `remove_stale_clients`
let _ = future::join_all(send_futures).await;
}

/// Broadcasts `msg` to all clients.
///
/// This is the example implementation of the broadcast function. It's not used anywhere today.
pub async fn broadcast(&self, msg: &str) {
let clients = self.inner.lock().clients.clone();

Expand Down
13 changes: 10 additions & 3 deletions server/src/http/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ impl FeatureRefresher {
}
}

/// This is where we set up a listener per token.
pub async fn start_streaming_features_background_task(&self) {
let refreshes = self.get_tokens_due_for_refresh();
for refresh in refreshes {
Expand Down Expand Up @@ -347,32 +348,38 @@ impl FeatureRefresher {
let broadcaster = broadcaster.clone();
async move {
match sse {
// The first time we're connecting to Unleash. Just store the data.
eventsource_client::SSE::Event(event)
if event.event_type == "unleash-connected" =>
{
debug!(
"Connected to unleash! I should populate my flag cache now.",
);

// very rough handling of client features.
let features: ClientFeatures =
serde_json::from_str(&event.data).unwrap();
refresher.handle_client_features_updated(TokenRefresh::new(token, None), features);
}
// Unleash has updated. This is where we send data to listeners.
eventsource_client::SSE::Event(event)
if event.event_type == "unleash-updated" =>
{
debug!(
"Got an unleash updated event. I should update my cache and notify listeners.",
);

// store the data locally
let features: ClientFeatures =
serde_json::from_str(&event.data).unwrap();
refresher.handle_client_features_updated(TokenRefresh::new(token, None), features);


// send the data to the broadcaster. This should probably just send the new
// feature set OR even just a "filter flags"
// function. The broadcaster will take care
// of filtering the flags per listener.
let data = Data::new(event.data).event("unleash-updated");
broadcaster.rebroadcast(actix_web_lab::sse::Event::Data(data)).await;
// self.broadcaster.broadcast("got an update".clone).await;
}
eventsource_client::SSE::Event(event) => {
debug!(
Expand Down Expand Up @@ -421,7 +428,7 @@ impl FeatureRefresher {
}
}

// this is a copy of the handling in refresh_single.
// this is a copy of the handling in refresh_single. Extracting just so we can handle the new flags in the same way without fetching them first.
fn handle_client_features_updated(&self, refresh: TokenRefresh, features: ClientFeatures) {
debug!("Handling client features update.");
let key = cache_key(&refresh.token);
Expand Down

0 comments on commit 323ff96

Please sign in to comment.