diff --git a/Cargo.lock b/Cargo.lock index 4829fc93d..3fa5707ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7127,7 +7127,9 @@ version = "0.1.0" dependencies = [ "async-trait", "futures", + "hex", "openmls", + "openmls_rust_crypto", "pbjson", "pbjson-types", "prost", diff --git a/bindings_ffi/src/lib.rs b/bindings_ffi/src/lib.rs index c4b22d6ab..3d16186be 100755 --- a/bindings_ffi/src/lib.rs +++ b/bindings_ffi/src/lib.rs @@ -23,7 +23,7 @@ pub enum GenericError { #[error("Storage error: {0}")] Storage(#[from] xmtp_mls::storage::StorageError), #[error("API error: {0}")] - ApiError(#[from] xmtp_proto::api_client::Error), + ApiError(#[from] xmtp_proto::Error), #[error("Group error: {0}")] GroupError(#[from] xmtp_mls::groups::GroupError), #[error("Signature: {0}")] diff --git a/bindings_ffi/src/v2.rs b/bindings_ffi/src/v2.rs index 57bb20886..faf021b62 100644 --- a/bindings_ffi/src/v2.rs +++ b/bindings_ffi/src/v2.rs @@ -562,7 +562,8 @@ mod tests { use tokio::sync::Notify; use futures::stream; - use xmtp_proto::api_client::{Envelope, Error as ApiError}; + use xmtp_proto::api_client::Envelope; + use xmtp_proto::Error as ApiError; use crate::{ v2::{ diff --git a/examples/cli/cli-client.rs b/examples/cli/cli-client.rs index 69a6aba8b..d7d02a811 100755 --- a/examples/cli/cli-client.rs +++ b/examples/cli/cli-client.rs @@ -39,6 +39,7 @@ use xmtp_cryptography::{ use xmtp_id::associations::unverified::{UnverifiedRecoverableEcdsaSignature, UnverifiedSignature}; use xmtp_id::associations::{generate_inbox_id, AssociationError, AssociationState, MemberKind}; use xmtp_mls::groups::device_sync::DeviceSyncContent; +use xmtp_mls::groups::scoped_client::ScopedGroupClient; use xmtp_mls::storage::group::GroupQueryArgs; use xmtp_mls::storage::group_message::{GroupMessageKind, MsgQueryArgs}; use xmtp_mls::XmtpApi; @@ -141,6 +142,10 @@ enum Commands { /// Information about the account that owns the DB Info {}, Clear {}, + GetInboxId { + #[arg(value_name = "Account Address")] + account_address: String, + }, #[command(subcommand)] Debug(DebugCommands), } @@ -196,7 +201,9 @@ async fn main() -> color_eyre::eyre::Result<()> { color_eyre::install()?; let cli = Cli::parse(); let crate_name = env!("CARGO_PKG_NAME"); - let filter = EnvFilter::builder().parse(format!("{crate_name}=INFO,xmtp_mls=INFO"))?; + let filter = EnvFilter::builder().parse(format!( + "{crate_name}=INFO,xmtp_mls=INFO,xmtp_api_grpc=INFO" + ))?; if cli.json { let fmt = tracing_subscriber::fmt::layer() .json() @@ -225,30 +232,20 @@ async fn main() -> color_eyre::eyre::Result<()> { info!("Starting CLI Client...."); let grpc: Box = match (cli.testnet, &cli.env) { - (true, Env::Local) => Box::new( - ClientV4::create("http://localhost:5050".into(), false) - .await - .unwrap(), - ), - (true, Env::Dev) => Box::new( - ClientV4::create("https://grpc.testnet.xmtp.network:443".into(), true) - .await - .unwrap(), - ), - (false, Env::Local) => Box::new( - ClientV3::create("http://localhost:5556".into(), false) - .await - .unwrap(), - ), - (false, Env::Dev) => Box::new( - ClientV3::create("https://grpc.dev.xmtp.network:443".into(), true) - .await - .unwrap(), - ), + (true, Env::Local) => { + Box::new(ClientV4::create("http://localhost:5050".into(), false).await?) + } + (true, Env::Dev) => { + Box::new(ClientV4::create("https://grpc.testnet.xmtp.network:443".into(), true).await?) + } + (false, Env::Local) => { + Box::new(ClientV3::create("http://localhost:5556".into(), false).await?) + } + (false, Env::Dev) => { + Box::new(ClientV3::create("https://grpc.dev.xmtp.network:443".into(), true).await?) + } (false, Env::Production) => Box::new( - ClientV3::create("https://grpc.production.xmtp.network:443".into(), true) - .await - .unwrap(), + ClientV3::create("https://grpc.production.xmtp.network:443".into(), true).await?, ), (true, Env::Production) => todo!("not supported"), }; @@ -478,6 +475,14 @@ async fn main() -> color_eyre::eyre::Result<()> { Commands::Debug(debug_commands) => { debug::handle_debug(&client, debug_commands).await.unwrap(); } + Commands::GetInboxId { account_address } => { + let mapping = client + .api() + .get_inbox_ids(vec![account_address.clone()]) + .await?; + let inbox_id = mapping.get(account_address).unwrap(); + info!("Inbox_id {inbox_id}"); + } } Ok(()) diff --git a/xmtp_api_grpc/src/grpc_api_helper.rs b/xmtp_api_grpc/src/grpc_api_helper.rs index 880ec5eaf..150e0b57c 100644 --- a/xmtp_api_grpc/src/grpc_api_helper.rs +++ b/xmtp_api_grpc/src/grpc_api_helper.rs @@ -12,9 +12,7 @@ use tonic::{metadata::MetadataValue, transport::Channel, Request, Streaming}; use xmtp_proto::api_client::{ClientWithMetadata, XmtpMlsStreams}; use xmtp_proto::xmtp::mls::api::v1::{GroupMessage, WelcomeMessage}; use xmtp_proto::{ - api_client::{ - Error, ErrorKind, MutableApiSubscription, XmtpApiClient, XmtpApiSubscription, XmtpMlsClient, - }, + api_client::{MutableApiSubscription, XmtpApiClient, XmtpApiSubscription, XmtpMlsClient}, xmtp::identity::api::v1::identity_api_client::IdentityApiClient as ProtoIdentityApiClient, xmtp::message_api::v1::{ message_api_client::MessageApiClient, BatchQueryRequest, BatchQueryResponse, Envelope, @@ -27,9 +25,10 @@ use xmtp_proto::{ SendWelcomeMessagesRequest, SubscribeGroupMessagesRequest, SubscribeWelcomeMessagesRequest, UploadKeyPackageRequest, }, + Error, ErrorKind, }; -async fn create_tls_channel(address: String) -> Result { +pub async fn create_tls_channel(address: String) -> Result { let channel = Channel::from_shared(address) .map_err(|e| Error::new(ErrorKind::SetupCreateChannelError).with(e))? // Purpose: This setting controls the size of the initial connection-level flow control window for HTTP/2, which is the underlying protocol for gRPC. diff --git a/xmtp_api_grpc/src/identity.rs b/xmtp_api_grpc/src/identity.rs index d0011d6e2..9e7092d02 100644 --- a/xmtp_api_grpc/src/identity.rs +++ b/xmtp_api_grpc/src/identity.rs @@ -1,12 +1,13 @@ use crate::Client; use xmtp_proto::{ - api_client::{Error, ErrorKind, XmtpIdentityClient}, + api_client::XmtpIdentityClient, xmtp::identity::api::v1::{ GetIdentityUpdatesRequest as GetIdentityUpdatesV2Request, GetIdentityUpdatesResponse as GetIdentityUpdatesV2Response, GetInboxIdsRequest, GetInboxIdsResponse, PublishIdentityUpdateRequest, PublishIdentityUpdateResponse, VerifySmartContractWalletSignaturesRequest, VerifySmartContractWalletSignaturesResponse, }, + Error, ErrorKind, }; #[async_trait::async_trait] diff --git a/xmtp_api_grpc/src/replication_client.rs b/xmtp_api_grpc/src/replication_client.rs index df8e582a9..d7ae88e2c 100644 --- a/xmtp_api_grpc/src/replication_client.rs +++ b/xmtp_api_grpc/src/replication_client.rs @@ -7,6 +7,7 @@ use std::time::Duration; use futures::stream::{AbortHandle, Abortable}; use futures::{SinkExt, Stream, StreamExt, TryStreamExt}; +use prost::Message; use tokio::sync::oneshot; use tonic::transport::ClientTlsConfig; use tonic::{metadata::MetadataValue, transport::Channel, Request, Streaming}; @@ -14,12 +15,32 @@ use tonic::{metadata::MetadataValue, transport::Channel, Request, Streaming}; #[cfg(any(feature = "test-utils", test))] use xmtp_proto::api_client::XmtpTestClient; use xmtp_proto::api_client::{ClientWithMetadata, XmtpIdentityClient, XmtpMlsStreams}; -use xmtp_proto::xmtp::mls::api::v1::{GroupMessage, WelcomeMessage}; + +use crate::grpc_api_helper::{create_tls_channel, GrpcMutableSubscription, Subscription}; +use crate::{GroupMessageStream, WelcomeMessageStream}; +use xmtp_proto::v4_utils::{ + build_group_message_topic, build_identity_topic_from_hex_encoded, build_identity_update_topic, + build_key_package_topic, build_welcome_message_topic, extract_client_envelope, + extract_unsigned_originator_envelope, +}; +use xmtp_proto::xmtp::identity::api::v1::get_identity_updates_response; +use xmtp_proto::xmtp::identity::api::v1::get_identity_updates_response::IdentityUpdateLog; +use xmtp_proto::xmtp::mls::api::v1::{ + fetch_key_packages_response, group_message, group_message_input, welcome_message, + welcome_message_input, GroupMessage, WelcomeMessage, +}; +use xmtp_proto::xmtp::xmtpv4::envelopes::client_envelope::Payload; +use xmtp_proto::xmtp::xmtpv4::envelopes::{ + ClientEnvelope, OriginatorEnvelope, PayerEnvelope, UnsignedOriginatorEnvelope, +}; use xmtp_proto::xmtp::xmtpv4::message_api::replication_api_client::ReplicationApiClient; +use xmtp_proto::xmtp::xmtpv4::message_api::{ + EnvelopesQuery, PublishPayerEnvelopesRequest, QueryEnvelopesRequest, +}; +use xmtp_proto::xmtp::xmtpv4::payer_api::payer_api_client::PayerApiClient; +use xmtp_proto::xmtp::xmtpv4::payer_api::PublishClientEnvelopesRequest; use xmtp_proto::{ - api_client::{ - Error, ErrorKind, MutableApiSubscription, XmtpApiClient, XmtpApiSubscription, XmtpMlsClient, - }, + api_client::{MutableApiSubscription, XmtpApiClient, XmtpApiSubscription, XmtpMlsClient}, xmtp::identity::api::v1::{ get_inbox_ids_response, GetIdentityUpdatesRequest as GetIdentityUpdatesV2Request, GetIdentityUpdatesResponse as GetIdentityUpdatesV2Response, GetInboxIdsRequest, @@ -39,47 +60,13 @@ use xmtp_proto::{ xmtp::xmtpv4::message_api::{ get_inbox_ids_request, GetInboxIdsRequest as GetInboxIdsRequestV4, }, + Error, ErrorKind, InternalError, }; -async fn create_tls_channel(address: String) -> Result { - let channel = Channel::from_shared(address) - .map_err(|e| Error::new(ErrorKind::SetupCreateChannelError).with(e))? - // Purpose: This setting controls the size of the initial connection-level flow control window for HTTP/2, which is the underlying protocol for gRPC. - // Functionality: Flow control in HTTP/2 manages how much data can be in flight on the network. Setting the initial connection window size to (1 << 31) - 1 (the maximum possible value for a 32-bit integer, which is 2,147,483,647 bytes) essentially allows the client to receive a very large amount of data from the server before needing to acknowledge receipt and permit more data to be sent. This can be particularly useful in high-latency networks or when transferring large amounts of data. - // Impact: Increasing the window size can improve throughput by allowing more data to be in transit at a time, but it may also increase memory usage and can potentially lead to inefficient use of bandwidth if the network is unreliable. - .initial_connection_window_size(Some((1 << 31) - 1)) - // Purpose: Configures whether the client should send keep-alive pings to the server when the connection is idle. - // Functionality: When set to true, this option ensures that periodic pings are sent on an idle connection to keep it alive and detect if the server is still responsive. - // Impact: This helps maintain active connections, particularly through NATs, load balancers, and other middleboxes that might drop idle connections. It helps ensure that the connection is promptly usable when new requests need to be sent. - .keep_alive_while_idle(true) - // Purpose: Sets the maximum amount of time the client will wait for a connection to be established. - // Functionality: If a connection cannot be established within the specified duration, the attempt is aborted and an error is returned. - // Impact: This setting prevents the client from waiting indefinitely for a connection to be established, which is crucial in scenarios where rapid failure detection is necessary to maintain responsiveness or to quickly fallback to alternative services or retry logic. - .connect_timeout(Duration::from_secs(10)) - // Purpose: Configures the TCP keep-alive interval for the socket connection. - // Functionality: This setting tells the operating system to send TCP keep-alive probes periodically when no data has been transferred over the connection within the specified interval. - // Impact: Similar to the gRPC-level keep-alive, this helps keep the connection alive at the TCP layer and detect broken connections. It's particularly useful for detecting half-open connections and ensuring that resources are not wasted on unresponsive peers. - .tcp_keepalive(Some(Duration::from_secs(15))) - // Purpose: Sets a maximum duration for the client to wait for a response to a request. - // Functionality: If a response is not received within the specified timeout, the request is canceled and an error is returned. - // Impact: This is critical for bounding the wait time for operations, which can enhance the predictability and reliability of client interactions by avoiding indefinitely hanging requests. - .timeout(Duration::from_secs(120)) - // Purpose: Specifies how long the client will wait for a response to a keep-alive ping before considering the connection dead. - // Functionality: If a ping response is not received within this duration, the connection is presumed to be lost and is closed. - // Impact: This setting is crucial for quickly detecting unresponsive connections and freeing up resources associated with them. It ensures that the client has up-to-date information on the status of connections and can react accordingly. - .keep_alive_timeout(Duration::from_secs(25)) - .tls_config(ClientTlsConfig::new().with_enabled_roots()) - .map_err(|e| Error::new(ErrorKind::SetupTLSConfigError).with(e))? - .connect() - .await - .map_err(|e| Error::new(ErrorKind::SetupConnectionError).with(e))?; - - Ok(channel) -} - #[derive(Debug, Clone)] pub struct ClientV4 { pub(crate) client: ReplicationApiClient, + pub(crate) payer_client: PayerApiClient, pub(crate) app_version: MetadataValue, pub(crate) libxmtp_version: MetadataValue, } @@ -101,10 +88,13 @@ impl ClientV4 { .map_err(|e| Error::new(ErrorKind::SetupConnectionError).with(e))?, }; + // GroupMessageInputTODO(mkysel) for now we assume both payer and replication are on the same host let client = ReplicationApiClient::new(channel.clone()); + let payer_client = PayerApiClient::new(channel.clone()); Ok(Self { client, + payer_client, app_version, libxmtp_version, }) @@ -170,127 +160,11 @@ impl XmtpApiClient for ClientV4 { } } -pub struct Subscription { - pending: Arc>>, - close_sender: Option>, - closed: Arc, -} - -impl Subscription { - pub async fn start(stream: Streaming) -> Self { - let pending = Arc::new(Mutex::new(Vec::new())); - let pending_clone = pending.clone(); - let (close_sender, close_receiver) = oneshot::channel::<()>(); - let closed = Arc::new(AtomicBool::new(false)); - let closed_clone = closed.clone(); - tokio::spawn(async move { - let mut stream = Box::pin(stream); - let mut close_receiver = Box::pin(close_receiver); - - loop { - tokio::select! { - item = stream.message() => { - match item { - Ok(Some(envelope)) => { - let mut pending = pending_clone.lock().unwrap(); - pending.push(envelope); - } - _ => break, - } - }, - _ = &mut close_receiver => { - break; - } - } - } - - closed_clone.store(true, Ordering::SeqCst); - }); - - Subscription { - pending, - closed, - close_sender: Some(close_sender), - } - } -} - -impl XmtpApiSubscription for Subscription { - fn is_closed(&self) -> bool { - self.closed.load(Ordering::SeqCst) - } - - fn get_messages(&self) -> Vec { - let mut pending = self.pending.lock().unwrap(); - let items = pending.drain(..).collect::>(); - items - } - - fn close_stream(&mut self) { - // Set this value here, even if it will be eventually set again when the loop exits - // This makes the `closed` status immediately correct - self.closed.store(true, Ordering::SeqCst); - if let Some(close_tx) = self.close_sender.take() { - let _ = close_tx.send(()); - } - } -} - -type EnvelopeStream = Pin> + Send>>; - -pub struct GrpcMutableSubscription { - envelope_stream: Abortable, - update_channel: futures::channel::mpsc::UnboundedSender, - abort_handle: AbortHandle, -} - -impl GrpcMutableSubscription { - pub fn new( - envelope_stream: EnvelopeStream, - update_channel: futures::channel::mpsc::UnboundedSender, - ) -> Self { - let (abort_handle, abort_registration) = AbortHandle::new_pair(); - Self { - envelope_stream: Abortable::new(envelope_stream, abort_registration), - update_channel, - abort_handle, - } - } -} - -impl Stream for GrpcMutableSubscription { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.envelope_stream.poll_next_unpin(cx) - } -} - -#[async_trait::async_trait] -impl MutableApiSubscription for GrpcMutableSubscription { - async fn update(&mut self, req: SubscribeRequest) -> Result<(), Error> { - self.update_channel - .send(req) - .await - .map_err(|_| Error::new(ErrorKind::SubscriptionUpdateError))?; - - Ok(()) - } - - fn close(&self) { - self.abort_handle.abort(); - self.update_channel.close_channel(); - } -} - #[async_trait::async_trait] impl XmtpMlsClient for ClientV4 { #[tracing::instrument(level = "trace", skip_all)] async fn upload_key_package(&self, req: UploadKeyPackageRequest) -> Result<(), Error> { - unimplemented!(); + self.publish_envelopes_to_payer(std::iter::once(req)).await } #[tracing::instrument(level = "trace", skip_all)] @@ -298,17 +172,55 @@ impl XmtpMlsClient for ClientV4 { &self, req: FetchKeyPackagesRequest, ) -> Result { - unimplemented!(); + let topics = req + .installation_keys + .iter() + .map(|key| build_key_package_topic(key.as_slice())) + .collect(); + + let envelopes = self.query_v4_envelopes(topics).await?; + let key_packages: Result, Error> = envelopes + .iter() + .map(|envelopes| { + // The last envelope should be the newest key package upload + let unsigned = envelopes.last().ok_or_else(|| { + Error::new(ErrorKind::InternalError(InternalError::MissingPayloadError)) + .with("No envelopes found") + })?; + + let client_env = extract_client_envelope(unsigned)?; + + if let Some(Payload::UploadKeyPackage(upload_key_package)) = client_env.payload { + let key_package = upload_key_package.key_package.ok_or_else(|| { + Error::new(ErrorKind::InternalError(InternalError::MissingPayloadError)) + .with("Missing key package") + })?; + + Ok(fetch_key_packages_response::KeyPackage { + key_package_tls_serialized: key_package.key_package_tls_serialized, + }) + } else { + Err( + Error::new(ErrorKind::InternalError(InternalError::MissingPayloadError)) + .with("Payload is not a key package"), + ) + } + }) + .collect(); + + Ok(FetchKeyPackagesResponse { + key_packages: key_packages?, + }) } #[tracing::instrument(level = "trace", skip_all)] async fn send_group_messages(&self, req: SendGroupMessagesRequest) -> Result<(), Error> { - unimplemented!(); + self.publish_envelopes_to_payer(req.messages).await } #[tracing::instrument(level = "trace", skip_all)] async fn send_welcome_messages(&self, req: SendWelcomeMessagesRequest) -> Result<(), Error> { - unimplemented!(); + self.publish_envelopes_to_payer(req.messages).await } #[tracing::instrument(level = "trace", skip_all)] @@ -316,7 +228,55 @@ impl XmtpMlsClient for ClientV4 { &self, req: QueryGroupMessagesRequest, ) -> Result { - unimplemented!(); + let client = &mut self.client.clone(); + let res = client + .query_envelopes(QueryEnvelopesRequest { + query: Some(EnvelopesQuery { + topics: vec![build_group_message_topic(req.group_id.as_slice())], + originator_node_ids: vec![], + last_seen: None, + }), + limit: 100, + }) + .await + .map_err(|e| Error::new(ErrorKind::MlsError).with(e))?; + + let envelopes = res.into_inner().envelopes; + let response = QueryGroupMessagesResponse { + messages: envelopes + .iter() + .map(|envelope| { + let unsigned_originator_envelope = + extract_unsigned_originator_envelope(envelope)?; + let client_envelope = extract_client_envelope(envelope)?; + let payload = client_envelope.payload.ok_or_else(|| { + Error::new(ErrorKind::InternalError(InternalError::MissingPayloadError)) + })?; + let Payload::GroupMessage(group_message) = payload else { + return Err(Error::new(ErrorKind::InternalError( + InternalError::MissingPayloadError, + ))); + }; + + let group_message_input::Version::V1(v1_group_message) = + group_message.version.ok_or_else(|| { + Error::new(ErrorKind::InternalError(InternalError::MissingPayloadError)) + })?; + + Ok(GroupMessage { + version: Some(group_message::Version::V1(group_message::V1 { + id: unsigned_originator_envelope.originator_sequence_id, + created_ns: unsigned_originator_envelope.originator_ns as u64, + group_id: req.group_id.clone(), + data: v1_group_message.data, + sender_hmac: v1_group_message.sender_hmac, + })), + }) + }) + .collect::, Error>>()?, + paging_info: None, + }; + Ok(response) } #[tracing::instrument(level = "trace", skip_all)] @@ -324,53 +284,54 @@ impl XmtpMlsClient for ClientV4 { &self, req: QueryWelcomeMessagesRequest, ) -> Result { - unimplemented!(); - } -} - -pub struct GroupMessageStream { - inner: tonic::codec::Streaming, -} - -impl From> for GroupMessageStream { - fn from(inner: tonic::codec::Streaming) -> Self { - GroupMessageStream { inner } - } -} - -impl Stream for GroupMessageStream { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.inner - .poll_next_unpin(cx) - .map(|data| data.map(|v| v.map_err(|e| Error::new(ErrorKind::SubscribeError).with(e)))) - } -} - -pub struct WelcomeMessageStream { - inner: tonic::codec::Streaming, -} - -impl From> for WelcomeMessageStream { - fn from(inner: tonic::codec::Streaming) -> Self { - WelcomeMessageStream { inner } - } -} - -impl Stream for WelcomeMessageStream { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.inner - .poll_next_unpin(cx) - .map(|data| data.map(|v| v.map_err(|e| Error::new(ErrorKind::SubscribeError).with(e)))) + let client = &mut self.client.clone(); + let res = client + .query_envelopes(QueryEnvelopesRequest { + query: Some(EnvelopesQuery { + topics: vec![build_welcome_message_topic(req.installation_key.as_slice())], + originator_node_ids: vec![], + last_seen: None, + }), + limit: 100, + }) + .await + .map_err(|e| Error::new(ErrorKind::MlsError).with(e))?; + + let envelopes = res.into_inner().envelopes; + let response = QueryWelcomeMessagesResponse { + messages: envelopes + .iter() + .map(|envelope| { + let unsigned_originator_envelope = + extract_unsigned_originator_envelope(envelope)?; + let client_envelope = extract_client_envelope(envelope)?; + let payload = client_envelope.payload.ok_or_else(|| { + Error::new(ErrorKind::InternalError(InternalError::MissingPayloadError)) + })?; + let Payload::WelcomeMessage(welcome_message) = payload else { + return Err(Error::new(ErrorKind::InternalError( + InternalError::MissingPayloadError, + ))); + }; + let welcome_message_input::Version::V1(v1_welcome_message) = + welcome_message.version.ok_or_else(|| { + Error::new(ErrorKind::InternalError(InternalError::MissingPayloadError)) + })?; + + Ok(WelcomeMessage { + version: Some(welcome_message::Version::V1(welcome_message::V1 { + id: unsigned_originator_envelope.originator_sequence_id, + created_ns: unsigned_originator_envelope.originator_ns as u64, + installation_key: req.installation_key.clone(), + data: v1_welcome_message.data, + hpke_public_key: v1_welcome_message.hpke_public_key, + })), + }) + }) + .collect::, Error>>()?, + paging_info: None, + }; + Ok(response) } } @@ -401,7 +362,14 @@ impl XmtpIdentityClient for ClientV4 { &self, request: PublishIdentityUpdateRequest, ) -> Result { - unimplemented!() + let client = &mut self.payer_client.clone(); + let res = client + .publish_client_envelopes(PublishClientEnvelopesRequest::try_from(request)?) + .await; + match res { + Ok(_) => Ok(PublishIdentityUpdateResponse {}), + Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)), + } } #[tracing::instrument(level = "trace", skip_all)] @@ -439,7 +407,32 @@ impl XmtpIdentityClient for ClientV4 { &self, request: GetIdentityUpdatesV2Request, ) -> Result { - unimplemented!() + let topics: Result, Error> = request + .requests + .iter() + .map(|r| build_identity_topic_from_hex_encoded(&r.inbox_id.clone())) + .collect(); + let v4_envelopes = self.query_v4_envelopes(topics?).await?; + let joined_data = v4_envelopes + .into_iter() + .zip(request.requests.into_iter()) + .collect::>(); + let responses = joined_data + .iter() + .map(|(envelopes, inner_req)| { + let identity_updates = envelopes + .iter() + .map(convert_v4_envelope_to_identity_update) + .collect::, Error>>()?; + + Ok(get_identity_updates_response::Response { + inbox_id: inner_req.inbox_id.clone(), + updates: identity_updates, + }) + }) + .collect::, Error>>()?; + + Ok(GetIdentityUpdatesV2Response { responses }) } #[tracing::instrument(level = "trace", skip_all)] @@ -462,3 +455,83 @@ impl XmtpTestClient for ClientV4 { todo!() } } +impl ClientV4 { + #[tracing::instrument(level = "trace", skip_all)] + async fn query_v4_envelopes( + &self, + topics: Vec>, + ) -> Result>, Error> { + let requests = topics.iter().map(|topic| async { + let client = &mut self.client.clone(); + let v4_envelopes = client + .query_envelopes(QueryEnvelopesRequest { + query: Some(EnvelopesQuery { + topics: vec![topic.clone()], + originator_node_ids: vec![], + last_seen: None, + }), + limit: 100, + }) + .await + .map_err(|err| Error::new(ErrorKind::IdentityError).with(err))?; + + Ok(v4_envelopes.into_inner().envelopes) + }); + + futures::future::try_join_all(requests).await + } + + #[tracing::instrument(level = "trace", skip_all)] + async fn publish_envelopes_to_payer< + T: TryInto, + >( + &self, + items: impl IntoIterator, + ) -> Result<(), Error> { + let client = &mut self.payer_client.clone(); + for item in items { + let request = item.try_into()?; + let res = client.publish_client_envelopes(request).await; + if let Err(e) = res { + return Err(Error::new(ErrorKind::MlsError).with(e)); + } + } + Ok(()) + } +} + +fn convert_v4_envelope_to_identity_update( + envelope: &OriginatorEnvelope, +) -> Result { + let mut unsigned_originator_envelope = envelope.unsigned_originator_envelope.as_slice(); + let originator_envelope = UnsignedOriginatorEnvelope::decode(&mut unsigned_originator_envelope) + .map_err(|e| Error::new(ErrorKind::IdentityError).with(e))?; + + let payer_envelope = originator_envelope + .payer_envelope + .ok_or(Error::new(ErrorKind::IdentityError).with("Payer envelope is None"))?; + + // TODO: validate payer signatures + let mut unsigned_client_envelope = payer_envelope.unsigned_client_envelope.as_slice(); + + let client_envelope = ClientEnvelope::decode(&mut unsigned_client_envelope) + .map_err(|e| Error::new(ErrorKind::IdentityError).with(e))?; + let payload = client_envelope + .payload + .ok_or(Error::new(ErrorKind::IdentityError).with("Payload is None"))?; + + let identity_update = match payload { + Payload::IdentityUpdate(update) => update, + _ => { + return Err( + Error::new(ErrorKind::IdentityError).with("Payload is not an identity update") + ) + } + }; + + Ok(IdentityUpdateLog { + sequence_id: originator_envelope.originator_sequence_id, + server_timestamp_ns: originator_envelope.originator_ns as u64, + update: Some(identity_update), + }) +} diff --git a/xmtp_api_http/src/lib.rs b/xmtp_api_http/src/lib.rs index 668c059ae..80489fb3c 100755 --- a/xmtp_api_http/src/lib.rs +++ b/xmtp_api_http/src/lib.rs @@ -6,8 +6,7 @@ mod util; use futures::stream; use reqwest::header; use util::{create_grpc_stream, handle_error}; -// use xmtp_proto::api_client::XmtpMlsStreams; -use xmtp_proto::api_client::{ClientWithMetadata, Error, ErrorKind, XmtpIdentityClient}; +use xmtp_proto::api_client::{ClientWithMetadata, XmtpIdentityClient}; use xmtp_proto::xmtp::identity::api::v1::{ GetIdentityUpdatesRequest as GetIdentityUpdatesV2Request, GetIdentityUpdatesResponse as GetIdentityUpdatesV2Response, GetInboxIdsRequest, @@ -23,6 +22,7 @@ use xmtp_proto::{ SendGroupMessagesRequest, SendWelcomeMessagesRequest, SubscribeGroupMessagesRequest, SubscribeWelcomeMessagesRequest, UploadKeyPackageRequest, }, + Error, ErrorKind, }; use crate::constants::ApiEndpoints; diff --git a/xmtp_api_http/src/util.rs b/xmtp_api_http/src/util.rs index 431ad0144..55e4ff3fa 100644 --- a/xmtp_api_http/src/util.rs +++ b/xmtp_api_http/src/util.rs @@ -5,7 +5,7 @@ use futures::{ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::Deserializer; use std::io::Read; -use xmtp_proto::api_client::{Error, ErrorKind}; +use xmtp_proto::{Error, ErrorKind}; #[derive(Deserialize, Serialize, Debug)] #[serde(untagged)] diff --git a/xmtp_id/src/scw_verifier/mod.rs b/xmtp_id/src/scw_verifier/mod.rs index 40481332a..97dca12f2 100644 --- a/xmtp_id/src/scw_verifier/mod.rs +++ b/xmtp_id/src/scw_verifier/mod.rs @@ -29,7 +29,7 @@ pub enum VerifierError { #[error(transparent)] Provider(#[from] ethers::providers::ProviderError), #[error(transparent)] - ApiClient(#[from] xmtp_proto::api_client::Error), + ApiClient(#[from] xmtp_proto::Error), #[error(transparent)] Url(#[from] url::ParseError), #[error(transparent)] diff --git a/xmtp_mls/src/api/mls.rs b/xmtp_mls/src/api/mls.rs index 82d339ab1..01f903546 100644 --- a/xmtp_mls/src/api/mls.rs +++ b/xmtp_mls/src/api/mls.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use super::ApiClientWrapper; use crate::{retry_async, XmtpApi}; -use xmtp_proto::api_client::{Error as ApiError, ErrorKind, XmtpMlsStreams}; +use xmtp_proto::api_client::XmtpMlsStreams; use xmtp_proto::xmtp::mls::api::v1::{ group_message_input::{Version as GroupMessageInputVersion, V1 as GroupMessageInputV1}, subscribe_group_messages_request::Filter as GroupFilterProto, @@ -12,6 +12,7 @@ use xmtp_proto::xmtp::mls::api::v1::{ SendWelcomeMessagesRequest, SortDirection, SubscribeGroupMessagesRequest, SubscribeWelcomeMessagesRequest, UploadKeyPackageRequest, WelcomeMessage, WelcomeMessageInput, }; +use xmtp_proto::{Error as ApiError, ErrorKind}; /// A filter for querying group messages pub struct GroupFilter { @@ -301,11 +302,11 @@ pub mod tests { use super::super::*; use xmtp_proto::{ - api_client::{Error, ErrorKind}, xmtp::mls::api::v1::{ fetch_key_packages_response::KeyPackage, FetchKeyPackagesResponse, PagingInfo, QueryGroupMessagesResponse, }, + Error, ErrorKind, }; #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] diff --git a/xmtp_mls/src/api/mod.rs b/xmtp_mls/src/api/mod.rs index 0d1fc7daf..64d173c13 100644 --- a/xmtp_mls/src/api/mod.rs +++ b/xmtp_mls/src/api/mod.rs @@ -11,7 +11,7 @@ use crate::{ }; use thiserror::Error; use xmtp_id::associations::DeserializationError as AssociationDeserializationError; -use xmtp_proto::api_client::Error as ApiError; +use xmtp_proto::Error as ApiError; pub use identity::*; pub use mls::*; diff --git a/xmtp_mls/src/api/test_utils.rs b/xmtp_mls/src/api/test_utils.rs index e20def9cd..829166507 100644 --- a/xmtp_mls/src/api/test_utils.rs +++ b/xmtp_mls/src/api/test_utils.rs @@ -1,6 +1,6 @@ use mockall::mock; use xmtp_proto::{ - api_client::{ClientWithMetadata, Error, XmtpIdentityClient, XmtpMlsClient, XmtpMlsStreams}, + api_client::{ClientWithMetadata, XmtpIdentityClient, XmtpMlsClient, XmtpMlsStreams}, xmtp::{ identity::api::v1::{ GetIdentityUpdatesRequest as GetIdentityUpdatesV2Request, @@ -18,6 +18,7 @@ use xmtp_proto::{ UploadKeyPackageRequest, }, }, + Error, }; #[cfg(any(feature = "http-api", target_arch = "wasm32"))] diff --git a/xmtp_mls/src/builder.rs b/xmtp_mls/src/builder.rs index d82d87b1b..1e0fc216f 100644 --- a/xmtp_mls/src/builder.rs +++ b/xmtp_mls/src/builder.rs @@ -42,7 +42,7 @@ pub enum ClientBuilderError { #[error(transparent)] GroupError(#[from] crate::groups::GroupError), #[error(transparent)] - ApiError(#[from] xmtp_proto::api_client::Error), + ApiError(#[from] xmtp_proto::Error), #[error(transparent)] DeviceSync(#[from] crate::groups::device_sync::DeviceSyncError), } diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 2d766758b..8a0601f3d 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -75,7 +75,7 @@ pub enum ClientError { #[error("dieselError: {0}")] Diesel(#[from] diesel::result::Error), #[error("Query failed: {0}")] - QueryError(#[from] xmtp_proto::api_client::Error), + QueryError(#[from] xmtp_proto::Error), #[error("API error: {0}")] Api(#[from] crate::api::WrappedApiError), #[error("identity error: {0}")] diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 00b8f02bd..80a0a02c1 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -104,7 +104,7 @@ pub enum GroupError { #[error("Max user limit exceeded.")] UserLimitExceeded, #[error("api error: {0}")] - Api(#[from] xmtp_proto::api_client::Error), + Api(#[from] xmtp_proto::Error), #[error("api error: {0}")] WrappedApi(#[from] WrappedApiError), #[error("invalid group membership")] diff --git a/xmtp_mls/src/identity.rs b/xmtp_mls/src/identity.rs index b0a283fdd..8962532fc 100644 --- a/xmtp_mls/src/identity.rs +++ b/xmtp_mls/src/identity.rs @@ -140,7 +140,7 @@ pub enum IdentityError { #[error(transparent)] WrappedApi(#[from] WrappedApiError), #[error(transparent)] - Api(#[from] xmtp_proto::api_client::Error), + Api(#[from] xmtp_proto::Error), #[error("installation not found: {0}")] InstallationIdNotFound(String), #[error(transparent)] diff --git a/xmtp_mls/src/retry.rs b/xmtp_mls/src/retry.rs index f67151f9c..6e5aca993 100644 --- a/xmtp_mls/src/retry.rs +++ b/xmtp_mls/src/retry.rs @@ -208,7 +208,7 @@ macro_rules! retryable { } // network errors should generally be retryable, unless there's a bug in our code -impl RetryableError for xmtp_proto::api_client::Error { +impl RetryableError for xmtp_proto::Error { fn is_retryable(&self) -> bool { true } diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index 0304869ab..e79b39def 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -131,7 +131,7 @@ pub enum SubscribeError { #[error(transparent)] Storage(#[from] StorageError), #[error(transparent)] - Api(#[from] xmtp_proto::api_client::Error), + Api(#[from] xmtp_proto::Error), #[error(transparent)] Decode(#[from] prost::DecodeError), } diff --git a/xmtp_proto/Cargo.toml b/xmtp_proto/Cargo.toml index 14c343ddb..bca40bd06 100644 --- a/xmtp_proto/Cargo.toml +++ b/xmtp_proto/Cargo.toml @@ -12,6 +12,8 @@ pbjson.workspace = true prost = { workspace = true, features = ["prost-derive"] } serde = { workspace = true } async-trait = "0.1" +hex.workspace = true +openmls_rust_crypto = { workspace = true, optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tonic = { workspace = true } @@ -20,7 +22,7 @@ tonic = { workspace = true } wasm-bindgen-test.workspace = true [features] -convert = ["openmls", "proto_full"] +convert = ["openmls", "openmls_rust_crypto", "proto_full"] default = [] test-utils = [] diff --git a/xmtp_proto/src/api_client.rs b/xmtp_proto/src/api_client.rs index ab02a0443..d531bb867 100644 --- a/xmtp_proto/src/api_client.rs +++ b/xmtp_proto/src/api_client.rs @@ -1,7 +1,3 @@ -use std::{error::Error as StdError, fmt}; - -use futures::Stream; - pub use super::xmtp::message_api::v1::{ BatchQueryRequest, BatchQueryResponse, Envelope, PagingInfo, PublishRequest, PublishResponse, QueryRequest, QueryResponse, SubscribeRequest, @@ -18,6 +14,8 @@ use crate::xmtp::mls::api::v1::{ SendGroupMessagesRequest, SendWelcomeMessagesRequest, SubscribeGroupMessagesRequest, SubscribeWelcomeMessagesRequest, UploadKeyPackageRequest, WelcomeMessage, }; +use crate::Error; +use futures::Stream; #[cfg(any(test, feature = "test-utils"))] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] @@ -99,85 +97,6 @@ pub mod trait_impls { } } -#[derive(Debug)] -pub enum ErrorKind { - SetupCreateChannelError, - SetupTLSConfigError, - SetupConnectionError, - PublishError, - QueryError, - SubscribeError, - BatchQueryError, - MlsError, - IdentityError, - SubscriptionUpdateError, - MetadataError, -} - -type ErrorSource = Box; - -pub struct Error { - kind: ErrorKind, - source: Option, -} - -impl Error { - pub fn new(kind: ErrorKind) -> Self { - Self { kind, source: None } - } - - pub fn with(mut self, source: impl Into) -> Self { - self.source = Some(source.into()); - self - } -} - -impl fmt::Debug for Error { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut f = f.debug_tuple("xmtp::error::Error"); - - f.field(&self.kind); - - if let Some(source) = &self.source { - f.field(source); - } - - f.finish() - } -} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let s = match &self.kind { - ErrorKind::SetupCreateChannelError => "failed to create channel", - ErrorKind::SetupTLSConfigError => "tls configuration failed", - ErrorKind::SetupConnectionError => "connection failed", - ErrorKind::PublishError => "publish error", - ErrorKind::QueryError => "query error", - ErrorKind::SubscribeError => "subscribe error", - ErrorKind::BatchQueryError => "batch query error", - ErrorKind::IdentityError => "identity error", - ErrorKind::MlsError => "mls error", - ErrorKind::SubscriptionUpdateError => "subscription update error", - ErrorKind::MetadataError => "metadata error", - }; - f.write_str(s)?; - if self.source().is_some() { - f.write_str(": ")?; - f.write_str(&self.source().unwrap().to_string())?; - } - Ok(()) - } -} - -impl StdError for Error { - fn source(&self) -> Option<&(dyn StdError + 'static)> { - self.source - .as_ref() - .map(|source| &**source as &(dyn StdError + 'static)) - } -} - pub trait XmtpApiSubscription { fn is_closed(&self) -> bool; fn get_messages(&self) -> Vec; diff --git a/xmtp_proto/src/convert.rs b/xmtp_proto/src/convert.rs index f85e8c8a0..fe6f2a1e8 100644 --- a/xmtp_proto/src/convert.rs +++ b/xmtp_proto/src/convert.rs @@ -1,5 +1,22 @@ -mod inbox_id { +use crate::xmtp::identity::api::v1::PublishIdentityUpdateRequest; +use crate::xmtp::mls::api::v1::{ + group_message_input::Version as GroupMessageInputVersion, + welcome_message_input::Version as WelcomeMessageVersion, GroupMessageInput, + UploadKeyPackageRequest, WelcomeMessageInput, +}; +use crate::xmtp::xmtpv4::envelopes::client_envelope::Payload; +use crate::xmtp::xmtpv4::envelopes::{AuthenticatedData, ClientEnvelope}; +use crate::xmtp::xmtpv4::payer_api::PublishClientEnvelopesRequest; + +use crate::v4_utils::{ + build_identity_topic_from_hex_encoded, build_welcome_message_topic, get_group_message_topic, + get_key_package_topic, +}; +use crate::Error; +use crate::ErrorKind::InternalError; +use crate::InternalError::MissingPayloadError; +mod inbox_id { use crate::xmtp::identity::MlsCredential; use openmls::{ credentials::{errors::BasicCredentialError, BasicCredential}, @@ -16,3 +33,90 @@ mod inbox_id { } } } + +impl TryFrom for PublishClientEnvelopesRequest { + type Error = Error; + + fn try_from(req: UploadKeyPackageRequest) -> Result { + if let Some(key_package) = req.key_package.as_ref() { + Ok(PublishClientEnvelopesRequest { + envelopes: vec![ClientEnvelope { + aad: Some(AuthenticatedData::with_topic(get_key_package_topic( + key_package, + )?)), + payload: Some(Payload::UploadKeyPackage(req)), + }], + }) + } else { + Err(Error::new(InternalError(MissingPayloadError))) + } + } +} + +impl TryFrom for PublishClientEnvelopesRequest { + type Error = Error; + + fn try_from(req: PublishIdentityUpdateRequest) -> Result { + if let Some(identity_update) = req.identity_update { + Ok(PublishClientEnvelopesRequest { + envelopes: vec![ClientEnvelope { + aad: Some(AuthenticatedData::with_topic( + build_identity_topic_from_hex_encoded(&identity_update.inbox_id)?, + )), + payload: Some(Payload::IdentityUpdate(identity_update)), + }], + }) + } else { + Err(Error::new(InternalError(MissingPayloadError))) + } + } +} + +impl TryFrom for PublishClientEnvelopesRequest { + type Error = crate::Error; + + fn try_from(req: GroupMessageInput) -> Result { + if let Some(GroupMessageInputVersion::V1(ref version)) = req.version { + Ok(PublishClientEnvelopesRequest { + envelopes: vec![ClientEnvelope { + aad: Some(AuthenticatedData::with_topic(get_group_message_topic( + version.data.clone(), + )?)), + payload: Some(Payload::GroupMessage(req)), + }], + }) + } else { + Err(Error::new(InternalError(MissingPayloadError))) + } + } +} + +impl TryFrom for PublishClientEnvelopesRequest { + type Error = crate::Error; + + fn try_from(req: WelcomeMessageInput) -> Result { + if let Some(WelcomeMessageVersion::V1(ref version)) = req.version { + Ok(PublishClientEnvelopesRequest { + envelopes: vec![ClientEnvelope { + aad: Some(AuthenticatedData::with_topic(build_welcome_message_topic( + version.installation_key.as_slice(), + ))), + payload: Some(Payload::WelcomeMessage(req)), + }], + }) + } else { + Err(Error::new(InternalError(MissingPayloadError))) + } + } +} + +impl AuthenticatedData { + pub fn with_topic(topic: Vec) -> AuthenticatedData { + AuthenticatedData { + //TODO(mkysel) originator is hardcoded for now, but will have to become configurable + target_originator: 100, + target_topic: topic, + last_seen: None, + } + } +} diff --git a/xmtp_proto/src/error.rs b/xmtp_proto/src/error.rs new file mode 100644 index 000000000..d8109b9b8 --- /dev/null +++ b/xmtp_proto/src/error.rs @@ -0,0 +1,140 @@ +use openmls::prelude::tls_codec::Error as TlsCodecError; +use serde::de::StdError; +use std::fmt; +use std::string::FromUtf8Error; + +#[derive(Debug)] +pub enum ErrorKind { + SetupCreateChannelError, + SetupTLSConfigError, + SetupConnectionError, + PublishError, + QueryError, + SubscribeError, + BatchQueryError, + MlsError, + IdentityError, + SubscriptionUpdateError, + MetadataError, + InternalError(InternalError), +} + +#[derive(Debug)] +pub enum InternalError { + MissingPayloadError, + UnexpectedPayloadError, + InvalidTopicError(String), + DecodingError(String), + TLSError(String), +} + +type ErrorSource = Box; + +pub struct Error { + kind: ErrorKind, + source: Option, +} + +impl Error { + pub fn new(kind: ErrorKind) -> Self { + Self { kind, source: None } + } + + pub fn with(mut self, source: impl Into) -> Self { + self.source = Some(source.into()); + self + } +} + +impl From for Error { + fn from(err: hex::FromHexError) -> Self { + Error::new(ErrorKind::InternalError(InternalError::DecodingError( + err.to_string(), + ))) + } +} + +impl From for Error { + fn from(err: prost::DecodeError) -> Self { + Error::new(ErrorKind::InternalError(InternalError::DecodingError( + err.to_string(), + ))) + } +} + +impl From for Error { + fn from(err: FromUtf8Error) -> Self { + Error::new(ErrorKind::InternalError(InternalError::DecodingError( + err.to_string(), + ))) + } +} + +impl From for Error { + fn from(err: TlsCodecError) -> Self { + Error::new(ErrorKind::InternalError(InternalError::TLSError( + err.to_string(), + ))) + } +} + +impl From for Error { + fn from(internal: InternalError) -> Self { + Error::new(ErrorKind::InternalError(internal)) + } +} + +impl fmt::Debug for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut f = f.debug_tuple("xmtp::error::Error"); + + f.field(&self.kind); + + if let Some(source) = &self.source { + f.field(source); + } + + f.finish() + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match &self.kind { + ErrorKind::SetupCreateChannelError => "failed to create channel", + ErrorKind::SetupTLSConfigError => "tls configuration failed", + ErrorKind::SetupConnectionError => "connection failed", + ErrorKind::PublishError => "publish error", + ErrorKind::QueryError => "query error", + ErrorKind::SubscribeError => "subscribe error", + ErrorKind::BatchQueryError => "batch query error", + ErrorKind::IdentityError => "identity error", + ErrorKind::MlsError => "mls error", + ErrorKind::SubscriptionUpdateError => "subscription update error", + ErrorKind::MetadataError => "metadata error", + ErrorKind::InternalError(internal) => match internal { + InternalError::MissingPayloadError => "missing payload error", + InternalError::UnexpectedPayloadError => "unexpected payload error", + InternalError::InvalidTopicError(topic) => { + &format!("invalid topic error: {}", topic) + } + InternalError::DecodingError(msg) => msg, + InternalError::TLSError(msg) => msg, + }, + }; + f.write_str(s)?; + if self.source().is_some() { + f.write_str(": ")?; + f.write_str(&self.source().unwrap().to_string())?; + } + Ok(()) + } +} + +impl StdError for Error { + fn source(&self) -> Option<&(dyn StdError + 'static)> { + self.source + .as_ref() + .map(|source| &**source as &(dyn StdError + 'static)) + } +} diff --git a/xmtp_proto/src/lib.rs b/xmtp_proto/src/lib.rs index 9e398ac76..ac167e7dc 100644 --- a/xmtp_proto/src/lib.rs +++ b/xmtp_proto/src/lib.rs @@ -4,11 +4,18 @@ mod generated { } pub use generated::*; +mod error; +pub use error::*; + #[cfg(feature = "xmtp-message_api-v1")] pub mod api_client; #[cfg(feature = "convert")] pub mod convert; +#[cfg(feature = "convert")] +pub mod types; +#[cfg(feature = "convert")] +pub mod v4_utils; #[cfg(test)] pub mod test { diff --git a/xmtp_proto/src/types.rs b/xmtp_proto/src/types.rs new file mode 100644 index 000000000..908bb36ee --- /dev/null +++ b/xmtp_proto/src/types.rs @@ -0,0 +1,8 @@ +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum TopicKind { + GroupMessagesV1 = 0, + WelcomeMessagesV1, + IdentityUpdatesV1, + KeyPackagesV1, +} diff --git a/xmtp_proto/src/v4_utils.rs b/xmtp_proto/src/v4_utils.rs new file mode 100644 index 000000000..ac0ee625b --- /dev/null +++ b/xmtp_proto/src/v4_utils.rs @@ -0,0 +1,90 @@ +use crate::types::TopicKind; +use crate::xmtp::mls::api::v1::KeyPackageUpload; +use crate::xmtp::xmtpv4::envelopes::{ + ClientEnvelope, OriginatorEnvelope, UnsignedOriginatorEnvelope, +}; +use crate::InternalError::MissingPayloadError; +use crate::{Error, ErrorKind}; +use openmls::key_packages::KeyPackageIn; +use openmls::prelude::tls_codec::Deserialize; +use openmls::prelude::{MlsMessageIn, ProtocolMessage, ProtocolVersion}; +use openmls_rust_crypto::RustCrypto; +use prost::Message; + +pub const MLS_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::Mls10; + +pub fn build_key_package_topic(installation_id: &[u8]) -> Vec { + let mut topic = Vec::with_capacity(1 + installation_id.len()); + topic.push(TopicKind::KeyPackagesV1 as u8); + topic.extend_from_slice(installation_id); + topic +} + +pub fn build_identity_update_topic(inbox_id: &[u8]) -> Vec { + let mut topic = Vec::with_capacity(1 + inbox_id.len()); + topic.push(TopicKind::IdentityUpdatesV1 as u8); + topic.extend_from_slice(inbox_id); + topic +} + +pub fn build_group_message_topic(group_id: &[u8]) -> Vec { + let mut topic = Vec::with_capacity(1 + group_id.len()); + topic.push(TopicKind::GroupMessagesV1 as u8); + topic.extend_from_slice(group_id); + topic +} + +pub fn build_welcome_message_topic(installation_id: &[u8]) -> Vec { + let mut topic = Vec::with_capacity(1 + installation_id.len()); + topic.push(TopicKind::WelcomeMessagesV1 as u8); + topic.extend_from_slice(installation_id); + topic +} + +pub fn build_identity_topic_from_hex_encoded( + hex_encoded_inbox_id: &String, +) -> Result, Error> { + let decoded_inbox_id = hex::decode(hex_encoded_inbox_id)?; + Ok(build_identity_update_topic(&decoded_inbox_id)) +} + +pub fn extract_unsigned_originator_envelope( + req: &OriginatorEnvelope, +) -> Result { + let mut unsigned_bytes = req.unsigned_originator_envelope.as_slice(); + Ok(UnsignedOriginatorEnvelope::decode(&mut unsigned_bytes)?) +} + +pub fn extract_client_envelope(req: &OriginatorEnvelope) -> Result { + let unsigned_originator = extract_unsigned_originator_envelope(req)?; + + let payer_envelope = unsigned_originator + .payer_envelope + .ok_or(Error::new(ErrorKind::InternalError(MissingPayloadError)))?; + + let mut payer_bytes = payer_envelope.unsigned_client_envelope.as_slice(); + Ok(ClientEnvelope::decode(&mut payer_bytes)?) +} + +pub fn get_group_message_topic(message: Vec) -> Result, Error> { + let msg_result = MlsMessageIn::tls_deserialize(&mut message.as_slice())?; + let protocol_message: ProtocolMessage = + msg_result.try_into_protocol_message().map_err(|_| { + Error::new(ErrorKind::MlsError).with("Failed to convert to protocol message") + })?; + + Ok(build_group_message_topic( + protocol_message.group_id().as_slice(), + )) +} + +pub fn get_key_package_topic(key_package: &KeyPackageUpload) -> Result, Error> { + let kp_in: KeyPackageIn = + KeyPackageIn::tls_deserialize_exact(key_package.key_package_tls_serialized.as_slice())?; + let rust_crypto = RustCrypto::default(); + let kp = kp_in + .validate(&rust_crypto, MLS_PROTOCOL_VERSION) + .map_err(|_| Error::new(ErrorKind::MlsError).with("key package validation failed"))?; + let installation_key = kp.leaf_node().signature_key().as_slice(); + Ok(build_key_package_topic(installation_key)) +}