Skip to content

Commit

Permalink
t Merge branch 'main' of github.com:xmtp/libxmtp into insipx/compile-…
Browse files Browse the repository at this point in the history
…http-wasm
  • Loading branch information
insipx committed Aug 30, 2024
2 parents d160f17 + f7ac3c5 commit a1b1e87
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 85 deletions.
45 changes: 36 additions & 9 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,14 +714,14 @@ impl FfiConversations {
_ => None,
};

let convo = self
.inner_client
.create_group(group_permissions, metadata_options)?;
if !account_addresses.is_empty() {
convo
.add_members(&self.inner_client, account_addresses)
.await?;
}
let convo = if account_addresses.is_empty() {
self.inner_client
.create_group(group_permissions, metadata_options)?
} else {
self.inner_client
.create_group_with_members(account_addresses, group_permissions, metadata_options)
.await?
};

let out = Arc::new(FfiGroup {
inner_client: self.inner_client.clone(),
Expand Down Expand Up @@ -1584,7 +1584,11 @@ mod tests {
use tokio::{sync::Notify, time::error::Elapsed};
use xmtp_cryptography::{signature::RecoverableSignature, utils::rng};
use xmtp_id::associations::generate_inbox_id;
use xmtp_mls::{storage::EncryptionKey, InboxOwner};
use xmtp_mls::{
groups::{GroupError, MlsGroup},
storage::EncryptionKey,
InboxOwner,
};

#[derive(Clone)]
pub struct LocalWalletInboxOwner {
Expand Down Expand Up @@ -1728,6 +1732,20 @@ mod tests {
new_test_client_with_wallet(wallet).await
}

impl FfiGroup {
#[cfg(test)]
async fn update_installations(&self) -> Result<(), GroupError> {
let group = MlsGroup::new(
self.inner_client.context().clone(),
self.group_id.clone(),
self.created_at_ns,
);

group.update_installations(&self.inner_client).await?;
Ok(())
}
}

#[tokio::test]
async fn get_inbox_id() {
let client = new_test_client().await;
Expand Down Expand Up @@ -2464,6 +2482,8 @@ mod tests {
// Recreate client2 (new installation)
let client2 = new_test_client_with_wallet(wallet2).await;

client1_group.update_installations().await.unwrap();

// Send a message that will break the group
client1_group
.send("This message will break the group".as_bytes().to_vec())
Expand Down Expand Up @@ -2517,6 +2537,8 @@ mod tests {
let alix_group = alix.group(group.id()).unwrap();
let bo_group = bo.group(group.id()).unwrap();
let caro_group = caro.group(group.id()).unwrap();

alix_group.update_installations().await.unwrap();
log::info!("Alix sending first message");
// Alix sends a message in the group
alix_group
Expand All @@ -2525,6 +2547,7 @@ mod tests {
.unwrap();

log::info!("Caro sending second message");
caro_group.update_installations().await.unwrap();
// Caro sends a message in the group
caro_group
.send("Second message".as_bytes().to_vec())
Expand All @@ -2542,6 +2565,8 @@ mod tests {
.await;
bo_stream_messages.wait_for_ready().await;

alix_group.update_installations().await.unwrap();

log::info!("Alix sending third message after Bo's second installation added");
// Alix sends a message to the group
alix_group
Expand All @@ -2555,13 +2580,15 @@ mod tests {

log::info!("Bo sending fourth message");
// Bo sends a message to the group
bo2_group.update_installations().await.unwrap();
bo2_group
.send("Fourth message".as_bytes().to_vec())
.await
.unwrap();

log::info!("Caro sending fifth message");
// Caro sends a message in the group
caro_group.update_installations().await.unwrap();
caro_group
.send("Fifth message".as_bytes().to_vec())
.await
Expand Down
23 changes: 23 additions & 0 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,29 @@ where
Ok(group)
}

pub async fn create_group_with_members(
&self,
account_addresses: Vec<String>,
permissions_policy_set: Option<PolicySet>,
opts: GroupMetadataOptions,
) -> Result<MlsGroup, ClientError> {
log::info!("creating group");

let group = MlsGroup::create_and_insert(
self.context.clone(),
GroupMembershipState::Allowed,
permissions_policy_set.unwrap_or_default(),
opts,
)?;

group.add_members(self, account_addresses).await?;

// notify any streams of the new group
let _ = self.local_events.send(LocalEvents::NewGroup(group.clone()));

Ok(group)
}

pub(crate) fn create_sync_group(&self) -> Result<MlsGroup, ClientError> {
log::info!("creating sync group");
let sync_group = MlsGroup::create_and_insert_sync_group(self.context.clone())?;
Expand Down
8 changes: 6 additions & 2 deletions xmtp_mls/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ pub const MAX_GROUP_SYNC_RETRIES: usize = 3;

pub const MAX_INTENT_PUBLISH_ATTEMPTS: usize = 3;

const NANOSECONDS_IN_HOUR: i64 = 3_600_000_000_000;
const NS_IN_SEC: i64 = 1_000_000_000;

pub const UPDATE_INSTALLATIONS_INTERVAL_NS: i64 = NANOSECONDS_IN_HOUR / 2; // 30 min
const NS_IN_HOUR: i64 = NS_IN_SEC * 60 * 60;

pub const SYNC_UPDATE_INSTALLATIONS_INTERVAL_NS: i64 = NS_IN_HOUR / 2; // 30 min

pub const SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS: i64 = 5 * NS_IN_SEC;

pub const MAX_GROUP_SIZE: u16 = 400;

Expand Down
26 changes: 22 additions & 4 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use crate::{
configuration::{
CIPHERSUITE, GROUP_MEMBERSHIP_EXTENSION_ID, GROUP_PERMISSIONS_EXTENSION_ID, MAX_GROUP_SIZE,
MAX_PAST_EPOCHS, MUTABLE_METADATA_EXTENSION_ID,
SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS,
},
hpke::{decrypt_welcome, HpkeError},
identity::{parse_credential, Identity, IdentityError},
Expand Down Expand Up @@ -187,6 +188,8 @@ pub enum GroupError {
SqlKeyStore(#[from] sql_key_store::SqlKeyStoreError),
#[error("No pending commit found")]
MissingPendingCommit,
#[error("Sync failed to wait for intent")]
SyncFailedToWait,
}

impl RetryableError for GroupError {
Expand Down Expand Up @@ -454,10 +457,10 @@ impl MlsGroup {
where
ApiClient: XmtpApi,
{
let update_interval = Some(5_000_000); // 5 seconds in nanoseconds
let update_interval_ns = Some(SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS);
let conn = self.context.store.conn()?;
let provider = XmtpOpenMlsProvider::from(conn);
self.maybe_update_installations(&provider, update_interval, client)
self.maybe_update_installations(&provider, update_interval_ns, client)
.await?;

let message_id = self.prepare_message(message, provider.conn_ref(), |now| {
Expand All @@ -480,14 +483,29 @@ impl MlsGroup {
{
let conn = self.context.store.conn()?;
let provider = XmtpOpenMlsProvider::from(conn);
let update_interval = Some(5_000_000);
self.maybe_update_installations(&provider, update_interval, client)
let update_interval_ns = Some(SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS);
self.maybe_update_installations(&provider, update_interval_ns, client)
.await?;
self.sync_until_last_intent_resolved(&provider, client)
.await?;
Ok(())
}

/// Update group installations
pub async fn update_installations<ApiClient>(
&self,
client: &Client<ApiClient>,
) -> Result<(), GroupError>
where
ApiClient: XmtpApi,
{
let conn = self.context.store.conn()?;
let provider = XmtpOpenMlsProvider::from(conn);
self.maybe_update_installations(&provider, Some(0), client)
.await?;
Ok(())
}

/// Send a message, optimistically returning the ID of the message before the result of a message publish.
pub fn send_message_optimistic(&self, message: &[u8]) -> Result<Vec<u8>, GroupError> {
let conn = self.context.store.conn()?;
Expand Down
110 changes: 63 additions & 47 deletions xmtp_mls/src/groups/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ use futures::Stream;

use super::{extract_message_v1, GroupError, MlsGroup};
use crate::storage::group_message::StoredGroupMessage;
use crate::storage::refresh_state::EntityKind;
use crate::storage::StorageError;
use crate::subscriptions::{MessagesStreamInfo, StreamHandle};
use crate::XmtpApi;
use crate::{retry::Retry, retry_async, Client};
use crate::{retry_sync, XmtpApi};
use prost::Message;
use xmtp_proto::xmtp::mls::api::v1::GroupMessage;

Expand All @@ -31,53 +33,55 @@ impl MlsGroup {
);
let created_ns = msgv1.created_ns;

let client_pointer = client.clone();
let process_result = retry_async!(
Retry::default(),
(async {
let client_pointer = client_pointer.clone();
let client_id = client_id.clone();
let msgv1 = msgv1.clone();
self.context
.store
.transaction_async(|provider| async move {
let mut openmls_group = self.load_mls_group(&provider)?;

// Attempt processing immediately, but fail if the message is not an Application Message
// Returning an error should roll back the DB tx
log::info!(
"current epoch for [{}] in process_stream_entry() is Epoch: [{}]",
client_id,
openmls_group.epoch()
);

self.process_message(
client_pointer.as_ref(),
&mut openmls_group,
&provider,
&msgv1,
false,
)
if !self.has_already_synced(msg_id)? {
let client_pointer = client.clone();
let process_result = retry_async!(
Retry::default(),
(async {
let client_pointer = client_pointer.clone();
let client_id = client_id.clone();
let msgv1 = msgv1.clone();
self.context
.store
.transaction_async(|provider| async move {
let mut openmls_group = self.load_mls_group(&provider)?;

// Attempt processing immediately, but fail if the message is not an Application Message
// Returning an error should roll back the DB tx
log::info!(
"current epoch for [{}] in process_stream_entry() is Epoch: [{}]",
client_id,
openmls_group.epoch()
);

self.process_message(
client_pointer.as_ref(),
&mut openmls_group,
&provider,
&msgv1,
false,
)
.await
.map_err(GroupError::ReceiveError)
})
.await
.map_err(GroupError::ReceiveError)
})
.await
})
);

if let Some(GroupError::ReceiveError(_)) = process_result.as_ref().err() {
// Swallow errors here, since another process may have successfully saved the message
// to the DB
match self.sync_with_conn(&client.mls_provider()?, &client).await {
Ok(_) => {
log::debug!("Sync triggered by streamed message successful")
}
Err(err) => {
log::warn!("Sync triggered by streamed message failed: {}", err);
}
};
} else if process_result.is_err() {
log::error!("Process stream entry {:?}", process_result.err());
})
);

if let Some(GroupError::ReceiveError(_)) = process_result.as_ref().err() {
// Swallow errors here, since another process may have successfully saved the message
// to the DB
match self.sync_with_conn(&client.mls_provider()?, &client).await {
Ok(_) => {
log::debug!("Sync triggered by streamed message successful")
}
Err(err) => {
log::warn!("Sync triggered by streamed message failed: {}", err);
}
};
} else if process_result.is_err() {
log::error!("Process stream entry {:?}", process_result.err());
}
}

// Load the message from the DB to handle cases where it may have been already processed in
Expand All @@ -91,6 +95,18 @@ impl MlsGroup {
Ok(new_message)
}

// Checks if a message has already been processed through a sync
fn has_already_synced(&self, id: u64) -> Result<bool, GroupError> {
let check_for_last_cursor = || -> Result<i64, StorageError> {
let conn = self.context.store.conn()?;
conn.get_last_cursor_for_id(&self.group_id, EntityKind::Group)
};

let last_id = retry_sync!(Retry::default(), check_for_last_cursor)?;

Ok(last_id >= id as i64)
}

pub async fn process_streamed_group_message<ApiClient>(
&self,
envelope_bytes: Vec<u8>,
Expand Down
Loading

0 comments on commit a1b1e87

Please sign in to comment.