diff --git a/Cargo.lock b/Cargo.lock index 63d6dbd5e..c6e3452c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -431,6 +431,7 @@ dependencies = [ name = "bindings_node" version = "0.1.0" dependencies = [ + "futures", "hex", "napi", "napi-build", diff --git a/bindings_node/Cargo.toml b/bindings_node/Cargo.toml index e3beae204..12ababf3a 100644 --- a/bindings_node/Cargo.toml +++ b/bindings_node/Cargo.toml @@ -23,6 +23,7 @@ xmtp_cryptography = { path = "../xmtp_cryptography" } xmtp_id = { path = "../xmtp_id" } xmtp_mls = { path = "../xmtp_mls" } xmtp_proto = { path = "../xmtp_proto", features = ["proto_full"] } +futures.workspace = true [build-dependencies] napi-build = "2.0.1" diff --git a/bindings_node/src/client.rs b/bindings_node/src/client.rs index 32740d90c..d554274f0 100644 --- a/bindings_node/src/client.rs +++ b/bindings_node/src/client.rs @@ -155,7 +155,7 @@ pub async fn create_client( .map_err(|_| Error::from_reason("Error creating unencrypted message store"))?, }; - let identity_strategy = IdentityStrategy::CreateIfNotFound( + let identity_strategy = IdentityStrategy::new( inbox_id.clone(), account_address.clone().to_lowercase(), // this is a temporary solution diff --git a/bindings_node/src/conversations.rs b/bindings_node/src/conversations.rs index d6a449e70..fb35a4d83 100644 --- a/bindings_node/src/conversations.rs +++ b/bindings_node/src/conversations.rs @@ -345,12 +345,22 @@ impl Conversations { callback: JsFunction, conversation_type: Option, ) -> Result { + tracing::trace!( + inbox_id = self.inner_client.inbox_id(), + conversation_type = ?conversation_type, + ); let tsfn: ThreadsafeFunction = callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?; + let inbox_id = self.inner_client.inbox_id().to_string(); let stream_closer = RustXmtpClient::stream_all_messages_with_callback( self.inner_client.clone(), conversation_type.map(Into::into), move |message| { + tracing::trace!( + inbox_id, + conversation_type = ?conversation_type, + "[received] calling tsfn callback" + ); tsfn.call( message .map(Into::into) diff --git a/bindings_node/src/streams.rs b/bindings_node/src/streams.rs index 1094bf54d..782f85edc 100644 --- a/bindings_node/src/streams.rs +++ b/bindings_node/src/streams.rs @@ -63,6 +63,14 @@ impl StreamCloser { } } + #[napi] + pub async fn wait_for_ready(&self) -> Result<(), Error> { + let mut stream_handle = self.handle.lock().await; + futures::future::OptionFuture::from((*stream_handle).as_mut().map(|s| s.wait_for_ready())) + .await; + Ok(()) + } + /// Checks if this stream is closed #[napi] pub fn is_closed(&self) -> bool { diff --git a/bindings_node/test/Client.test.ts b/bindings_node/test/Client.test.ts index ea84ea155..726429ab6 100644 --- a/bindings_node/test/Client.test.ts +++ b/bindings_node/test/Client.test.ts @@ -1,7 +1,12 @@ import { v4 } from 'uuid' import { toBytes } from 'viem' import { describe, expect, it } from 'vitest' -import { createClient, createRegisteredClient, createUser } from '@test/helpers' +import { + createClient, + createRegisteredClient, + createUser, + encodeTextMessage, +} from '@test/helpers' import { ConsentEntityType, ConsentState, @@ -250,3 +255,33 @@ describe('Client', () => { ).toThrow() }) }) + +describe('Streams', () => { + it.only('should stream all messages', async () => { + const user = createUser() + const client1 = await createRegisteredClient(user) + + const user2 = createUser() + const client2 = await createRegisteredClient(user2) + + const group = await client1 + .conversations() + .createGroup([user2.account.address]) + + await client2.conversations().sync() + const group2 = client2.conversations().findGroupById(group.id()) + + let messages = new Array() + let stream = client2.conversations().streamAllMessages((msg) => { + console.log('Message', msg) + messages.push(msg) + }) + await stream.waitForReady() + group.send(encodeTextMessage('Test1')) + group.send(encodeTextMessage('Test2')) + group.send(encodeTextMessage('Test3')) + group.send(encodeTextMessage('Test4')) + await stream.endAndWait() + expect(messages.length).toBe(4) + }) +}) diff --git a/bindings_node/test/helpers.ts b/bindings_node/test/helpers.ts index 1d3260925..7a417317f 100644 --- a/bindings_node/test/helpers.ts +++ b/bindings_node/test/helpers.ts @@ -44,7 +44,7 @@ export const createClient = async (user: User) => { user.account.address, undefined, undefined, - { level: 'info' } + { level: 'trace', structured: true } ) } diff --git a/xmtp_mls/src/api/mls.rs b/xmtp_mls/src/api/mls.rs index 01f903546..826f84325 100644 --- a/xmtp_mls/src/api/mls.rs +++ b/xmtp_mls/src/api/mls.rs @@ -70,6 +70,12 @@ where group_id: Vec, id_cursor: Option, ) -> Result, ApiError> { + tracing::debug!( + group_id = hex::encode(&group_id), + id_cursor, + inbox_id = self.inbox_id, + "query group messages" + ); let mut out: Vec = vec![]; let page_size = 100; let mut id_cursor = id_cursor; @@ -114,6 +120,12 @@ where installation_id: Vec, id_cursor: Option, ) -> Result, ApiError> { + tracing::debug!( + installation_id = hex::encode(&installation_id), + cursor = id_cursor, + inbox_id = self.inbox_id, + "query welcomes" + ); let mut out: Vec = vec![]; let page_size = 100; let mut id_cursor = id_cursor; @@ -162,6 +174,7 @@ where key_package: Vec, is_inbox_id_credential: bool, ) -> Result<(), ApiError> { + tracing::debug!(inbox_id = self.inbox_id, "upload key packages"); retry_async!( self.retry_strategy, (async { @@ -184,6 +197,7 @@ where &self, installation_keys: Vec>, ) -> Result { + tracing::debug!(inbox_id = self.inbox_id, "fetch key packages"); let res = retry_async!( self.retry_strategy, (async { @@ -220,6 +234,7 @@ where &self, messages: &[WelcomeMessageInput], ) -> Result<(), ApiError> { + tracing::debug!(inbox_id = self.inbox_id, "send welcome messages"); retry_async!( self.retry_strategy, (async { @@ -236,6 +251,11 @@ where #[tracing::instrument(level = "trace", skip_all)] pub async fn send_group_messages(&self, group_messages: Vec<&[u8]>) -> Result<(), ApiError> { + tracing::debug!( + inbox_id = self.inbox_id, + "sending [{}] group messages", + group_messages.len() + ); let to_send: Vec = group_messages .iter() .map(|msg| GroupMessageInput { @@ -267,6 +287,7 @@ where where ApiClient: XmtpMlsStreams, { + tracing::debug!(inbox_id = self.inbox_id, "subscribing to group messages"); self.api_client .subscribe_group_messages(SubscribeGroupMessagesRequest { filters: filters.into_iter().map(|f| f.into()).collect(), @@ -282,6 +303,7 @@ where where ApiClient: XmtpMlsStreams, { + tracing::debug!(inbox_id = self.inbox_id, "subscribing to welcome messages"); self.api_client .subscribe_welcome_messages(SubscribeWelcomeMessagesRequest { filters: vec![WelcomeFilterProto { diff --git a/xmtp_mls/src/api/mod.rs b/xmtp_mls/src/api/mod.rs index 64d173c13..c9255e01f 100644 --- a/xmtp_mls/src/api/mod.rs +++ b/xmtp_mls/src/api/mod.rs @@ -10,7 +10,7 @@ use crate::{ XmtpApi, }; use thiserror::Error; -use xmtp_id::associations::DeserializationError as AssociationDeserializationError; +use xmtp_id::{associations::DeserializationError as AssociationDeserializationError, InboxId}; use xmtp_proto::Error as ApiError; pub use identity::*; @@ -34,6 +34,7 @@ impl RetryableError for WrappedApiError { pub struct ApiClientWrapper { pub(crate) api_client: Arc, pub(crate) retry_strategy: Retry, + pub(crate) inbox_id: Option, } impl ApiClientWrapper @@ -44,6 +45,13 @@ where Self { api_client, retry_strategy, + inbox_id: None, } } + + /// Attach an InboxId to this API Client Wrapper. + /// Attaches an inbox_id context to tracing logs, useful for debugging + pub(crate) fn attach_inbox_id(&mut self, inbox_id: Option) { + self.inbox_id = inbox_id; + } } diff --git a/xmtp_mls/src/builder.rs b/xmtp_mls/src/builder.rs index 1e0fc216f..ac779cb01 100644 --- a/xmtp_mls/src/builder.rs +++ b/xmtp_mls/src/builder.rs @@ -186,7 +186,11 @@ where let store = store .take() .ok_or(ClientBuilderError::MissingParameter { parameter: "store" })?; - debug!("Initializing identity"); + + debug!( + inbox_id = identity_strategy.inbox_id(), + "Initializing identity" + ); let identity = identity_strategy .initialize_identity(&api_client_wrapper, &store, &scw_verifier) diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index d680e3018..055252832 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -218,7 +218,7 @@ where /// It is expected that most users will use the [`ClientBuilder`](crate::builder::ClientBuilder) instead of instantiating /// a client directly. pub fn new( - api_client: ApiClientWrapper, + mut api_client: ApiClientWrapper, identity: Identity, store: EncryptedMessageStore, scw_verifier: V, @@ -227,6 +227,7 @@ where where V: SmartContractSignatureVerifier, { + api_client.attach_inbox_id(Some(identity.inbox_id().to_string())); let context = Arc::new(XmtpMlsLocalContext { identity, store, @@ -723,6 +724,7 @@ where /// Download all unread welcome messages and converts to a group struct, ignoring malformed messages. /// Returns any new groups created in the operation + #[tracing::instrument(level = "debug", skip_all)] pub async fn sync_welcomes( &self, conn: &DbConnection, diff --git a/xmtp_mls/src/groups/intents.rs b/xmtp_mls/src/groups/intents.rs index 13767ff5c..80f5eb167 100644 --- a/xmtp_mls/src/groups/intents.rs +++ b/xmtp_mls/src/groups/intents.rs @@ -86,6 +86,7 @@ impl MlsGroup { if intent_kind != IntentKind::SendMessage { conn.update_rotated_at_ns(self.group_id.clone())?; } + tracing::debug!(inbox_id = self.client.inbox_id(), intent_kind = %intent_kind, "queued intent"); Ok(intent) } diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index f6de6dd48..143ca5f28 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -591,6 +591,7 @@ impl MlsGroup { /// Send a message on this users XMTP [`Client`]. pub async fn send_message(&self, message: &[u8]) -> Result, GroupError> { + tracing::debug!(inbox_id = self.client.inbox_id(), "sending message"); let conn = self.context().store().conn()?; let provider = XmtpOpenMlsProvider::from(conn); self.send_message_with_provider(message, &provider).await diff --git a/xmtp_mls/src/groups/subscriptions.rs b/xmtp_mls/src/groups/subscriptions.rs index 5701bb882..2caf9bec9 100644 --- a/xmtp_mls/src/groups/subscriptions.rs +++ b/xmtp_mls/src/groups/subscriptions.rs @@ -180,6 +180,7 @@ impl MlsGroup { } /// Stream messages from groups in `group_id_to_info` +#[tracing::instrument(level = "debug", skip_all)] pub(crate) async fn stream_messages( client: &ScopedClient, group_id_to_info: Arc, MessagesStreamInfo>>, @@ -200,9 +201,12 @@ where let group_id_to_info = group_id_to_info.clone(); async move { let envelope = res.map_err(GroupError::from)?; - tracing::info!("Received message streaming payload"); let group_id = extract_group_id(&envelope)?; - tracing::info!("Extracted group id {}", hex::encode(&group_id)); + tracing::info!( + inbox_id = client.inbox_id(), + group_id = hex::encode(&group_id), + "Received message streaming payload" + ); let stream_info = group_id_to_info .get(&group_id) diff --git a/xmtp_mls/src/identity.rs b/xmtp_mls/src/identity.rs index 0f7ffdaa5..b253d889f 100644 --- a/xmtp_mls/src/identity.rs +++ b/xmtp_mls/src/identity.rs @@ -60,7 +60,12 @@ use xmtp_proto::xmtp::identity::MlsCredential; #[derive(Debug, Clone)] pub enum IdentityStrategy { /// Tries to get an identity from the disk store. If not found, getting one from backend. - CreateIfNotFound(InboxId, String, u64, Option>), // (inbox_id, address, nonce, legacy_signed_private_key) + CreateIfNotFound { + inbox_id: InboxId, + address: String, + nonce: u64, + legacy_signed_private_key: Option>, + }, /// Identity that is already in the disk store CachedOnly, /// An already-built Identity for testing purposes @@ -68,6 +73,32 @@ pub enum IdentityStrategy { ExternalIdentity(Identity), } +impl IdentityStrategy { + pub fn inbox_id<'a>(&'a self) -> Option> { + use IdentityStrategy::*; + match self { + CreateIfNotFound { ref inbox_id, .. } => Some(inbox_id), + _ => None, + } + } + + /// Create a new Identity Strategy. + /// If an Identity is not found in the local store, creates a new one. + pub fn new( + inbox_id: InboxId, + address: String, + nonce: u64, + legacy_signed_private_key: Option>, + ) -> Self { + Self::CreateIfNotFound { + inbox_id, + address, + nonce, + legacy_signed_private_key, + } + } +} + impl IdentityStrategy { /** * Initialize an identity from the given strategy. If a stored identity is found in the database, @@ -83,6 +114,8 @@ impl IdentityStrategy { store: &EncryptedMessageStore, scw_signature_verifier: impl SmartContractSignatureVerifier, ) -> Result { + use IdentityStrategy::*; + info!("Initializing identity"); let conn = store.conn()?; let provider = XmtpOpenMlsProvider::new(conn); @@ -94,15 +127,13 @@ impl IdentityStrategy { debug!("identity in store: {:?}", stored_identity); match self { - IdentityStrategy::CachedOnly => { - stored_identity.ok_or(IdentityError::RequiredIdentityNotFound) - } - IdentityStrategy::CreateIfNotFound( + CachedOnly => stored_identity.ok_or(IdentityError::RequiredIdentityNotFound), + CreateIfNotFound { inbox_id, address, nonce, legacy_signed_private_key, - ) => { + } => { if let Some(stored_identity) = stored_identity { if inbox_id != stored_identity.inbox_id { return Err(IdentityError::InboxIdMismatch { @@ -126,7 +157,7 @@ impl IdentityStrategy { } } #[cfg(test)] - IdentityStrategy::ExternalIdentity(identity) => Ok(identity), + ExternalIdentity(identity) => Ok(identity), } } } diff --git a/xmtp_mls/src/storage/encrypted_store/group.rs b/xmtp_mls/src/storage/encrypted_store/group.rs index f5241f53e..6b9bc0221 100644 --- a/xmtp_mls/src/storage/encrypted_store/group.rs +++ b/xmtp_mls/src/storage/encrypted_store/group.rs @@ -492,6 +492,7 @@ pub enum ConversationType { Dm = 2, Sync = 3, } + impl ToSql for ConversationType where i32: ToSql, @@ -516,6 +517,17 @@ where } } +impl std::fmt::Display for ConversationType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use ConversationType::*; + match self { + Group => write!(f, "{}", "group"), + Dm => write!(f, "{}", "dm"), + Sync => write!(f, "{}", "sync"), + } + } +} + #[cfg(test)] pub(crate) mod tests { #[cfg(target_arch = "wasm32")] diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index 90de8461f..c4d958f7b 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -235,6 +235,7 @@ where match result { Ok(Some(group)) => { tracing::info!( + inbox_id = self.inbox_id(), group_id = hex::encode(&group.id), welcome_id = ?group.welcome_id, "Loading existing group for welcome_id: {:?}", @@ -261,6 +262,7 @@ where Ok(welcome) } + #[tracing::instrument(level = "debug", skip_all)] pub async fn stream_conversations( &self, conversation_type: Option, @@ -298,7 +300,7 @@ where let installation_key = self.installation_public_key(); let id_cursor = 0; - tracing::info!("Setting up conversation stream"); + tracing::info!(inbox_id = self.inbox_id(), "Setting up conversation stream"); let subscription = self .api_client .subscribe_welcome_messages(installation_key, Some(id_cursor)) @@ -306,7 +308,10 @@ where let stream = subscription .map(|welcome| async { - tracing::info!("Received conversation streaming payload"); + tracing::info!( + inbox_id = self.inbox_id(), + "Received conversation streaming payload" + ); self.process_streamed_welcome(welcome?).await }) .filter_map(|v| async { Some(v.await) }); @@ -340,11 +345,18 @@ where }) } + #[tracing::instrument(level = "debug", skip_all)] pub async fn stream_all_messages( &self, conversation_type: Option, ) -> Result> + '_, ClientError> { + tracing::debug!( + inbox_id = self.inbox_id(), + conversation_type = ?conversation_type, + "stream all messages" + ); + let conn = self.store().conn()?; self.sync_welcomes(&conn).await?; @@ -364,7 +376,6 @@ where .await?; futures::pin_mut!(messages_stream); - tracing::info!("Setting up conversation stream in stream_all_messages"); let convo_stream = self.stream_conversations(conversation_type).await?; futures::pin_mut!(convo_stream);