Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node Streams Debugging + Logging #1342

Merged
merged 3 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub async fn create_client(
None => EncryptedMessageStore::new_unencrypted(storage_option).await?,
};
log::info!("Creating XMTP client");
let identity_strategy = IdentityStrategy::CreateIfNotFound(
let identity_strategy = IdentityStrategy::new(
inbox_id.clone(),
account_address.clone(),
nonce,
Expand Down
2 changes: 2 additions & 0 deletions bindings_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ crate-type = ["cdylib"]
# Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix
hex.workspace = true
napi = { version = "2.12.2", default-features = false, features = [
"napi4",
"napi6",
"async",
] }
Expand All @@ -23,6 +24,7 @@ xmtp_cryptography = { path = "../xmtp_cryptography" }
xmtp_id = { path = "../xmtp_id" }
xmtp_mls = { path = "../xmtp_mls" }
xmtp_proto = { path = "../xmtp_proto", features = ["proto_full"] }
futures.workspace = true

[build-dependencies]
napi-build = "2.0.1"
Expand Down
2 changes: 1 addition & 1 deletion bindings_node/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub async fn create_client(
.map_err(|_| Error::from_reason("Error creating unencrypted message store"))?,
};

let identity_strategy = IdentityStrategy::CreateIfNotFound(
let identity_strategy = IdentityStrategy::new(
inbox_id.clone(),
account_address.clone().to_lowercase(),
// this is a temporary solution
Expand Down
10 changes: 10 additions & 0 deletions bindings_node/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,22 @@ impl Conversations {
callback: JsFunction,
conversation_type: Option<ConversationType>,
) -> Result<StreamCloser> {
tracing::trace!(
inbox_id = self.inner_client.inbox_id(),
conversation_type = ?conversation_type,
);
let tsfn: ThreadsafeFunction<Message, ErrorStrategy::CalleeHandled> =
callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?;
let inbox_id = self.inner_client.inbox_id().to_string();
let stream_closer = RustXmtpClient::stream_all_messages_with_callback(
self.inner_client.clone(),
conversation_type.map(Into::into),
move |message| {
tracing::trace!(
inbox_id,
conversation_type = ?conversation_type,
"[received] calling tsfn callback"
);
tsfn.call(
message
.map(Into::into)
Expand Down
8 changes: 8 additions & 0 deletions bindings_node/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ impl StreamCloser {
}
}

#[napi]
pub async fn wait_for_ready(&self) -> Result<(), Error> {
let mut stream_handle = self.handle.lock().await;
futures::future::OptionFuture::from((*stream_handle).as_mut().map(|s| s.wait_for_ready()))
.await;
Ok(())
}

/// Checks if this stream is closed
#[napi]
pub fn is_closed(&self) -> bool {
Expand Down
40 changes: 39 additions & 1 deletion bindings_node/test/Client.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { v4 } from 'uuid'
import { toBytes } from 'viem'
import { describe, expect, it } from 'vitest'
import { createClient, createRegisteredClient, createUser } from '@test/helpers'
import {
createClient,
createRegisteredClient,
createUser,
encodeTextMessage,
sleep,
} from '@test/helpers'
import {
ConsentEntityType,
ConsentState,
Expand Down Expand Up @@ -250,3 +256,35 @@ describe('Client', () => {
).toThrow()
})
})

describe('Streams', () => {
it('should stream all messages', async () => {
const user = createUser()
const client1 = await createRegisteredClient(user)

const user2 = createUser()
const client2 = await createRegisteredClient(user2)

const group = await client1
.conversations()
.createGroup([user2.account.address])

await client2.conversations().sync()
const group2 = client2.conversations().findGroupById(group.id())

let messages = new Array()
client2.conversations().syncAllConversations()
let stream = client2.conversations().streamAllMessages((msg) => {
console.log('Message', msg)
messages.push(msg)
})
await stream.waitForReady()
group.send(encodeTextMessage('Test1'))
group.send(encodeTextMessage('Test2'))
group.send(encodeTextMessage('Test3'))
group.send(encodeTextMessage('Test4'))
await sleep(1000)
await stream.endAndWait()
expect(messages.length).toBe(4)
})
})
6 changes: 6 additions & 0 deletions bindings_node/test/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,9 @@ export const encodeTextMessage = (text: string) => {
content: new TextEncoder().encode(text),
}
}

export function sleep(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms)
})
}
2 changes: 1 addition & 1 deletion bindings_wasm/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub async fn create_client(
.map_err(|_| JsError::new("Error creating unencrypted message store"))?,
};

let identity_strategy = IdentityStrategy::CreateIfNotFound(
let identity_strategy = IdentityStrategy::new(
inbox_id.clone(),
account_address.clone().to_lowercase(),
// this is a temporary solution
Expand Down
2 changes: 1 addition & 1 deletion examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ where
let inbox_id = generate_inbox_id(&w.get_address(), &nonce)?;
let client = create_client(
cli,
IdentityStrategy::CreateIfNotFound(inbox_id, w.get_address(), nonce, None),
IdentityStrategy::new(inbox_id, w.get_address(), nonce, None),
client,
)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion xmtp_debug/src/app/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn new_client_inner(
dir.join(db_name)
};

let client = crate::DbgClient::builder(IdentityStrategy::CreateIfNotFound(
let client = crate::DbgClient::builder(IdentityStrategy::new(
inbox_id,
wallet.get_address(),
nonce,
Expand Down
22 changes: 22 additions & 0 deletions xmtp_mls/src/api/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ where
group_id: Vec<u8>,
id_cursor: Option<u64>,
) -> Result<Vec<GroupMessage>, ApiError> {
tracing::debug!(
group_id = hex::encode(&group_id),
id_cursor,
inbox_id = self.inbox_id,
"query group messages"
);
let mut out: Vec<GroupMessage> = vec![];
let page_size = 100;
let mut id_cursor = id_cursor;
Expand Down Expand Up @@ -114,6 +120,12 @@ where
installation_id: Vec<u8>,
id_cursor: Option<u64>,
) -> Result<Vec<WelcomeMessage>, ApiError> {
tracing::debug!(
installation_id = hex::encode(&installation_id),
cursor = id_cursor,
inbox_id = self.inbox_id,
"query welcomes"
);
let mut out: Vec<WelcomeMessage> = vec![];
let page_size = 100;
let mut id_cursor = id_cursor;
Expand Down Expand Up @@ -162,6 +174,7 @@ where
key_package: Vec<u8>,
is_inbox_id_credential: bool,
) -> Result<(), ApiError> {
tracing::debug!(inbox_id = self.inbox_id, "upload key packages");
retry_async!(
self.retry_strategy,
(async {
Expand All @@ -184,6 +197,7 @@ where
&self,
installation_keys: Vec<Vec<u8>>,
) -> Result<KeyPackageMap, ApiError> {
tracing::debug!(inbox_id = self.inbox_id, "fetch key packages");
let res = retry_async!(
self.retry_strategy,
(async {
Expand Down Expand Up @@ -220,6 +234,7 @@ where
&self,
messages: &[WelcomeMessageInput],
) -> Result<(), ApiError> {
tracing::debug!(inbox_id = self.inbox_id, "send welcome messages");
retry_async!(
self.retry_strategy,
(async {
Expand All @@ -236,6 +251,11 @@ where

#[tracing::instrument(level = "trace", skip_all)]
pub async fn send_group_messages(&self, group_messages: Vec<&[u8]>) -> Result<(), ApiError> {
tracing::debug!(
inbox_id = self.inbox_id,
"sending [{}] group messages",
group_messages.len()
);
let to_send: Vec<GroupMessageInput> = group_messages
.iter()
.map(|msg| GroupMessageInput {
Expand Down Expand Up @@ -267,6 +287,7 @@ where
where
ApiClient: XmtpMlsStreams,
{
tracing::debug!(inbox_id = self.inbox_id, "subscribing to group messages");
self.api_client
.subscribe_group_messages(SubscribeGroupMessagesRequest {
filters: filters.into_iter().map(|f| f.into()).collect(),
Expand All @@ -282,6 +303,7 @@ where
where
ApiClient: XmtpMlsStreams,
{
tracing::debug!(inbox_id = self.inbox_id, "subscribing to welcome messages");
self.api_client
.subscribe_welcome_messages(SubscribeWelcomeMessagesRequest {
filters: vec![WelcomeFilterProto {
Expand Down
10 changes: 9 additions & 1 deletion xmtp_mls/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
XmtpApi,
};
use thiserror::Error;
use xmtp_id::associations::DeserializationError as AssociationDeserializationError;
use xmtp_id::{associations::DeserializationError as AssociationDeserializationError, InboxId};
use xmtp_proto::Error as ApiError;

pub use identity::*;
Expand All @@ -34,6 +34,7 @@ impl RetryableError for WrappedApiError {
pub struct ApiClientWrapper<ApiClient> {
pub(crate) api_client: Arc<ApiClient>,
pub(crate) retry_strategy: Retry,
pub(crate) inbox_id: Option<InboxId>,
}

impl<ApiClient> ApiClientWrapper<ApiClient>
Expand All @@ -44,6 +45,13 @@ where
Self {
api_client,
retry_strategy,
inbox_id: None,
}
}

/// Attach an InboxId to this API Client Wrapper.
/// Attaches an inbox_id context to tracing logs, useful for debugging
pub(crate) fn attach_inbox_id(&mut self, inbox_id: Option<InboxId>) {
self.inbox_id = inbox_id;
}
}
Loading