extremely cursed immpl of working filters
thomasheartman committed Dec 5, 2024
1 parent 323ff96 commit ae70a59
Showing 3 changed files with 102 additions and 26 deletions.
15 changes: 12 additions & 3 deletions server/src/client_api.rs
@@ -48,15 +48,23 @@ pub async fn stream_features(
     let (validated_token, filter_set, query) =
         get_feature_filter(&edge_token, &token_cache, filter_query.clone())?;
     let features = resolve_features_2(
-        query,
+        query.clone(),
         validated_token.clone(),
         filter_set,
         features_cache,
         req.clone(),
     )
     .await;
     match (req.app_data::<Data<FeatureRefresher>>(), features) {
-        (Some(refresher), Ok(features)) => Ok(refresher.broadcaster.new_client(features).await),
+        (Some(refresher), Ok(features)) => Ok(refresher
+            .broadcaster
+            .new_client(
+                validated_token,
+                filter_query.clone(),
+                query.clone(),
+                features,
+            )
+            .await),
         _ => todo!(),
     }
 }
@@ -330,6 +338,7 @@ pub fn configure_experimental_post_features(
 mod tests {
 
     use crate::http::broadcaster::Broadcaster;
+    use crate::internal_backstage::features;
     use crate::metrics::client_metrics::{ApplicationKey, MetricsBatch, MetricsKey};
     use crate::types::{TokenType, TokenValidationStatus};
     use std::collections::HashMap;
@@ -1053,7 +1062,7 @@ mod tests {
             persistence: None,
             strict: false,
             app_name: "test-app".into(),
-            broadcaster: Broadcaster::create(),
+            broadcaster: Broadcaster::new(features_cache.clone()),
         });
         let token_validator = Arc::new(TokenValidator {
             unleash_client: unleash_client.clone(),
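
A note on the test change above: `Broadcaster::new` now takes the features cache, so the test hands it `features_cache.clone()` — the same `Arc<DashMap>` the refresher writes into. The sketch below (illustrative stand-in types, not code from this commit) shows the sharing pattern that makes the broadcaster's later per-client filtering possible:

```rust
use std::sync::Arc;

use dashmap::DashMap;

// Simplified stand-in for the real Arc<DashMap<String, ClientFeatures>>:
// the refresher and the broadcaster hold clones of the same Arc, so features
// written by one side are immediately visible to the other.
fn main() {
    let features_cache: Arc<DashMap<String, Vec<String>>> = Arc::new(DashMap::new());

    let refresher_view = Arc::clone(&features_cache);
    let broadcaster_view = Arc::clone(&features_cache);

    // the refresher updates the cache after fetching from upstream
    refresher_view.insert("env:dev".into(), vec!["beta-search".into()]);

    // the broadcaster reads the same entry when building per-client payloads
    let cached = broadcaster_view.get("env:dev").expect("entry present");
    assert_eq!(cached.value(), &vec!["beta-search".to_string()]);
}
```

Cloning the `Arc` only bumps a reference count; both sides operate on the same underlying map.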
102 changes: 84 additions & 18 deletions server/src/http/broadcaster.rs
@@ -1,21 +1,30 @@
 /// copied from https://github.com/actix/examples/blob/master/server-sent-events/src/broadcast.rs
-use std::{sync::Arc, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
 
-use actix_web::{rt::time::interval, web::Json};
+use actix_web::{
+    rt::time::interval,
+    web::{Json, Query},
+};
 use actix_web_lab::{
     sse::{self, Event, Sse},
     util::InfallibleStream,
 };
+use dashmap::DashMap;
 use futures_util::future;
 use parking_lot::Mutex;
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
-use unleash_types::client_features::{ClientFeatures, Query};
+use unleash_types::client_features::ClientFeatures;
 
-use crate::{filters::FeatureFilterSet, types::EdgeToken};
+use crate::{
+    filters::{filter_client_features, name_prefix_filter, project_filter, FeatureFilterSet},
+    tokens::cache_key,
+    types::{EdgeResult, EdgeToken, FeatureFilters},
+};
 
 pub struct Broadcaster {
     inner: Mutex<BroadcasterInner>,
+    features_cache: Arc<DashMap<String, ClientFeatures>>,
 }
 
 // this doesn't work because filter_set isn't clone. However, we can probably
@@ -31,25 +40,37 @@ pub struct Broadcaster {
 // Then, when we drop clients, also drop their corresponding entries from the
 // map.
 
-// #[derive(Debug, Clone)]
+#[derive(Debug, Clone)]
 
+struct StreamClient {
+    stream: mpsc::Sender<sse::Event>,
+    id: String,
+}
+
 struct QueryStuff {
     token: EdgeToken,
-    filter_set: FeatureFilterSet,
-    query: Query,
+    filter_set: Query<FeatureFilters>,
+    query: unleash_types::client_features::Query,
 }
 
+impl std::fmt::Debug for QueryStuff {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "QueryStuff")
+    }
+}
 
 #[derive(Debug, Default)]
 struct BroadcasterInner {
-    clients: Vec<mpsc::Sender<sse::Event>>,
+    clients: Vec<StreamClient>,
+    filters: HashMap<String, QueryStuff>,
 }
 
 impl Broadcaster {
     /// Constructs new broadcaster and spawns ping loop.
-    pub fn create() -> Arc<Self> {
+    pub fn new(features: Arc<DashMap<String, ClientFeatures>>) -> Arc<Self> {
         let this = Arc::new(Broadcaster {
             inner: Mutex::new(BroadcasterInner::default()),
+            features_cache: features,
         });
 
         Broadcaster::spawn_ping(Arc::clone(&this));
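
The commented-out `#[derive(Debug, Clone)]` and the note about `filter_set` not being `Clone` explain the shape of `QueryStuff`: it stores the raw `Query<FeatureFilters>` and the parsed query rather than a prebuilt `FeatureFilterSet`, because a filter set of boxed predicates can't be cloned (hence also the hand-written `Debug` impl). A self-contained illustration of that trade-off, with made-up stand-in types:

```rust
// Illustrative only; these types are stand-ins, not the repo's own.
struct FilterSet(Vec<Box<dyn Fn(&str) -> bool>>); // boxed closures: can't #[derive(Clone)]

#[derive(Clone, Debug)]
struct StoredQuery {
    name_prefix: Option<String>, // plain data clones freely
}

impl StoredQuery {
    // rebuild the predicates whenever they're needed, e.g. on every broadcast
    fn build_filters(&self) -> FilterSet {
        let prefix = self.name_prefix.clone();
        let filters: Vec<Box<dyn Fn(&str) -> bool>> = vec![Box::new(move |name: &str| {
            prefix.as_deref().map_or(true, |p| name.starts_with(p))
        })];
        FilterSet(filters)
    }
}

fn main() {
    let stored = StoredQuery { name_prefix: Some("beta".into()) };
    let filters = stored.build_filters();
    assert!(filters.0.iter().all(|f| f("beta-toggle")));
    assert!(!filters.0.iter().all(|f| f("ops-killswitch")));
}
```

Storing plain, cloneable data per client and rebuilding the predicates on demand is what the `get_query_filters` helper further down in this diff does for real.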
@@ -78,6 +99,7 @@ impl Broadcaster {
 
         for client in clients {
             if client
+                .stream
                 .send(sse::Event::Comment("keep-alive".into()))
                 .await
                 .is_ok()
@@ -97,13 +119,25 @@ impl Broadcaster {
     /// updates later.
     pub async fn new_client(
         &self,
-        // token: EdgeToken,
-        // filter_set: FeatureFilterSet,
-        // query: Query,
+        token: EdgeToken,
+        filter_set: Query<FeatureFilters>,
+        query: unleash_types::client_features::Query,
         features: Json<ClientFeatures>,
     ) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
         let (tx, rx) = mpsc::channel(10);
 
+        let token_string = token.token.clone();
+        let query_stuff = QueryStuff {
+            token,
+            filter_set,
+            query,
+        };
+
+        self.inner
+            .lock()
+            .filters
+            .insert(token_string.clone(), query_stuff);
+
         tx.send(
             sse::Data::new_json(features)
                 .unwrap()
@@ -113,7 +147,10 @@ impl Broadcaster {
         .await
         .unwrap();
 
-        self.inner.lock().clients.push(tx);
+        self.inner.lock().clients.push(StreamClient {
+            stream: tx,
+            id: token_string,
+        });
 
         Sse::from_infallible_receiver(rx)
         // we're already using remove_stale_clients to clean up disconnected
@@ -130,22 +167,51 @@ impl Broadcaster {
     pub async fn rebroadcast(&self, data: Event) {
         let clients = self.inner.lock().clients.clone();
 
-        let send_futures = clients.iter().map(|client| client.send(data.clone()));
+        let send_futures = clients
+            .iter()
+            .map(|client| client.stream.send(data.clone()));
 
         // try to send to all clients, ignoring failures
        // disconnected clients will get swept up by `remove_stale_clients`
         let _ = future::join_all(send_futures).await;
     }
 
+    fn get_query_filters(
+        filter_query: Query<FeatureFilters>,
+        token: EdgeToken,
+    ) -> FeatureFilterSet {
+        let query_filters = filter_query.into_inner();
+
+        let filter_set = if let Some(name_prefix) = query_filters.name_prefix {
+            FeatureFilterSet::from(Box::new(name_prefix_filter(name_prefix)))
+        } else {
+            FeatureFilterSet::default()
+        }
+        .with_filter(project_filter(&token));
+        filter_set
+    }
+
     /// Broadcasts `msg` to all clients.
     ///
     /// This is the example implementation of the broadcast function. It's not used anywhere today.
-    pub async fn broadcast(&self, msg: &str) {
+    pub async fn broadcast(&self) {
         let clients = self.inner.lock().clients.clone();
 
-        let send_futures = clients
-            .iter()
-            .map(|client| client.send(sse::Data::new(msg).into()));
+        let send_futures = clients.iter().map(|client| {
+            let binding = self.inner.lock();
+            let query_stuff = binding.filters.get(&client.id).unwrap();
+            let filter_set = Broadcaster::get_query_filters(
+                query_stuff.filter_set.clone(),
+                query_stuff.token.clone(),
+            );
+            let features = self
+                .features_cache
+                .get(&cache_key(&query_stuff.token))
+                .map(|client_features| filter_client_features(&client_features, &filter_set));
+            // let features = get_features_for_filter(query_stuff.token.clone(), &filter_set).unwrap();
+            let event = sse::Data::new_json(&features).unwrap().into();
+            client.stream.send(event)
+        });
 
         // try to send to all clients, ignoring failures
         // disconnected clients will get swept up by `remove_stale_clients`
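
One loose end the earlier comment already flags: `new_client` inserts an entry into `filters` keyed by the token, but nothing removes it when the client goes away. A hypothetical follow-up (not part of this commit) could prune the map during the stale-client sweep; a simplified, self-contained sketch with stand-in types:

```rust
use std::collections::HashMap;

// Stand-ins for StreamClient and QueryStuff; the real structs carry an SSE
// sender and the stored query, but only the token/id matters for pruning.
#[derive(Clone)]
struct Client {
    id: String,
}

// Drop stored filters whose owning client is no longer connected, so the
// token-keyed map doesn't grow without bound.
fn prune_filters(filters: &mut HashMap<String, String>, live_clients: &[Client]) {
    filters.retain(|token, _| live_clients.iter().any(|c| c.id == *token));
}

fn main() {
    let mut filters = HashMap::from([
        ("token-a".to_string(), "query-for-a".to_string()),
        ("token-b".to_string(), "query-for-b".to_string()),
    ]);
    let live = vec![Client { id: "token-a".into() }];

    prune_filters(&mut filters, &live);
    assert!(filters.contains_key("token-a"));
    assert!(!filters.contains_key("token-b"));
}
```

In the real code this would presumably live alongside `remove_stale_clients`, retaining only tokens that still have a connected `StreamClient`.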
11 changes: 6 additions & 5 deletions server/src/http/feature_refresher.rs
@@ -120,7 +120,7 @@ impl Default for FeatureRefresher {
             persistence: None,
             strict: true,
             app_name: "unleash_edge".into(),
-            broadcaster: Broadcaster::create(),
+            broadcaster: Broadcaster::new(Default::default()),
         }
     }
 }
@@ -160,13 +160,13 @@ impl FeatureRefresher {
         FeatureRefresher {
             unleash_client,
             tokens_to_refresh: Arc::new(DashMap::default()),
-            features_cache: features,
+            features_cache: features.clone(),
             engine_cache: engines,
             refresh_interval: features_refresh_interval,
             persistence,
             strict,
             app_name: app_name.into(),
-            broadcaster: Broadcaster::create(),
+            broadcaster: Broadcaster::new(features.clone()),
         }
     }
 
@@ -378,8 +378,9 @@ impl FeatureRefresher {
                 // feature set OR even just a "filter flags"
                 // function. The broadcaster will take care
                 // of filtering the flags per listener.
-                let data = Data::new(event.data).event("unleash-updated");
-                broadcaster.rebroadcast(actix_web_lab::sse::Event::Data(data)).await;
+                // let data = Data::new(event.data).event("unleash-updated");
+                // broadcaster.rebroadcast(actix_web_lab::sse::Event::Data(data)).await;
+                broadcaster.broadcast().await;
             }
             eventsource_client::SSE::Event(event) => {
                 debug!(
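
The net effect of this last change: instead of forwarding the upstream `unleash-updated` event payload verbatim to every listener, the refresher now calls `broadcaster.broadcast()`, which rebuilds each client's filters from its stored query and sends features filtered from the shared cache. Roughly what that means for two clients with different filters (feature names and prefixes below are made up):

```rust
// Illustrative only: the same upstream update yields different payloads per
// client, depending on the name-prefix filter each client registered with.
fn filter_by_prefix<'a>(features: &[&'a str], prefix: Option<&str>) -> Vec<&'a str> {
    features
        .iter()
        .copied()
        .filter(|name| prefix.map_or(true, |p| name.starts_with(p)))
        .collect()
}

fn main() {
    let all = ["beta-search", "beta-banner", "ops-killswitch"];

    // client A registered a "beta" name-prefix filter, client B registered none
    let for_a = filter_by_prefix(&all, Some("beta"));
    let for_b = filter_by_prefix(&all, None);

    assert_eq!(for_a, ["beta-search", "beta-banner"]);
    assert_eq!(for_b.len(), 3);
}
```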
