Skip to content

Commit

Permalink
fix teeests?
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasheartman committed Dec 16, 2024
1 parent fd63dd5 commit 5d4d6ef
Showing 1 changed file with 44 additions and 25 deletions.
69 changes: 44 additions & 25 deletions server/tests/streaming_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ async fn test_streaming() {

let edge = edge_server(&unleash_server.url("/"), upstream_known_token.clone()).await;

// Allow edge to establish a connection with upstream and populate the cache
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

let es_client = eventsource_client::ClientBuilder::for_url(&edge.url("/api/client/streaming"))
.unwrap()
.header("Authorization", &upstream_known_token.token)
Expand All @@ -69,40 +72,56 @@ async fn test_streaming() {
};

let mut stream = es_client.stream();
while let Some(Ok(event)) = stream.next().await {
match event {
eventsource_client::SSE::Event(event) if event.event_type == "unleash-connected" => {
assert_eq!(
serde_json::from_str::<ClientFeatures>(&event.data).unwrap(),
initial_features
);
break;
}
_ => {
// ignore other events

tokio::time::timeout(std::time::Duration::from_secs(2), async {
loop {
if let Some(Ok(event)) = stream.next().await {
match event {
eventsource_client::SSE::Event(event)
if event.event_type == "unleash-connected" =>
{
assert_eq!(
serde_json::from_str::<ClientFeatures>(&event.data).unwrap(),
initial_features
);
break;
}
_ => {
// ignore other events
}
}
}
}
}
})
.await
.expect("Test timed out waiting for connected event");

unleash_features_cache.insert(
cache_key(&upstream_known_token),
features_from_disk("../examples/hostedexample.json"),
);
unleash_broadcaster.broadcast().await;

tokio::time::timeout(std::time::Duration::from_secs(1), async {
while let Some(Ok(event)) = stream.next().await {
match event {
eventsource_client::SSE::Event(event) if event.event_type == "unleash-updated" => {
let update = serde_json::from_str::<ClientFeatures>(&event.data).unwrap();
assert_eq!(initial_features.query, update.query);
assert_eq!(initial_features.version, update.version);
assert!(initial_features.features != update.features);
break;
}
_ => {
// ignore other events
}
tokio::time::timeout(std::time::Duration::from_secs(2), async {
loop {
match stream.next().await {
Some(Ok(event)) => match event {
eventsource_client::SSE::Event(event)
if event.event_type == "unleash-updated" =>
{
let update = serde_json::from_str::<ClientFeatures>(&event.data).unwrap();
assert_eq!(initial_features.query, update.query);
assert_eq!(initial_features.version, update.version);
assert_ne!(initial_features.features, update.features);
break;
}
_ => {
// panic!("Unexpected event: {:?}", event);
// ignore other events
}
},
Some(Err(e)) => println!("Got an error, ignoring, {e:?}"),
None => println!("Stream ended"),
}
}
})
Expand Down

0 comments on commit 5d4d6ef

Please sign in to comment.