Skip to content

Commit

Permalink
add the ability to sync based on consent state (#1385)
Browse files Browse the repository at this point in the history
  • Loading branch information
nplasterer authored Dec 5, 2024
1 parent df1583f commit 29b2b16
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 13 deletions.
49 changes: 39 additions & 10 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,11 +893,16 @@ impl FfiConversations {
Ok(sync_group.into())
}

pub async fn sync_all_conversations(&self) -> Result<u32, GenericError> {
pub async fn sync_all_conversations(
&self,
consent_state: Option<FfiConsentState>,
) -> Result<u32, GenericError> {
let inner = self.inner_client.as_ref();
let conn = inner.store().conn()?;

let num_groups_synced: usize = inner.sync_all_welcomes_and_groups(&conn).await?;
let consent: Option<ConsentState> = consent_state.map(|state| state.into());

let num_groups_synced: usize = inner.sync_all_welcomes_and_groups(&conn, consent).await?;

// Convert usize to u32 for compatibility with Uniffi
let num_groups_synced: u32 = num_groups_synced
Expand Down Expand Up @@ -2524,7 +2529,10 @@ mod tests {
.unwrap();
}

bo.conversations().sync_all_conversations().await.unwrap();
bo.conversations()
.sync_all_conversations(None)
.await
.unwrap();
let alix_groups = alix
.conversations()
.list(FfiListConversationsOptions::default())
Expand All @@ -2548,7 +2556,10 @@ mod tests {
assert_eq!(bo_messages1.len(), 0);
assert_eq!(bo_messages5.len(), 0);

bo.conversations().sync_all_conversations().await.unwrap();
bo.conversations()
.sync_all_conversations(None)
.await
.unwrap();

let bo_messages1 = bo_group1
.find_messages(FfiListMessagesOptions::default())
Expand Down Expand Up @@ -2576,7 +2587,11 @@ mod tests {
.unwrap();
}
bo.conversations().sync().await.unwrap();
let num_groups_synced_1: u32 = bo.conversations().sync_all_conversations().await.unwrap();
let num_groups_synced_1: u32 = bo
.conversations()
.sync_all_conversations(None)
.await
.unwrap();
assert_eq!(num_groups_synced_1, 30);

// Remove bo from all groups and sync
Expand All @@ -2593,11 +2608,19 @@ mod tests {
}

// First sync after removal needs to process all groups and set them to inactive
let num_groups_synced_2: u32 = bo.conversations().sync_all_conversations().await.unwrap();
let num_groups_synced_2: u32 = bo
.conversations()
.sync_all_conversations(None)
.await
.unwrap();
assert_eq!(num_groups_synced_2, 30);

// Second sync after removal will not process inactive groups
let num_groups_synced_3: u32 = bo.conversations().sync_all_conversations().await.unwrap();
let num_groups_synced_3: u32 = bo
.conversations()
.sync_all_conversations(None)
.await
.unwrap();
assert_eq!(num_groups_synced_3, 0);
}

Expand Down Expand Up @@ -3837,9 +3860,15 @@ mod tests {
.create_dm(bola.account_address.clone())
.await
.unwrap();
let alix_num_sync = alix_conversations.sync_all_conversations().await.unwrap();
let alix_num_sync = alix_conversations
.sync_all_conversations(None)
.await
.unwrap();
bola_conversations.sync().await.unwrap();
let bola_num_sync = bola_conversations.sync_all_conversations().await.unwrap();
let bola_num_sync = bola_conversations
.sync_all_conversations(None)
.await
.unwrap();
assert_eq!(alix_num_sync, 1);
assert_eq!(bola_num_sync, 1);

Expand Down Expand Up @@ -4121,7 +4150,7 @@ mod tests {
// update the sync group's messages to pipe them into the events
alix_b
.conversations()
.sync_all_conversations()
.sync_all_conversations(None)
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion bindings_node/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl Conversations {

let num_groups_synced = self
.inner_client
.sync_all_welcomes_and_groups(&conn)
.sync_all_welcomes_and_groups(&conn, None)
.await
.map_err(ErrorWrapper::from)?;

Expand Down
2 changes: 1 addition & 1 deletion bindings_wasm/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl Conversations {

let num_groups_synced = self
.inner_client
.sync_all_welcomes_and_groups(&conn)
.sync_all_welcomes_and_groups(&conn, None)
.await
.map_err(|e| JsError::new(format!("{}", e).as_str()))?;

Expand Down
119 changes: 118 additions & 1 deletion xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,9 +877,17 @@ where
pub async fn sync_all_welcomes_and_groups(
&self,
conn: &DbConnection,
consent_state: Option<ConsentState>,
) -> Result<usize, ClientError> {
self.sync_welcomes(conn).await?;
let groups = self.find_groups(GroupQueryArgs::default().include_sync_groups())?;

let query_args = GroupQueryArgs {
consent_state,
include_sync_groups: true,
..GroupQueryArgs::default()
};

let groups = self.find_groups(query_args)?;
let active_groups_count = self.sync_all_groups(groups).await?;

Ok(active_groups_count)
Expand Down Expand Up @@ -1185,6 +1193,115 @@ pub(crate) mod tests {
assert_eq!(bo_messages2.len(), 1);
}

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
#[cfg_attr(
not(target_arch = "wasm32"),
tokio::test(flavor = "multi_thread", worker_threads = 2)
)]
async fn test_sync_all_groups_and_welcomes() {
let alix = ClientBuilder::new_test_client(&generate_local_wallet()).await;
let bo = ClientBuilder::new_test_client(&generate_local_wallet()).await;

// Create two groups and add Bob
let alix_bo_group1 = alix
.create_group(None, GroupMetadataOptions::default())
.unwrap();
let alix_bo_group2 = alix
.create_group(None, GroupMetadataOptions::default())
.unwrap();

alix_bo_group1
.add_members_by_inbox_id(&[bo.inbox_id()])
.await
.unwrap();
alix_bo_group2
.add_members_by_inbox_id(&[bo.inbox_id()])
.await
.unwrap();

// Initial sync (None): Bob should fetch both groups
let bob_received_groups = bo
.sync_all_welcomes_and_groups(&bo.store().conn().unwrap(), None)
.await
.unwrap();
assert_eq!(bob_received_groups, 2);

// Verify Bob initially has no messages
let bo_group1 = bo.group(alix_bo_group1.group_id.clone()).unwrap();
assert_eq!(
bo_group1
.find_messages(&MsgQueryArgs::default())
.unwrap()
.len(),
0
);
let bo_group2 = bo.group(alix_bo_group2.group_id.clone()).unwrap();
assert_eq!(
bo_group2
.find_messages(&MsgQueryArgs::default())
.unwrap()
.len(),
0
);

// Alix sends a message to both groups
alix_bo_group1
.send_message(vec![1, 2, 3].as_slice())
.await
.unwrap();
alix_bo_group2
.send_message(vec![4, 5, 6].as_slice())
.await
.unwrap();

// Sync with `Unknown`: Bob should not fetch new messages
let bob_received_groups_unknown = bo
.sync_all_welcomes_and_groups(&bo.store().conn().unwrap(), Some(ConsentState::Allowed))
.await
.unwrap();
assert_eq!(bob_received_groups_unknown, 0);

// Verify Bob still has no messages
assert_eq!(
bo_group1
.find_messages(&MsgQueryArgs::default())
.unwrap()
.len(),
0
);
assert_eq!(
bo_group2
.find_messages(&MsgQueryArgs::default())
.unwrap()
.len(),
0
);

// Alix sends another message to both groups
alix_bo_group1
.send_message(vec![7, 8, 9].as_slice())
.await
.unwrap();
alix_bo_group2
.send_message(vec![10, 11, 12].as_slice())
.await
.unwrap();

// Sync with `None`: Bob should fetch all messages
let bob_received_groups_all = bo
.sync_all_welcomes_and_groups(&bo.store().conn().unwrap(), Some(ConsentState::Unknown))
.await
.unwrap();
assert_eq!(bob_received_groups_all, 2);

// Verify Bob now has all messages
let bo_messages1 = bo_group1.find_messages(&MsgQueryArgs::default()).unwrap();
assert_eq!(bo_messages1.len(), 2);

let bo_messages2 = bo_group2.find_messages(&MsgQueryArgs::default()).unwrap();
assert_eq!(bo_messages2.len(), 2);
}

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
#[cfg_attr(
not(target_arch = "wasm32"),
Expand Down

0 comments on commit 29b2b16

Please sign in to comment.