fix clippy issues
mchenani committed Dec 11, 2024
1 parent 1472473 commit fbc057c
Showing 4 changed files with 95 additions and 68 deletions.
6 changes: 3 additions & 3 deletions xmtp_mls/src/groups/intents.rs
@@ -867,9 +867,9 @@ pub(crate) mod tests {
let provider = group.client.mls_provider().unwrap();
let decrypted_message = group
.load_mls_group_with_lock(&provider, |mut mls_group| {
Ok(mls_group
.process_message(&provider, mls_message).unwrap())
}).unwrap();
Ok(mls_group.process_message(&provider, mls_message).unwrap())
})
.unwrap();

let staged_commit = match decrypted_message.into_content() {
ProcessedMessageContent::StagedCommitMessage(staged_commit) => *staged_commit,
60 changes: 31 additions & 29 deletions xmtp_mls/src/groups/mls_sync.rs
@@ -428,7 +428,7 @@ where
&pending_commit,
&mls_group,
)
.await;
.await;

if let Err(err) = maybe_validated_commit {
tracing::error!(
@@ -453,7 +453,11 @@
return Ok(IntentState::ToPublish);
} else {
// If no error committing the change, write a transcript message
self.save_transcript_message(conn, validated_commit, envelope_timestamp_ns)?;
self.save_transcript_message(
conn,
validated_commit,
envelope_timestamp_ns,
)?;
}
}
IntentKind::SendMessage => {
@@ -473,7 +477,8 @@
};

Ok(IntentState::Committed)
}).await
})
.await
}

#[tracing::instrument(level = "trace", skip_all)]
@@ -734,26 +739,25 @@
envelope.id,
intent_id
);
match self
.process_own_message(intent, provider, message.into(), envelope)
.await?
{
IntentState::ToPublish => {
Ok(provider.conn_ref().set_group_intent_to_publish(intent_id)?)
}
IntentState::Committed => {
Ok(provider.conn_ref().set_group_intent_committed(intent_id)?)
}
IntentState::Published => {
tracing::error!("Unexpected behaviour: returned intent state published from process_own_message");
Ok(())
}
IntentState::Error => {
tracing::warn!("Intent [{}] moved to error status", intent_id);
Ok(provider.conn_ref().set_group_intent_error(intent_id)?)
}
match self
.process_own_message(intent, provider, message.into(), envelope)
.await?
{
IntentState::ToPublish => {
Ok(provider.conn_ref().set_group_intent_to_publish(intent_id)?)
}

IntentState::Committed => {
Ok(provider.conn_ref().set_group_intent_committed(intent_id)?)
}
IntentState::Published => {
tracing::error!("Unexpected behaviour: returned intent state published from process_own_message");
Ok(())
}
IntentState::Error => {
tracing::warn!("Intent [{}] moved to error status", intent_id);
Ok(provider.conn_ref().set_group_intent_error(intent_id)?)
}
}
}
// No matching intent found
Ok(None) => {
@@ -850,10 +854,7 @@ where
for message in messages.into_iter() {
let result = retry_async!(
Retry::default(),
(async {
self.consume_message(provider, &message)
.await
})
(async { self.consume_message(provider, &message).await })
);
if let Err(e) = result {
let is_retryable = e.is_retryable();
@@ -1202,7 +1203,7 @@ where
update_interval_ns: Option<i64>,
) -> Result<(), GroupError> {
// determine how long of an interval in time to use before updating list
let interval_ns = update_interval_ns.unwrap_or_else(|| SYNC_UPDATE_INSTALLATIONS_INTERVAL_NS);
let interval_ns = update_interval_ns.unwrap_or(SYNC_UPDATE_INSTALLATIONS_INTERVAL_NS);

let now_ns = crate::utils::time::now_ns();
let last_ns = provider
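For context on the one-line change in this hunk: a closure passed to unwrap_or_else that only returns a constant is flagged by clippy's unnecessary_lazy_evaluations lint, since there is nothing worth evaluating lazily, and unwrap_or is the suggested replacement. A small standalone illustration, with names invented for the example rather than taken from xmtp_mls:

    const SYNC_INTERVAL_NS: i64 = 60_000_000_000;

    fn interval(update_interval_ns: Option<i64>) -> i64 {
        // The lazy form `update_interval_ns.unwrap_or_else(|| SYNC_INTERVAL_NS)`
        // trips clippy::unnecessary_lazy_evaluations; the eager form is equivalent
        // here because the default is just a constant.
        update_interval_ns.unwrap_or(SYNC_INTERVAL_NS)
    }

    fn main() {
        assert_eq!(interval(None), SYNC_INTERVAL_NS);
        assert_eq!(interval(Some(5)), 5);
        println!("default interval: {} ns", interval(None));
    }

The lazy unwrap_or_else form still makes sense when the default is expensive to compute or has side effects.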
@@ -1269,7 +1270,7 @@ where
inbox_ids_to_add: &[InboxIdRef<'_>],
inbox_ids_to_remove: &[InboxIdRef<'_>],
) -> Result<UpdateGroupMembershipIntentData, GroupError> {
self.load_mls_group_with_lock_async(provider, | mls_group| async move {
self.load_mls_group_with_lock_async(provider, |mls_group| async move {
let existing_group_membership = extract_group_membership(mls_group.extensions())?;
// TODO:nm prevent querying for updates on members who are being removed
let mut inbox_ids = existing_group_membership.inbox_ids();
@@ -1320,7 +1321,8 @@ where
.map(|s| s.to_string())
.collect::<Vec<String>>(),
))
}).await
})
.await
}

/**
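Most of the changes in this file are formatting around the closure-based load_mls_group_with_lock / load_mls_group_with_lock_async helpers. Below is a rough, self-contained sketch of that pattern, assuming tokio; the real helper also takes an XmtpOpenMlsProvider and returns the group's own error types, and the MlsGroupStub type here is invented purely for illustration:

    use std::future::Future;
    use std::sync::Arc;
    use tokio::sync::{Mutex, OwnedMutexGuard};

    // Stand-in for the OpenMLS group state; the real helper loads it via the provider.
    struct MlsGroupStub {
        epoch: u64,
    }

    struct Group {
        inner: Arc<Mutex<MlsGroupStub>>,
    }

    impl Group {
        // Simplified stand-in for load_mls_group_with_lock_async: take the group's
        // lock, hand the guarded group to the caller's async closure, and release
        // the lock once the closure's future completes.
        async fn load_mls_group_with_lock_async<F, Fut, T>(&self, f: F) -> T
        where
            F: FnOnce(OwnedMutexGuard<MlsGroupStub>) -> Fut,
            Fut: Future<Output = T>,
        {
            let guard = self.inner.clone().lock_owned().await;
            f(guard).await
        }
    }

    #[tokio::main]
    async fn main() {
        let group = Group {
            inner: Arc::new(Mutex::new(MlsGroupStub { epoch: 0 })),
        };
        // The call shape mirrors the rustfmt layout applied in this commit:
        // `.load_mls_group_with_lock_async(provider, |mls_group| async move { ... }).await`
        let new_epoch = group
            .load_mls_group_with_lock_async(|mut mls_group| async move {
                mls_group.epoch += 1;
                mls_group.epoch
            })
            .await;
        println!("epoch after commit: {new_epoch}");
    }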
31 changes: 15 additions & 16 deletions xmtp_mls/src/groups/mod.rs
@@ -58,20 +58,7 @@ use self::{
group_permissions::PolicySet,
validated_commit::CommitValidationError,
};
use std::future::Future;
use std::{collections::HashSet, sync::Arc};
use xmtp_cryptography::signature::{sanitize_evm_addresses, AddressValidationError};
use xmtp_id::{InboxId, InboxIdRef};
use xmtp_proto::xmtp::mls::{
api::v1::{
group_message::{Version as GroupMessageVersion, V1 as GroupMessageV1},
GroupMessage,
},
message_contents::{
plaintext_envelope::{Content, V1},
PlaintextEnvelope,
},
};
use crate::storage::StorageError;
use crate::{
api::WrappedApiError,
client::{deserialize_welcome, ClientError, XmtpMlsLocalContext},
@@ -98,7 +85,20 @@ use crate::{
xmtp_openmls_provider::XmtpOpenMlsProvider,
Store, MLS_COMMIT_LOCK,
};
use crate::storage::StorageError;
use std::future::Future;
use std::{collections::HashSet, sync::Arc};
use xmtp_cryptography::signature::{sanitize_evm_addresses, AddressValidationError};
use xmtp_id::{InboxId, InboxIdRef};
use xmtp_proto::xmtp::mls::{
api::v1::{
group_message::{Version as GroupMessageVersion, V1 as GroupMessageV1},
GroupMessage,
},
message_contents::{
plaintext_envelope::{Content, V1},
PlaintextEnvelope,
},
};

#[derive(Debug, Error)]
pub enum GroupError {
@@ -1650,7 +1650,6 @@ pub(crate) mod tests {

use diesel::connection::SimpleConnection;
use futures::future::join_all;
use openmls::prelude::Member;
use prost::Message;
use std::sync::Arc;
use xmtp_cryptography::utils::generate_local_wallet;
66 changes: 46 additions & 20 deletions xmtp_mls/src/lib.rs
@@ -1,6 +1,8 @@
#![recursion_limit = "256"]
#![warn(clippy::unwrap_used)]

#[macro_use]
extern crate tracing;
pub mod api;
pub mod builder;
pub mod client;
@@ -21,26 +23,29 @@ pub mod utils;
pub mod verified_key_package_v2;
mod xmtp_openmls_provider;

pub use client::{Client, Network};
use std::collections::HashMap;
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use tokio::sync::{Semaphore, OwnedSemaphorePermit};
pub use client::{Client, Network};
use storage::{DuplicateItem, StorageError};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
pub use xmtp_openmls_provider::XmtpOpenMlsProvider;
use std::sync::LazyLock;

pub use xmtp_id::InboxOwner;
pub use xmtp_proto::api_client::trait_impls::*;

#[macro_use]
extern crate tracing;
/// A manager for group-specific semaphores
#[derive(Debug)]
pub struct GroupCommitLock {
// Storage for group-specific semaphores
locks: Mutex<HashMap<Vec<u8>, Arc<Semaphore>>>,
}

impl Default for GroupCommitLock {
fn default() -> Self {
Self::new()
}
}
impl GroupCommitLock {
/// Create a new `GroupCommitLock`
pub fn new() -> Self {
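The Default impl for GroupCommitLock in the hunk above is the standard response to clippy's new_without_default lint, which fires on public types that expose an argument-less new() without implementing Default. A minimal standalone illustration; the CommitLock type here is invented for the example:

    pub struct CommitLock {
        permits: usize,
    }

    impl CommitLock {
        pub fn new() -> Self {
            Self { permits: 1 }
        }
    }

    // Without this impl, `cargo clippy` suggests adding a `Default` implementation
    // for `CommitLock` (clippy::new_without_default).
    impl Default for CommitLock {
        fn default() -> Self {
            Self::new()
        }
    }

    fn main() {
        let lock = CommitLock::default();
        println!("permits: {}", lock.permits);
    }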
@@ -50,32 +55,53 @@ impl GroupCommitLock {
}

/// Get or create a semaphore for a specific group and acquire it, returning a guard
pub async fn get_lock_async(&self, group_id: Vec<u8>) -> SemaphoreGuard {
pub async fn get_lock_async(&self, group_id: Vec<u8>) -> Result<SemaphoreGuard, GroupError> {
let semaphore = {
let mut locks = self.locks.lock().unwrap();
locks
.entry(group_id)
.or_insert_with(|| Arc::new(Semaphore::new(1)))
.clone()
match self.locks.lock() {
Ok(mut locks) => locks
.entry(group_id)
.or_insert_with(|| Arc::new(Semaphore::new(1)))
.clone(),
Err(err) => {
eprintln!("Failed to lock the mutex: {}", err);
return Err(GroupError::LockUnavailable);
}
}
};

let semaphore_clone = semaphore.clone();
let permit = semaphore.acquire_owned().await.unwrap();
SemaphoreGuard { _permit: permit, _semaphore: semaphore_clone }
let permit = match semaphore.acquire_owned().await {
Ok(permit) => permit,
Err(err) => {
eprintln!("Failed to acquire semaphore permit: {}", err);
return Err(GroupError::LockUnavailable);
}
};
Ok(SemaphoreGuard {
_permit: permit,
_semaphore: semaphore_clone,
})
}

/// Get or create a semaphore for a specific group and acquire it synchronously
pub fn get_lock_sync(&self, group_id: Vec<u8>) -> Result<SemaphoreGuard, GroupError> {
let semaphore = {
let mut locks = self.locks.lock().unwrap();
locks
.entry(group_id)
.or_insert_with(|| Arc::new(Semaphore::new(1)))
.clone() // Clone here to retain ownership for later use
match self.locks.lock() {
Ok(mut locks) => locks
.entry(group_id)
.or_insert_with(|| Arc::new(Semaphore::new(1)))
.clone(),
Err(err) => {
eprintln!("Failed to lock the mutex: {}", err);
return Err(GroupError::LockUnavailable);
}
}
};

// Synchronously acquire the permit
let permit = semaphore.clone().try_acquire_owned().map_err(|_| GroupError::LockUnavailable)?;
let permit = semaphore
.clone()
.try_acquire_owned()
.map_err(|_| GroupError::LockUnavailable)?;
Ok(SemaphoreGuard {
_permit: permit,
_semaphore: semaphore, // semaphore is now valid because we cloned it earlier
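For context: with #![warn(clippy::unwrap_used)] at the crate root, the .unwrap() calls on the locks mutex and on acquire_owned() are replaced in this hunk with explicit fallbacks that surface GroupError::LockUnavailable. A self-contained sketch of the same shape, assuming tokio; the CommitLocks type and the bare LockUnavailable error are simplifications invented for the example, not the crate's real types:

    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use tokio::sync::{OwnedSemaphorePermit, Semaphore};

    #[derive(Debug)]
    struct LockUnavailable;

    #[derive(Default)]
    struct CommitLocks {
        locks: Mutex<HashMap<Vec<u8>, Arc<Semaphore>>>,
    }

    impl CommitLocks {
        async fn acquire(&self, group_id: Vec<u8>) -> Result<OwnedSemaphorePermit, LockUnavailable> {
            // A poisoned mutex no longer panics via `.unwrap()`; it maps to an error.
            let semaphore = match self.locks.lock() {
                Ok(mut locks) => locks
                    .entry(group_id)
                    .or_insert_with(|| Arc::new(Semaphore::new(1)))
                    .clone(),
                Err(_poisoned) => return Err(LockUnavailable),
            };
            // Likewise, a closed semaphore maps to an error instead of panicking.
            semaphore.acquire_owned().await.map_err(|_| LockUnavailable)
        }
    }

    #[tokio::main]
    async fn main() -> Result<(), LockUnavailable> {
        let locks = CommitLocks::default();
        let _permit = locks.acquire(b"group-1".to_vec()).await?;
        println!("acquired per-group permit");
        Ok(())
    }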
@@ -136,10 +162,10 @@ pub trait Delete<Model> {
fn delete(&self, key: Self::Key) -> Result<usize, StorageError>;
}

use crate::groups::GroupError;
pub use stream_handles::{
spawn, AbortHandle, GenericStreamHandle, StreamHandle, StreamHandleError,
};
use crate::groups::GroupError;

#[cfg(target_arch = "wasm32")]
#[doc(hidden)]
