Skip to content

Commit

Permalink
Add UserPreferenceUpdate Message Compat (#1387)
Browse files Browse the repository at this point in the history
* proto update

* swap out consent updates

* ok to fail

* missing file

* lint

* update name

* Less aggressive restarting
  • Loading branch information
codabrink authored Dec 6, 2024
1 parent 29b2b16 commit 2d0640c
Show file tree
Hide file tree
Showing 13 changed files with 558 additions and 837 deletions.
2 changes: 2 additions & 0 deletions xmtp_id/src/associations/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub enum DeserializationError {
Unspecified(&'static str),
#[error("Error creating public key from proto bytes")]
Ed25519(#[from] ed25519_dalek::ed25519::Error),
#[error("Unable to deserialize")]
Bincode,
}

impl TryFrom<IdentityUpdateProto> for UnverifiedIdentityUpdate {
Expand Down
7 changes: 6 additions & 1 deletion xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::{
identity_updates::{load_identity_updates, IdentityUpdateError},
intents::Intents,
mutex_registry::MutexRegistry,
preferences::UserPreferenceUpdate,
retry::Retry,
retry_async, retryable,
storage::{
Expand Down Expand Up @@ -443,8 +444,12 @@ where
if self.history_sync_url.is_some() {
let mut records = records.to_vec();
records.append(&mut new_records);
let records = records
.into_iter()
.map(UserPreferenceUpdate::ConsentUpdate)
.collect();
self.local_events
.send(LocalEvents::OutgoingConsentUpdates(records))
.send(LocalEvents::OutgoingPreferenceUpdates(records))
.map_err(|e| ClientError::Generic(e.to_string()))?;
}

Expand Down
28 changes: 22 additions & 6 deletions xmtp_mls/src/groups/device_sync.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{GroupError, MlsGroup};
use crate::configuration::NS_IN_HOUR;
use crate::preferences::UserPreferenceUpdate;
use crate::retry::{Retry, RetryableError};
use crate::storage::group::{ConversationType, GroupQueryArgs};
use crate::storage::group_message::MsgQueryArgs;
Expand Down Expand Up @@ -107,6 +108,8 @@ pub enum DeviceSyncError {
SyncPayloadTooOld,
#[error(transparent)]
Subscribe(#[from] SubscribeError),
#[error("Unable to serialize: {0}")]
Bincode(String),
}

impl RetryableError for DeviceSyncError {
Expand Down Expand Up @@ -169,16 +172,27 @@ where
self.on_request(message_id, &provider).await?
}
},
LocalEvents::OutgoingConsentUpdates(consent_records) => {
LocalEvents::OutgoingPreferenceUpdates(consent_records) => {
let provider = self.client.mls_provider()?;
for consent_record in consent_records {
for record in consent_records {
let UserPreferenceUpdate::ConsentUpdate(consent_record) = record else {
continue;
};

self.client
.send_consent_update(&provider, &consent_record)
.send_consent_update(&provider, consent_record)
.await?;
}
}
LocalEvents::IncomingConsentUpdates(consent_records) => {
LocalEvents::IncomingPreferenceUpdate(updates) => {
let provider = self.client.mls_provider()?;
let consent_records = updates
.into_iter()
.filter_map(|pu| match pu {
UserPreferenceUpdate::ConsentUpdate(cr) => Some(cr),
_ => None,
})
.collect::<Vec<_>>();
provider
.conn_ref()
.insert_or_replace_consent_records(&consent_records)?;
Expand Down Expand Up @@ -326,6 +340,8 @@ where
}
_ => {
tracing::error!(inbox_id, installation_id, "sync worker error {err}");
// Wait 2 seconds before restarting.
crate::sleep(Duration::from_secs(2)).await;
}
}
}
Expand Down Expand Up @@ -671,8 +687,8 @@ where
if existing_consent_record.state != consent_record.state {
warn!("Existing consent record exists and does not match payload state. Streaming consent_record update to sync group.");
self.local_events
.send(LocalEvents::OutgoingConsentUpdates(vec![
existing_consent_record,
.send(LocalEvents::OutgoingPreferenceUpdates(vec![
UserPreferenceUpdate::ConsentUpdate(existing_consent_record),
]))
.map_err(|e| DeviceSyncError::Generic(e.to_string()))?;
}
Expand Down
33 changes: 9 additions & 24 deletions xmtp_mls/src/groups/device_sync/consent_sync.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
use super::*;
use crate::{
storage::consent_record::{ConsentState, ConsentType},
Client, XmtpApi,
};
use crate::{preferences::UserPreferenceUpdate, Client, XmtpApi};
use xmtp_id::scw_verifier::SmartContractSignatureVerifier;
use xmtp_proto::xmtp::mls::message_contents::{
ConsentEntityType, ConsentState as ConsentStateProto, ConsentUpdate as ConsentUpdateProto,
};
use xmtp_proto::xmtp::mls::message_contents::UserPreferenceUpdate as UserPreferenceUpdateProto;

impl<ApiClient, V> Client<ApiClient, V>
where
Expand All @@ -16,7 +11,7 @@ where
pub(crate) async fn send_consent_update(
&self,
provider: &XmtpOpenMlsProvider,
record: &StoredConsentRecord,
record: StoredConsentRecord,
) -> Result<(), DeviceSyncError> {
tracing::info!(
inbox_id = self.inbox_id(),
Expand All @@ -26,26 +21,16 @@ where
);
let conn = provider.conn_ref();

let consent_update_proto = ConsentUpdateProto {
entity: record.entity.clone(),
entity_type: match record.entity_type {
ConsentType::Address => ConsentEntityType::Address,
ConsentType::ConversationId => ConsentEntityType::ConversationId,
ConsentType::InboxId => ConsentEntityType::InboxId,
} as i32,
state: match record.state {
ConsentState::Allowed => ConsentStateProto::Allowed,
ConsentState::Denied => ConsentStateProto::Denied,
ConsentState::Unknown => ConsentStateProto::Unspecified,
} as i32,
};

let sync_group = self.ensure_sync_group(provider).await?;
let content_bytes = serde_json::to_vec(&consent_update_proto)?;

let update_proto: UserPreferenceUpdateProto = UserPreferenceUpdate::ConsentUpdate(record)
.try_into()
.map_err(|e| DeviceSyncError::Bincode(format!("{e:?}")))?;
let content_bytes = serde_json::to_vec(&update_proto)?;
sync_group.prepare_message(&content_bytes, conn, |_time_ns| PlaintextEnvelope {
content: Some(Content::V2(V2 {
idempotency_key: new_request_id(),
message_type: Some(MessageType::ConsentUpdate(consent_update_proto)),
message_type: Some(MessageType::UserPreferenceUpdate(update_proto)),
})),
})?;

Expand Down
22 changes: 11 additions & 11 deletions xmtp_mls/src/groups/mls_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,17 +606,17 @@ where
SyncMessage::Reply { message_id },
));
}
Some(MessageType::ConsentUpdate(update)) => {
tracing::info!(
"Incoming streamed consent update: {:?} {} updated to {:?}.",
update.entity_type(),
update.entity,
update.state()
);

let _ = self.client.local_events().send(
LocalEvents::IncomingConsentUpdates(vec![update.try_into()?]),
);
Some(MessageType::UserPreferenceUpdate(update)) => {
// Ignore errors since this may come from a newer version of the lib
// that has new update types.
if let Ok(update) = update.try_into() {
let _ = self
.client
.local_events()
.send(LocalEvents::IncomingPreferenceUpdate(vec![update]));
} else {
tracing::warn!("Failed to deserialize preference update. Is this libxmtp version old?");
}
}
_ => {
return Err(GroupMessageProcessingError::InvalidPayload);
Expand Down
5 changes: 4 additions & 1 deletion xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ use crate::{
identity::{parse_credential, IdentityError},
identity_updates::{load_identity_updates, InstallationDiffError},
intents::ProcessIntentError,
preferences::UserPreferenceUpdate,
retry::RetryableError,
storage::{
consent_record::{ConsentState, ConsentType, StoredConsentRecord},
Expand Down Expand Up @@ -1093,7 +1094,9 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
let _ = self
.client
.local_events()
.send(LocalEvents::OutgoingConsentUpdates(vec![consent_record]));
.send(LocalEvents::OutgoingPreferenceUpdates(vec![
UserPreferenceUpdate::ConsentUpdate(consent_record),
]));
}

Ok(())
Expand Down
1 change: 1 addition & 0 deletions xmtp_mls/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod identity;
pub mod identity_updates;
mod intents;
mod mutex_registry;
mod preferences;
pub mod retry;
pub mod storage;
mod stream_handles;
Expand Down
30 changes: 30 additions & 0 deletions xmtp_mls/src/preferences.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use serde::{Deserialize, Serialize};
use xmtp_id::associations::DeserializationError;
use xmtp_proto::xmtp::mls::message_contents::UserPreferenceUpdate as UserPreferenceUpdateProto;

use crate::storage::consent_record::StoredConsentRecord;

#[derive(Serialize, Deserialize, Clone)]
pub enum UserPreferenceUpdate {
ConsentUpdate(StoredConsentRecord),
HmacKeyUpdate { key: Vec<u8> },
}

impl TryFrom<UserPreferenceUpdateProto> for UserPreferenceUpdate {
type Error = DeserializationError;
fn try_from(value: UserPreferenceUpdateProto) -> Result<Self, Self::Error> {
let update =
bincode::deserialize(&value.content).map_err(|_| DeserializationError::Bincode)?;

Ok(update)
}
}

impl TryInto<UserPreferenceUpdateProto> for UserPreferenceUpdate {
type Error = bincode::Error;

fn try_into(self) -> Result<UserPreferenceUpdateProto, Self::Error> {
let content = bincode::serialize(&self)?;
Ok(UserPreferenceUpdateProto { content })
}
}
27 changes: 0 additions & 27 deletions xmtp_mls/src/storage/encrypted_store/consent_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ use diesel::{
upsert::excluded,
};
use serde::{Deserialize, Serialize};
use xmtp_id::associations::DeserializationError;
use xmtp_proto::xmtp::mls::message_contents::{
ConsentEntityType, ConsentState as ConsentStateProto, ConsentUpdate as ConsentUpdateProto,
};

/// StoredConsentRecord holds a serialized ConsentRecord
#[derive(Insertable, Queryable, Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
Expand Down Expand Up @@ -182,29 +178,6 @@ where
}
}

impl TryFrom<ConsentUpdateProto> for StoredConsentRecord {
type Error = DeserializationError;

fn try_from(update: ConsentUpdateProto) -> Result<Self, Self::Error> {
Ok(Self {
entity_type: match update.entity_type() {
ConsentEntityType::Address => ConsentType::Address,
ConsentEntityType::ConversationId => ConsentType::ConversationId,
ConsentEntityType::InboxId => ConsentType::InboxId,
ConsentEntityType::Unspecified => {
return Err(DeserializationError::Unspecified("entity_type"))
}
},
state: match update.state() {
ConsentStateProto::Unspecified => ConsentState::Unknown,
ConsentStateProto::Allowed => ConsentState::Allowed,
ConsentStateProto::Denied => ConsentState::Denied,
},
entity: update.entity,
})
}
}

#[cfg(test)]
mod tests {
use crate::storage::encrypted_store::tests::with_connection;
Expand Down
31 changes: 25 additions & 6 deletions xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use xmtp_proto::{api_client::XmtpMlsStreams, xmtp::mls::api::v1::WelcomeMessage}
use crate::{
client::{extract_welcome_message, ClientError},
groups::{mls_sync::GroupMessageProcessingError, subscriptions, GroupError, MlsGroup},
preferences::UserPreferenceUpdate,
retry::{Retry, RetryableError},
retry_async, retryable,
storage::{
Expand Down Expand Up @@ -52,8 +53,8 @@ pub enum LocalEvents<C> {
// a new group was created
NewGroup(MlsGroup<C>),
SyncMessage(SyncMessage),
OutgoingConsentUpdates(Vec<StoredConsentRecord>),
IncomingConsentUpdates(Vec<StoredConsentRecord>),
OutgoingPreferenceUpdates(Vec<UserPreferenceUpdate>),
IncomingPreferenceUpdate(Vec<UserPreferenceUpdate>),
}

#[derive(Clone)]
Expand All @@ -77,8 +78,8 @@ impl<C> LocalEvents<C> {

match &self {
SyncMessage(_) => Some(self),
OutgoingConsentUpdates(_) => Some(self),
IncomingConsentUpdates(_) => Some(self),
OutgoingPreferenceUpdates(_) => Some(self),
IncomingPreferenceUpdate(_) => Some(self),
_ => None,
}
}
Expand All @@ -87,8 +88,26 @@ impl<C> LocalEvents<C> {
use LocalEvents::*;

match self {
OutgoingConsentUpdates(cr) => Some(cr),
IncomingConsentUpdates(cr) => Some(cr),
OutgoingPreferenceUpdates(updates) => {
let updates = updates
.into_iter()
.filter_map(|pu| match pu {
UserPreferenceUpdate::ConsentUpdate(cr) => Some(cr),
_ => None,
})
.collect();
Some(updates)
}
IncomingPreferenceUpdate(updates) => {
let updates = updates
.into_iter()
.filter_map(|pu| match pu {
UserPreferenceUpdate::ConsentUpdate(cr) => Some(cr),
_ => None,
})
.collect();
Some(updates)
}
_ => None,
}
}
Expand Down
2 changes: 1 addition & 1 deletion xmtp_proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ proto_full = ["xmtp-identity","xmtp-identity-api-v1","xmtp-identity-associations
"xmtp-xmtpv4-envelopes" = ["xmtp-identity-associations","xmtp-mls-api-v1"]
"xmtp-xmtpv4-message_api" = ["xmtp-xmtpv4-envelopes"]
"xmtp-xmtpv4-payer_api" = ["xmtp-xmtpv4-envelopes"]
## @@protoc_insertion_point(features)
## @@protoc_insertion_point(features)
Loading

0 comments on commit 2d0640c

Please sign in to comment.