Skip to content

Commit

Permalink
stopgap
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasheartman committed Dec 11, 2024
1 parent 7c03c5a commit 36d6efd
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 33 deletions.
20 changes: 3 additions & 17 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,31 +43,17 @@ pub async fn get_features(
#[get("/streaming")]
pub async fn stream_features(
edge_token: EdgeToken,
features_cache: Data<DashMap<String, ClientFeatures>>,
token_cache: Data<DashMap<String, EdgeToken>>,
filter_query: Query<FeatureFilters>,
req: HttpRequest,
) -> EdgeResult<impl Responder> {
let (validated_token, filter_set, query) =
get_feature_filter(&edge_token, &token_cache, filter_query.clone())?;
let features = resolve_features_2(
query.clone(),
validated_token.clone(),
filter_set,
features_cache,
req.clone(),
)
.await;
match (req.app_data::<Data<FeatureRefresher>>(), features) {
(Some(refresher), Ok(features)) => {
match req.app_data::<Data<FeatureRefresher>>() {
Some(refresher) => {
refresher
.broadcaster
.connect(
validated_token,
filter_query.clone(),
query.clone(),
features,
)
.connect(validated_token, filter_query, query)
.await
}
_ => Err(EdgeError::ClientCacheError),
Expand Down
58 changes: 42 additions & 16 deletions server/src/http/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use unleash_types::client_features::{ClientFeatures, Query as FlagQuery};
use crate::{
filters::{filter_client_features, name_prefix_filter, project_filter, FeatureFilterSet},
tokens::cache_key,
types::{EdgeResult, EdgeToken, FeatureFilters},
types::{EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters},
};

#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
Expand Down Expand Up @@ -108,14 +108,22 @@ impl Broadcaster {
token: EdgeToken,
filter_set: Query<FeatureFilters>,
query: unleash_types::client_features::Query,
features: Json<ClientFeatures>,
) -> EdgeResult<Sse<InfallibleStream<ReceiverStream<sse::Event>>>> {
let (tx, rx) = mpsc::channel(10);

let features = &self
.resolve_features(&token, filter_set.clone(), query.clone())
.await?;

tx.send(
sse::Data::new_json(features)?
.event("unleash-connected")
.into(),
)
.await;

self.active_connections
.entry(QueryWrapper {
query: query.clone(),
})
.entry(QueryWrapper { query })
.and_modify(|group| {
group.clients.push(tx.clone());
})
Expand All @@ -125,19 +133,12 @@ impl Broadcaster {
token,
});

tx.send(
sse::Data::new_json(features)?
.event("unleash-connected")
.into(),
)
.await?;

Ok(Sse::from_infallible_receiver(rx))
}

fn get_query_filters(
filter_query: Query<FeatureFilters>,
token: EdgeToken,
token: &EdgeToken,
) -> FeatureFilterSet {
let query_filters = filter_query.into_inner();

Expand All @@ -146,17 +147,42 @@ impl Broadcaster {
} else {
FeatureFilterSet::default()
}
.with_filter(project_filter(&token));
.with_filter(project_filter(token));
filter_set
}

async fn resolve_features(
&self,
validated_token: &EdgeToken,
filter_set: Query<FeatureFilters>,
query: FlagQuery,
) -> EdgeJsonResult<ClientFeatures> {
let filter_set = Broadcaster::get_query_filters(filter_set.clone(), validated_token);

let features = self
.features_cache
.get(&cache_key(validated_token))
.map(|client_features| filter_client_features(&client_features, &filter_set));

match features {
Some(features) => Ok(Json(ClientFeatures {
query: Some(query),
..features
})),
// Note: this is a simplification for now, using the following assumptions:
// 1. We'll only allow streaming in strict mode
// 2. We'll check whether the token is subsumed *before* trying to add it to the broadcaster
// If both of these are true, then we should never hit this case (if Thomas's understanding is correct).
None => todo!(),
}
}

/// Broadcast new features to all clients.
pub async fn broadcast(&self) {
let mut client_events = Vec::new();
for entry in self.active_connections.iter() {
let (_query, group) = entry.pair();
let filter_set =
Broadcaster::get_query_filters(group.filter_set.clone(), group.token.clone());
let filter_set = Broadcaster::get_query_filters(group.filter_set.clone(), &group.token);
let features = self
.features_cache
.get(&cache_key(&group.token))
Expand Down

0 comments on commit 36d6efd

Please sign in to comment.