Skip to content

Commit

Permalink
broken: don't use this
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasheartman committed Dec 5, 2024
1 parent 2abc296 commit f19be5a
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 15 deletions.
67 changes: 55 additions & 12 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use actix_web::web::{self, Data, Json, Query};
use actix_web::{get, post, HttpRequest, HttpResponse, Responder};
use aws_sdk_s3::config::endpoint::ResolveEndpoint;
use dashmap::DashMap;
use tracing_subscriber::filter;
use unleash_types::client_features::{ClientFeature, ClientFeatures};
use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia};

Expand Down Expand Up @@ -45,17 +46,19 @@ pub async fn stream_features(
token_cache: Data<DashMap<String, EdgeToken>>,
filter_query: Query<FeatureFilters>,
req: HttpRequest,
) -> impl Responder {
let features = resolve_features(
edge_token,
) -> EdgeResult<impl Responder> {
let (validated_token, filter_set, query) =
get_feature_filter(&edge_token, &token_cache, filter_query.clone())?;
let features = resolve_features_2(
query,
validated_token.clone(),
filter_set,
features_cache,
token_cache,
filter_query,
req.clone(),
)
.await;
match (req.app_data::<Data<FeatureRefresher>>(), features) {
(Some(refresher), Ok(features)) => refresher.broadcaster.new_client(features).await,
(Some(refresher), Ok(features)) => Ok(refresher.broadcaster.new_client(features).await),
_ => todo!(),
}
}
Expand Down Expand Up @@ -83,13 +86,15 @@ pub async fn post_features(
resolve_features(edge_token, features_cache, token_cache, filter_query, req).await
}

async fn resolve_features(
edge_token: EdgeToken,
features_cache: Data<DashMap<String, ClientFeatures>>,
token_cache: Data<DashMap<String, EdgeToken>>,
fn get_feature_filter(
edge_token: &EdgeToken,
token_cache: &Data<DashMap<String, EdgeToken>>,
filter_query: Query<FeatureFilters>,
req: HttpRequest,
) -> EdgeJsonResult<ClientFeatures> {
) -> EdgeResult<(
EdgeToken,
FeatureFilterSet,
unleash_types::client_features::Query,
)> {
let validated_token = token_cache
.get(&edge_token.token)
.map(|e| e.value().clone())
Expand All @@ -111,6 +116,44 @@ async fn resolve_features(
}
.with_filter(project_filter(&validated_token));

Ok((validated_token, filter_set, query))
}

async fn resolve_features_2(
query: unleash_types::client_features::Query,
validated_token: EdgeToken,
filter_set: FeatureFilterSet,
features_cache: Data<DashMap<String, ClientFeatures>>,
req: HttpRequest,
) -> EdgeJsonResult<ClientFeatures> {
let client_features = match req.app_data::<Data<FeatureRefresher>>() {
Some(refresher) => {
refresher
.features_for_filter(validated_token.clone(), &filter_set)
.await
}
None => features_cache
.get(&cache_key(&validated_token))
.map(|client_features| filter_client_features(&client_features, &filter_set))
.ok_or(EdgeError::ClientCacheError),
}?;

Ok(Json(ClientFeatures {
query: Some(query),
..client_features
}))
}

async fn resolve_features(
edge_token: EdgeToken,
features_cache: Data<DashMap<String, ClientFeatures>>,
token_cache: Data<DashMap<String, EdgeToken>>,
filter_query: Query<FeatureFilters>,
req: HttpRequest,
) -> EdgeJsonResult<ClientFeatures> {
let (validated_token, filter_set, query) =
get_feature_filter(&edge_token, &token_cache, filter_query.clone())?;

let client_features = match req.app_data::<Data<FeatureRefresher>>() {
Some(refresher) => {
refresher
Expand Down
24 changes: 21 additions & 3 deletions server/src/http/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,23 @@ use futures_util::future;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_features::{ClientFeatures, Query};

use crate::{filters::FeatureFilterSet, types::EdgeToken};

pub struct Broadcaster {
inner: Mutex<BroadcasterInner>,
}

#[derive(Debug, Clone, Default)]
// #[derive(Debug)]
struct StreamClient {
stream: mpsc::Sender<sse::Event>,
token: EdgeToken,
filter_set: FeatureFilterSet,
query: Query,
}

#[derive(Debug, Default)]
struct BroadcasterInner {
clients: Vec<mpsc::Sender<sse::Event>>,
}
Expand Down Expand Up @@ -68,6 +78,9 @@ impl Broadcaster {
/// should take the current feature set as input and send it to the client.
pub async fn new_client(
&self,
// token: EdgeToken,
// filter_set: FeatureFilterSet,
// query: Query,
features: Json<ClientFeatures>,
) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
let (tx, rx) = mpsc::channel(10);
Expand All @@ -81,7 +94,12 @@ impl Broadcaster {
.await
.unwrap();

self.inner.lock().clients.push(tx);
self.inner.lock().clients.push(StreamClient {
stream: tx,
token,
filter_set,
query,
});

Sse::from_infallible_receiver(rx)
// we're already using remove_stale_clients to clean up disconnected
Expand Down
2 changes: 2 additions & 0 deletions server/src/http/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ impl FeatureRefresher {
serde_json::from_str(&event.data).unwrap();
refresher.handle_client_features_updated(TokenRefresh::new(token, None), features);


let data = Data::new(event.data).event("unleash-updated");
broadcaster.rebroadcast(actix_web_lab::sse::Event::Data(data)).await;
// self.broadcaster.broadcast("got an update".clone).await;
Expand Down Expand Up @@ -420,6 +421,7 @@ impl FeatureRefresher {
}
}

// this is a copy of the handling in refresh_single.
fn handle_client_features_updated(&self, refresh: TokenRefresh, features: ClientFeatures) {
debug!("Handling client features update.");
let key = cache_key(&refresh.token);
Expand Down

0 comments on commit f19be5a

Please sign in to comment.