From 29b2b16e19304de03eea17e9db267b40b049b3c8 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Thu, 5 Dec 2024 15:02:52 -0800 Subject: [PATCH] add the ability to sync based on consent state (#1385) --- bindings_ffi/src/mls.rs | 49 +++++++++--- bindings_node/src/conversations.rs | 2 +- bindings_wasm/src/conversations.rs | 2 +- xmtp_mls/src/client.rs | 119 ++++++++++++++++++++++++++++- 4 files changed, 159 insertions(+), 13 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 6c58ee573..4861deb56 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -893,11 +893,16 @@ impl FfiConversations { Ok(sync_group.into()) } - pub async fn sync_all_conversations(&self) -> Result { + pub async fn sync_all_conversations( + &self, + consent_state: Option, + ) -> Result { let inner = self.inner_client.as_ref(); let conn = inner.store().conn()?; - let num_groups_synced: usize = inner.sync_all_welcomes_and_groups(&conn).await?; + let consent: Option = consent_state.map(|state| state.into()); + + let num_groups_synced: usize = inner.sync_all_welcomes_and_groups(&conn, consent).await?; // Convert usize to u32 for compatibility with Uniffi let num_groups_synced: u32 = num_groups_synced @@ -2524,7 +2529,10 @@ mod tests { .unwrap(); } - bo.conversations().sync_all_conversations().await.unwrap(); + bo.conversations() + .sync_all_conversations(None) + .await + .unwrap(); let alix_groups = alix .conversations() .list(FfiListConversationsOptions::default()) @@ -2548,7 +2556,10 @@ mod tests { assert_eq!(bo_messages1.len(), 0); assert_eq!(bo_messages5.len(), 0); - bo.conversations().sync_all_conversations().await.unwrap(); + bo.conversations() + .sync_all_conversations(None) + .await + .unwrap(); let bo_messages1 = bo_group1 .find_messages(FfiListMessagesOptions::default()) @@ -2576,7 +2587,11 @@ mod tests { .unwrap(); } bo.conversations().sync().await.unwrap(); - let num_groups_synced_1: u32 = bo.conversations().sync_all_conversations().await.unwrap(); + let num_groups_synced_1: u32 = bo + .conversations() + .sync_all_conversations(None) + .await + .unwrap(); assert_eq!(num_groups_synced_1, 30); // Remove bo from all groups and sync @@ -2593,11 +2608,19 @@ mod tests { } // First sync after removal needs to process all groups and set them to inactive - let num_groups_synced_2: u32 = bo.conversations().sync_all_conversations().await.unwrap(); + let num_groups_synced_2: u32 = bo + .conversations() + .sync_all_conversations(None) + .await + .unwrap(); assert_eq!(num_groups_synced_2, 30); // Second sync after removal will not process inactive groups - let num_groups_synced_3: u32 = bo.conversations().sync_all_conversations().await.unwrap(); + let num_groups_synced_3: u32 = bo + .conversations() + .sync_all_conversations(None) + .await + .unwrap(); assert_eq!(num_groups_synced_3, 0); } @@ -3837,9 +3860,15 @@ mod tests { .create_dm(bola.account_address.clone()) .await .unwrap(); - let alix_num_sync = alix_conversations.sync_all_conversations().await.unwrap(); + let alix_num_sync = alix_conversations + .sync_all_conversations(None) + .await + .unwrap(); bola_conversations.sync().await.unwrap(); - let bola_num_sync = bola_conversations.sync_all_conversations().await.unwrap(); + let bola_num_sync = bola_conversations + .sync_all_conversations(None) + .await + .unwrap(); assert_eq!(alix_num_sync, 1); assert_eq!(bola_num_sync, 1); @@ -4121,7 +4150,7 @@ mod tests { // update the sync group's messages to pipe them into the events alix_b .conversations() - .sync_all_conversations() + .sync_all_conversations(None) .await .unwrap(); diff --git a/bindings_node/src/conversations.rs b/bindings_node/src/conversations.rs index fb35a4d83..2ee01f5c6 100644 --- a/bindings_node/src/conversations.rs +++ b/bindings_node/src/conversations.rs @@ -258,7 +258,7 @@ impl Conversations { let num_groups_synced = self .inner_client - .sync_all_welcomes_and_groups(&conn) + .sync_all_welcomes_and_groups(&conn, None) .await .map_err(ErrorWrapper::from)?; diff --git a/bindings_wasm/src/conversations.rs b/bindings_wasm/src/conversations.rs index c33cd901d..790868d29 100644 --- a/bindings_wasm/src/conversations.rs +++ b/bindings_wasm/src/conversations.rs @@ -293,7 +293,7 @@ impl Conversations { let num_groups_synced = self .inner_client - .sync_all_welcomes_and_groups(&conn) + .sync_all_welcomes_and_groups(&conn, None) .await .map_err(|e| JsError::new(format!("{}", e).as_str()))?; diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index eb6a86ab1..75a1f9f21 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -877,9 +877,17 @@ where pub async fn sync_all_welcomes_and_groups( &self, conn: &DbConnection, + consent_state: Option, ) -> Result { self.sync_welcomes(conn).await?; - let groups = self.find_groups(GroupQueryArgs::default().include_sync_groups())?; + + let query_args = GroupQueryArgs { + consent_state, + include_sync_groups: true, + ..GroupQueryArgs::default() + }; + + let groups = self.find_groups(query_args)?; let active_groups_count = self.sync_all_groups(groups).await?; Ok(active_groups_count) @@ -1185,6 +1193,115 @@ pub(crate) mod tests { assert_eq!(bo_messages2.len(), 1); } + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] + #[cfg_attr( + not(target_arch = "wasm32"), + tokio::test(flavor = "multi_thread", worker_threads = 2) + )] + async fn test_sync_all_groups_and_welcomes() { + let alix = ClientBuilder::new_test_client(&generate_local_wallet()).await; + let bo = ClientBuilder::new_test_client(&generate_local_wallet()).await; + + // Create two groups and add Bob + let alix_bo_group1 = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + let alix_bo_group2 = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + + alix_bo_group1 + .add_members_by_inbox_id(&[bo.inbox_id()]) + .await + .unwrap(); + alix_bo_group2 + .add_members_by_inbox_id(&[bo.inbox_id()]) + .await + .unwrap(); + + // Initial sync (None): Bob should fetch both groups + let bob_received_groups = bo + .sync_all_welcomes_and_groups(&bo.store().conn().unwrap(), None) + .await + .unwrap(); + assert_eq!(bob_received_groups, 2); + + // Verify Bob initially has no messages + let bo_group1 = bo.group(alix_bo_group1.group_id.clone()).unwrap(); + assert_eq!( + bo_group1 + .find_messages(&MsgQueryArgs::default()) + .unwrap() + .len(), + 0 + ); + let bo_group2 = bo.group(alix_bo_group2.group_id.clone()).unwrap(); + assert_eq!( + bo_group2 + .find_messages(&MsgQueryArgs::default()) + .unwrap() + .len(), + 0 + ); + + // Alix sends a message to both groups + alix_bo_group1 + .send_message(vec![1, 2, 3].as_slice()) + .await + .unwrap(); + alix_bo_group2 + .send_message(vec![4, 5, 6].as_slice()) + .await + .unwrap(); + + // Sync with `Unknown`: Bob should not fetch new messages + let bob_received_groups_unknown = bo + .sync_all_welcomes_and_groups(&bo.store().conn().unwrap(), Some(ConsentState::Allowed)) + .await + .unwrap(); + assert_eq!(bob_received_groups_unknown, 0); + + // Verify Bob still has no messages + assert_eq!( + bo_group1 + .find_messages(&MsgQueryArgs::default()) + .unwrap() + .len(), + 0 + ); + assert_eq!( + bo_group2 + .find_messages(&MsgQueryArgs::default()) + .unwrap() + .len(), + 0 + ); + + // Alix sends another message to both groups + alix_bo_group1 + .send_message(vec![7, 8, 9].as_slice()) + .await + .unwrap(); + alix_bo_group2 + .send_message(vec![10, 11, 12].as_slice()) + .await + .unwrap(); + + // Sync with `None`: Bob should fetch all messages + let bob_received_groups_all = bo + .sync_all_welcomes_and_groups(&bo.store().conn().unwrap(), Some(ConsentState::Unknown)) + .await + .unwrap(); + assert_eq!(bob_received_groups_all, 2); + + // Verify Bob now has all messages + let bo_messages1 = bo_group1.find_messages(&MsgQueryArgs::default()).unwrap(); + assert_eq!(bo_messages1.len(), 2); + + let bo_messages2 = bo_group2.find_messages(&MsgQueryArgs::default()).unwrap(); + assert_eq!(bo_messages2.len(), 2); + } + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] #[cfg_attr( not(target_arch = "wasm32"),