Skip to content

Commit

Permalink
Replication Client: Fix Batch message send (#1300)
Browse files Browse the repository at this point in the history
In the previous code we would break down a batch message containing N messages to N network calls with 1 message each.

Fix that.
  • Loading branch information
mkysel authored Nov 22, 2024
1 parent ab9d335 commit 363daec
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 53 deletions.
44 changes: 24 additions & 20 deletions xmtp_api_grpc/src/replication_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,8 @@ impl XmtpIdentityClient for ClientV4 {
&self,
request: PublishIdentityUpdateRequest,
) -> Result<PublishIdentityUpdateResponse, Error> {
let client = &mut self.payer_client.clone();
let res = client
.publish_client_envelopes(PublishClientEnvelopesRequest::try_from(request)?)
.await;
match res {
Ok(_) => Ok(PublishIdentityUpdateResponse {}),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
}
self.publish_envelopes_to_payer(vec![request]).await?;
Ok(PublishIdentityUpdateResponse {})
}

#[tracing::instrument(level = "trace", skip_all)]
Expand Down Expand Up @@ -482,20 +476,30 @@ impl ClientV4 {
}

#[tracing::instrument(level = "trace", skip_all)]
async fn publish_envelopes_to_payer<
T: TryInto<PublishClientEnvelopesRequest, Error = Error>,
>(
async fn publish_envelopes_to_payer<T>(
&self,
items: impl IntoIterator<Item = T>,
) -> Result<(), Error> {
messages: impl IntoIterator<Item = T>,
) -> Result<(), Error>
where
T: TryInto<ClientEnvelope>,
<T as TryInto<ClientEnvelope>>::Error: std::error::Error + Send + Sync + 'static,
{
let client = &mut self.payer_client.clone();
for item in items {
let request = item.try_into()?;
let res = client.publish_client_envelopes(request).await;
if let Err(e) = res {
return Err(Error::new(ErrorKind::MlsError).with(e));
}
}

let envelopes: Vec<ClientEnvelope> = messages
.into_iter()
.map(|message| {
message
.try_into()
.map_err(|e| Error::new(ErrorKind::MlsError).with(e))
})
.collect::<Result<_, _>>()?;

client
.publish_client_envelopes(PublishClientEnvelopesRequest { envelopes })
.await
.map_err(|e| Error::new(ErrorKind::MlsError).with(e))?;

Ok(())
}
}
Expand Down
57 changes: 24 additions & 33 deletions xmtp_proto/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::xmtp::mls::api::v1::{
};
use crate::xmtp::xmtpv4::envelopes::client_envelope::Payload;
use crate::xmtp::xmtpv4::envelopes::{AuthenticatedData, ClientEnvelope};
use crate::xmtp::xmtpv4::payer_api::PublishClientEnvelopesRequest;

use crate::v4_utils::{
build_identity_topic_from_hex_encoded, build_welcome_message_topic, get_group_message_topic,
Expand Down Expand Up @@ -34,75 +33,67 @@ mod inbox_id {
}
}

impl TryFrom<UploadKeyPackageRequest> for PublishClientEnvelopesRequest {
impl TryFrom<UploadKeyPackageRequest> for ClientEnvelope {
type Error = Error;

fn try_from(req: UploadKeyPackageRequest) -> Result<Self, Error> {
if let Some(key_package) = req.key_package.as_ref() {
Ok(PublishClientEnvelopesRequest {
envelopes: vec![ClientEnvelope {
aad: Some(AuthenticatedData::with_topic(get_key_package_topic(
key_package,
)?)),
payload: Some(Payload::UploadKeyPackage(req)),
}],
Ok(ClientEnvelope {
aad: Some(AuthenticatedData::with_topic(get_key_package_topic(
key_package,
)?)),
payload: Some(Payload::UploadKeyPackage(req)),
})
} else {
Err(Error::new(InternalError(MissingPayloadError)))
}
}
}

impl TryFrom<PublishIdentityUpdateRequest> for PublishClientEnvelopesRequest {
impl TryFrom<PublishIdentityUpdateRequest> for ClientEnvelope {
type Error = Error;

fn try_from(req: PublishIdentityUpdateRequest) -> Result<Self, Error> {
if let Some(identity_update) = req.identity_update {
Ok(PublishClientEnvelopesRequest {
envelopes: vec![ClientEnvelope {
aad: Some(AuthenticatedData::with_topic(
build_identity_topic_from_hex_encoded(&identity_update.inbox_id)?,
)),
payload: Some(Payload::IdentityUpdate(identity_update)),
}],
Ok(ClientEnvelope {
aad: Some(AuthenticatedData::with_topic(
build_identity_topic_from_hex_encoded(&identity_update.inbox_id)?,
)),
payload: Some(Payload::IdentityUpdate(identity_update)),
})
} else {
Err(Error::new(InternalError(MissingPayloadError)))
}
}
}

impl TryFrom<GroupMessageInput> for PublishClientEnvelopesRequest {
impl TryFrom<GroupMessageInput> for ClientEnvelope {
type Error = crate::Error;

fn try_from(req: GroupMessageInput) -> Result<Self, Error> {
if let Some(GroupMessageInputVersion::V1(ref version)) = req.version {
Ok(PublishClientEnvelopesRequest {
envelopes: vec![ClientEnvelope {
aad: Some(AuthenticatedData::with_topic(get_group_message_topic(
version.data.clone(),
)?)),
payload: Some(Payload::GroupMessage(req)),
}],
Ok(ClientEnvelope {
aad: Some(AuthenticatedData::with_topic(get_group_message_topic(
version.data.clone(),
)?)),
payload: Some(Payload::GroupMessage(req)),
})
} else {
Err(Error::new(InternalError(MissingPayloadError)))
}
}
}

impl TryFrom<WelcomeMessageInput> for PublishClientEnvelopesRequest {
impl TryFrom<WelcomeMessageInput> for ClientEnvelope {
type Error = crate::Error;

fn try_from(req: WelcomeMessageInput) -> Result<Self, Self::Error> {
if let Some(WelcomeMessageVersion::V1(ref version)) = req.version {
Ok(PublishClientEnvelopesRequest {
envelopes: vec![ClientEnvelope {
aad: Some(AuthenticatedData::with_topic(build_welcome_message_topic(
version.installation_key.as_slice(),
))),
payload: Some(Payload::WelcomeMessage(req)),
}],
Ok(ClientEnvelope {
aad: Some(AuthenticatedData::with_topic(build_welcome_message_topic(
version.installation_key.as_slice(),
))),
payload: Some(Payload::WelcomeMessage(req)),
})
} else {
Err(Error::new(InternalError(MissingPayloadError)))
Expand Down

0 comments on commit 363daec

Please sign in to comment.