From 118f60dd8d0053ca23e4ea68598786ffbb5d8cc4 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Sat, 21 Sep 2024 09:50:14 -0400 Subject: [PATCH 1/3] try to reproduce bug --- xmtp_mls/src/lib.rs | 10 ++++- xmtp_mls/src/subscriptions.rs | 84 +++++++++++++++++++++++++++++++++-- 2 files changed, 88 insertions(+), 6 deletions(-) diff --git a/xmtp_mls/src/lib.rs b/xmtp_mls/src/lib.rs index 83afcb6bb..049db2db4 100644 --- a/xmtp_mls/src/lib.rs +++ b/xmtp_mls/src/lib.rs @@ -129,10 +129,16 @@ mod tests { // Execute once before any tests are run #[ctor::ctor] // Capture traces in a variable that can be checked in tests, as well as outputting them to stdout on test failure - #[traced_test] + // #[traced_test] fn setup() { + use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); // Capture logs (e.g. log::info!()) as traces too - let _ = tracing_log::LogTracer::init_with_filter(LevelFilter::Debug); + // let _ = tracing_log::LogTracer::init_with_filter(LevelFilter::Debug); } /// Note: tests that use this must have the #[traced_test] attribute diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index 2bb6f8cc4..ef9005689 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -392,8 +392,9 @@ mod tests { }; use futures::StreamExt; use parking_lot::Mutex; + use std::cmp::Ordering; use std::sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{self, AtomicU64}, Arc, }; use xmtp_cryptography::utils::generate_local_wallet; @@ -673,7 +674,7 @@ mod tests { let mut handle = Client::::stream_all_messages_with_callback(caro.clone(), move |message| { (*messages_clone.lock()).push(message); - blocked_pointer.fetch_sub(1, Ordering::SeqCst); + blocked_pointer.fetch_sub(1, atomic::Ordering::SeqCst); }); handle.wait_for_ready().await; @@ -704,13 +705,13 @@ mod tests { } let _ = tokio::time::timeout(std::time::Duration::from_secs(60), async { - while blocked.load(Ordering::SeqCst) > 0 { + while blocked.load(atomic::Ordering::SeqCst) > 0 { tokio::task::yield_now().await; } }) .await; - let missed_messages = blocked.load(Ordering::SeqCst); + let missed_messages = blocked.load(atomic::Ordering::SeqCst); if missed_messages > 0 { println!("Missed {} Messages", missed_messages); panic!("Test failed due to missed messages"); @@ -763,4 +764,79 @@ mod tests { closer.handle.abort(); } + + #[tokio::test(flavor = "multi_thread")] + async fn test_conversation_streaming_with_message_streaming() { + let alix = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await); + let caro = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await); + + log::info!("Starting"); + let alix_group = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + + alix_group + .add_members_by_inbox_id(&alix, vec![caro.inbox_id()]) + .await + .unwrap(); + + let mut handle = Client::::stream_all_messages_with_callback( + caro.clone(), + move |_message| {}, + ); + handle.wait_for_ready().await; + + let mut handle2 = Client::::stream_all_messages_with_callback( + caro.clone(), + move |_message| {}, + ); + + let mut handle3 = Client::::stream_all_messages_with_callback( + caro.clone(), + move |_message| {}, + ); + futures::future::join_all(vec![ + handle.wait_for_ready(), + handle2.wait_for_ready(), + handle3.wait_for_ready(), + ]) + .await; + + let alix_group_pointer = alix_group.clone(); + let alix_pointer = alix.clone(); + tokio::spawn(async move { + for _ in 0..100 { + let _ = alix_group_pointer + .send_message(b"spam", &alix_pointer) + .await; + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + }); + + let conversation_amount = Arc::new(AtomicU64::new(10)); + let amt = conversation_amount.clone(); + let _closer = + Client::::stream_conversations_with_callback(caro.clone(), move |_g| { + amt.fetch_sub(1, atomic::Ordering::SeqCst); + }); + + for _ in 0..10 { + let alix_group = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + alix_group + .add_members_by_inbox_id(&alix, vec![caro.inbox_id()]) + .await + .unwrap(); + } + + let _ = tokio::time::timeout(std::time::Duration::from_secs(30), async { + while conversation_amount.load(atomic::Ordering::SeqCst) > 0 { + tokio::task::yield_now().await; + } + }) + .await; + + assert_eq!(conversation_amount.load(atomic::Ordering::SeqCst), 0); + } } From c1a069ae5f2f325db4604c597b7e91c92e56ed9c Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Sat, 21 Sep 2024 09:55:05 -0400 Subject: [PATCH 2/3] dont need to spam messages --- xmtp_mls/src/subscriptions.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index ef9005689..c569a1d43 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -802,17 +802,6 @@ mod tests { ]) .await; - let alix_group_pointer = alix_group.clone(); - let alix_pointer = alix.clone(); - tokio::spawn(async move { - for _ in 0..100 { - let _ = alix_group_pointer - .send_message(b"spam", &alix_pointer) - .await; - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - } - }); - let conversation_amount = Arc::new(AtomicU64::new(10)); let amt = conversation_amount.clone(); let _closer = From 22802c615b9b4f41e6da5a0cc0c0ecf1abe6770d Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Sat, 21 Sep 2024 09:58:44 -0400 Subject: [PATCH 3/3] 100 groups --- xmtp_mls/src/subscriptions.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index c569a1d43..d2165afdc 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -802,14 +802,14 @@ mod tests { ]) .await; - let conversation_amount = Arc::new(AtomicU64::new(10)); + let conversation_amount = Arc::new(AtomicU64::new(100)); let amt = conversation_amount.clone(); let _closer = Client::::stream_conversations_with_callback(caro.clone(), move |_g| { amt.fetch_sub(1, atomic::Ordering::SeqCst); }); - for _ in 0..10 { + for _ in 0..100 { let alix_group = alix .create_group(None, GroupMetadataOptions::default()) .unwrap();