diff --git a/server/tests/streaming_test.rs b/server/tests/streaming_test.rs index d96a70c3..9ad8c3cc 100644 --- a/server/tests/streaming_test.rs +++ b/server/tests/streaming_test.rs @@ -49,6 +49,9 @@ async fn test_streaming() { let edge = edge_server(&unleash_server.url("/"), upstream_known_token.clone()).await; + // Allow edge to establish a connection with upstream and populate the cache + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let es_client = eventsource_client::ClientBuilder::for_url(&edge.url("/api/client/streaming")) .unwrap() .header("Authorization", &upstream_known_token.token) @@ -69,20 +72,29 @@ async fn test_streaming() { }; let mut stream = es_client.stream(); - while let Some(Ok(event)) = stream.next().await { - match event { - eventsource_client::SSE::Event(event) if event.event_type == "unleash-connected" => { - assert_eq!( - serde_json::from_str::(&event.data).unwrap(), - initial_features - ); - break; - } - _ => { - // ignore other events + + tokio::time::timeout(std::time::Duration::from_secs(2), async { + loop { + if let Some(Ok(event)) = stream.next().await { + match event { + eventsource_client::SSE::Event(event) + if event.event_type == "unleash-connected" => + { + assert_eq!( + serde_json::from_str::(&event.data).unwrap(), + initial_features + ); + break; + } + _ => { + // ignore other events + } + } } } - } + }) + .await + .expect("Test timed out waiting for connected event"); unleash_features_cache.insert( cache_key(&upstream_known_token), @@ -90,19 +102,26 @@ async fn test_streaming() { ); unleash_broadcaster.broadcast().await; - tokio::time::timeout(std::time::Duration::from_secs(1), async { - while let Some(Ok(event)) = stream.next().await { - match event { - eventsource_client::SSE::Event(event) if event.event_type == "unleash-updated" => { - let update = serde_json::from_str::(&event.data).unwrap(); - assert_eq!(initial_features.query, update.query); - assert_eq!(initial_features.version, update.version); - assert!(initial_features.features != update.features); - break; - } - _ => { - // ignore other events - } + tokio::time::timeout(std::time::Duration::from_secs(2), async { + loop { + match stream.next().await { + Some(Ok(event)) => match event { + eventsource_client::SSE::Event(event) + if event.event_type == "unleash-updated" => + { + let update = serde_json::from_str::(&event.data).unwrap(); + assert_eq!(initial_features.query, update.query); + assert_eq!(initial_features.version, update.version); + assert_ne!(initial_features.features, update.features); + break; + } + _ => { + // panic!("Unexpected event: {:?}", event); + // ignore other events + } + }, + Some(Err(e)) => println!("Got an error, ignoring, {e:?}"), + None => println!("Stream ended"), } } })