Skip to content

Commit

Permalink
more logs around streaming and api queries
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Nov 26, 2024
1 parent ce01a73 commit 7f64078
Show file tree
Hide file tree
Showing 17 changed files with 169 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bindings_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ xmtp_cryptography = { path = "../xmtp_cryptography" }
xmtp_id = { path = "../xmtp_id" }
xmtp_mls = { path = "../xmtp_mls" }
xmtp_proto = { path = "../xmtp_proto", features = ["proto_full"] }
futures.workspace = true

[build-dependencies]
napi-build = "2.0.1"
Expand Down
2 changes: 1 addition & 1 deletion bindings_node/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ pub async fn create_client(
.map_err(|_| Error::from_reason("Error creating unencrypted message store"))?,
};

let identity_strategy = IdentityStrategy::CreateIfNotFound(
let identity_strategy = IdentityStrategy::new(
inbox_id.clone(),
account_address.clone().to_lowercase(),
// this is a temporary solution
Expand Down
10 changes: 10 additions & 0 deletions bindings_node/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,22 @@ impl Conversations {
callback: JsFunction,
conversation_type: Option<ConversationType>,
) -> Result<StreamCloser> {
tracing::trace!(
inbox_id = self.inner_client.inbox_id(),
conversation_type = ?conversation_type,
);
let tsfn: ThreadsafeFunction<Message, ErrorStrategy::CalleeHandled> =
callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?;
let inbox_id = self.inner_client.inbox_id().to_string();
let stream_closer = RustXmtpClient::stream_all_messages_with_callback(
self.inner_client.clone(),
conversation_type.map(Into::into),
move |message| {
tracing::trace!(
inbox_id,
conversation_type = ?conversation_type,
"[received] calling tsfn callback"
);
tsfn.call(
message
.map(Into::into)
Expand Down
8 changes: 8 additions & 0 deletions bindings_node/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ impl StreamCloser {
}
}

#[napi]
pub async fn wait_for_ready(&self) -> Result<(), Error> {
let mut stream_handle = self.handle.lock().await;
futures::future::OptionFuture::from((*stream_handle).as_mut().map(|s| s.wait_for_ready()))
.await;
Ok(())
}

/// Checks if this stream is closed
#[napi]
pub fn is_closed(&self) -> bool {
Expand Down
37 changes: 36 additions & 1 deletion bindings_node/test/Client.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import { v4 } from 'uuid'
import { toBytes } from 'viem'
import { describe, expect, it } from 'vitest'
import { createClient, createRegisteredClient, createUser } from '@test/helpers'
import {
createClient,
createRegisteredClient,
createUser,
encodeTextMessage,
} from '@test/helpers'
import {
ConsentEntityType,
ConsentState,
Expand Down Expand Up @@ -250,3 +255,33 @@ describe('Client', () => {
).toThrow()
})
})

describe('Streams', () => {
it.only('should stream all messages', async () => {
const user = createUser()
const client1 = await createRegisteredClient(user)

const user2 = createUser()
const client2 = await createRegisteredClient(user2)

const group = await client1
.conversations()
.createGroup([user2.account.address])

await client2.conversations().sync()
const group2 = client2.conversations().findGroupById(group.id())

let messages = new Array()
let stream = client2.conversations().streamAllMessages((msg) => {
console.log('Message', msg)
messages.push(msg)
})
await stream.waitForReady()
group.send(encodeTextMessage('Test1'))
group.send(encodeTextMessage('Test2'))
group.send(encodeTextMessage('Test3'))
group.send(encodeTextMessage('Test4'))
await stream.endAndWait()
expect(messages.length).toBe(4)
})
})
2 changes: 1 addition & 1 deletion bindings_node/test/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export const createClient = async (user: User) => {
user.account.address,
undefined,
undefined,
{ level: 'info' }
{ level: 'trace', structured: true }
)
}

Expand Down
22 changes: 22 additions & 0 deletions xmtp_mls/src/api/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ where
group_id: Vec<u8>,
id_cursor: Option<u64>,
) -> Result<Vec<GroupMessage>, ApiError> {
tracing::debug!(
group_id = hex::encode(&group_id),
id_cursor,
inbox_id = self.inbox_id,
"query welcomes"
);
let mut out: Vec<GroupMessage> = vec![];
let page_size = 100;
let mut id_cursor = id_cursor;
Expand Down Expand Up @@ -114,6 +120,12 @@ where
installation_id: Vec<u8>,
id_cursor: Option<u64>,
) -> Result<Vec<WelcomeMessage>, ApiError> {
tracing::debug!(
installation_id = hex::encode(&installation_id),
cursor = id_cursor,
inbox_id = self.inbox_id,
"query welcomes"
);
let mut out: Vec<WelcomeMessage> = vec![];
let page_size = 100;
let mut id_cursor = id_cursor;
Expand Down Expand Up @@ -162,6 +174,7 @@ where
key_package: Vec<u8>,
is_inbox_id_credential: bool,
) -> Result<(), ApiError> {
tracing::debug!(inbox_id = self.inbox_id, "upload key package");
retry_async!(
self.retry_strategy,
(async {
Expand All @@ -184,6 +197,7 @@ where
&self,
installation_keys: Vec<Vec<u8>>,
) -> Result<KeyPackageMap, ApiError> {
tracing::debug!(inbox_id = self.inbox_id, "fetch key packages");
let res = retry_async!(
self.retry_strategy,
(async {
Expand Down Expand Up @@ -220,6 +234,7 @@ where
&self,
messages: &[WelcomeMessageInput],
) -> Result<(), ApiError> {
tracing::debug!(inbox_id = self.inbox_id, "send welcome messages");
retry_async!(
self.retry_strategy,
(async {
Expand All @@ -236,6 +251,11 @@ where

#[tracing::instrument(level = "trace", skip_all)]
pub async fn send_group_messages(&self, group_messages: Vec<&[u8]>) -> Result<(), ApiError> {
tracing::debug!(
inbox_id = self.inbox_id,
"sending [{}] group messages",
group_messages.len()
);
let to_send: Vec<GroupMessageInput> = group_messages
.iter()
.map(|msg| GroupMessageInput {
Expand Down Expand Up @@ -267,6 +287,7 @@ where
where
ApiClient: XmtpMlsStreams,
{
tracing::debug!(inbox_id = self.inbox_id, "subscribing to group messages");
self.api_client
.subscribe_group_messages(SubscribeGroupMessagesRequest {
filters: filters.into_iter().map(|f| f.into()).collect(),
Expand All @@ -282,6 +303,7 @@ where
where
ApiClient: XmtpMlsStreams,
{
tracing::debug!(inbox_id = self.inbox_id, "subscribing to welcome messages");
self.api_client
.subscribe_welcome_messages(SubscribeWelcomeMessagesRequest {
filters: vec![WelcomeFilterProto {
Expand Down
10 changes: 9 additions & 1 deletion xmtp_mls/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
XmtpApi,
};
use thiserror::Error;
use xmtp_id::associations::DeserializationError as AssociationDeserializationError;
use xmtp_id::{associations::DeserializationError as AssociationDeserializationError, InboxId};
use xmtp_proto::Error as ApiError;

pub use identity::*;
Expand All @@ -34,6 +34,7 @@ impl RetryableError for WrappedApiError {
pub struct ApiClientWrapper<ApiClient> {
pub(crate) api_client: Arc<ApiClient>,
pub(crate) retry_strategy: Retry,
pub(crate) inbox_id: Option<InboxId>,
}

impl<ApiClient> ApiClientWrapper<ApiClient>
Expand All @@ -44,6 +45,13 @@ where
Self {
api_client,
retry_strategy,
inbox_id: None,
}
}

/// Attach an InboxId to this API Client Wrapper.
/// Attaches an inbox_id context to tracing logs, useful for debugging
pub(crate) fn attach_inbox_id(&mut self, inbox_id: Option<InboxId>) {
self.inbox_id = inbox_id;
}
}
6 changes: 5 additions & 1 deletion xmtp_mls/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,11 @@ where
let store = store
.take()
.ok_or(ClientBuilderError::MissingParameter { parameter: "store" })?;
debug!("Initializing identity");

debug!(
inbox_id = identity_strategy.inbox_id(),
"Initializing identity"
);

let identity = identity_strategy
.initialize_identity(&api_client_wrapper, &store, &scw_verifier)
Expand Down
4 changes: 3 additions & 1 deletion xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ where
/// It is expected that most users will use the [`ClientBuilder`](crate::builder::ClientBuilder) instead of instantiating
/// a client directly.
pub fn new(
api_client: ApiClientWrapper<ApiClient>,
mut api_client: ApiClientWrapper<ApiClient>,
identity: Identity,
store: EncryptedMessageStore,
scw_verifier: V,
Expand All @@ -227,6 +227,7 @@ where
where
V: SmartContractSignatureVerifier,
{
api_client.attach_inbox_id(Some(identity.inbox_id().to_string()));
let context = Arc::new(XmtpMlsLocalContext {
identity,
store,
Expand Down Expand Up @@ -728,6 +729,7 @@ where

/// Download all unread welcome messages and converts to a group struct, ignoring malformed messages.
/// Returns any new groups created in the operation
#[tracing::instrument(level = "debug", skip_all)]
pub async fn sync_welcomes(
&self,
conn: &DbConnection,
Expand Down
1 change: 1 addition & 0 deletions xmtp_mls/src/groups/intents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
if intent_kind != IntentKind::SendMessage {
conn.update_rotated_at_ns(self.group_id.clone())?;
}
tracing::debug!(inbox_id = self.client.inbox_id(), intent_kind = %intent_kind, "queued intent");

Ok(intent)
}
Expand Down
1 change: 1 addition & 0 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {

/// Send a message on this users XMTP [`Client`].
pub async fn send_message(&self, message: &[u8]) -> Result<Vec<u8>, GroupError> {
tracing::debug!(inbox_id = self.client.inbox_id(), "sending message");
let conn = self.context().store().conn()?;
let provider = XmtpOpenMlsProvider::from(conn);
self.send_message_with_provider(message, &provider).await
Expand Down
8 changes: 6 additions & 2 deletions xmtp_mls/src/groups/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
}

/// Stream messages from groups in `group_id_to_info`
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn stream_messages<ScopedClient>(
client: &ScopedClient,
group_id_to_info: Arc<HashMap<Vec<u8>, MessagesStreamInfo>>,
Expand All @@ -200,9 +201,12 @@ where
let group_id_to_info = group_id_to_info.clone();
async move {
let envelope = res.map_err(GroupError::from)?;
tracing::info!("Received message streaming payload");
let group_id = extract_group_id(&envelope)?;
tracing::info!("Extracted group id {}", hex::encode(&group_id));
tracing::info!(
inbox_id = client.inbox_id(),
group_id = hex::encode(&group_id),
"Received message streaming payload"
);
let stream_info =
group_id_to_info
.get(&group_id)
Expand Down
45 changes: 38 additions & 7 deletions xmtp_mls/src/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,45 @@ use xmtp_proto::xmtp::identity::MlsCredential;
#[derive(Debug, Clone)]
pub enum IdentityStrategy {
/// Tries to get an identity from the disk store. If not found, getting one from backend.
CreateIfNotFound(InboxId, String, u64, Option<Vec<u8>>), // (inbox_id, address, nonce, legacy_signed_private_key)
CreateIfNotFound {
inbox_id: InboxId,
address: String,
nonce: u64,
legacy_signed_private_key: Option<Vec<u8>>,
},
/// Identity that is already in the disk store
CachedOnly,
/// An already-built Identity for testing purposes
#[cfg(test)]
ExternalIdentity(Identity),
}

impl IdentityStrategy {
pub fn inbox_id<'a>(&'a self) -> Option<InboxIdRef<'a>> {
use IdentityStrategy::*;
match self {
CreateIfNotFound { ref inbox_id, .. } => Some(inbox_id),
_ => None,
}
}

/// Create a new Identity Strategy.
/// If an Identity is not found in the local store, creates a new one.
pub fn new(
inbox_id: InboxId,
address: String,
nonce: u64,
legacy_signed_private_key: Option<Vec<u8>>,
) -> Self {
Self::CreateIfNotFound {
inbox_id,
address,
nonce,
legacy_signed_private_key,
}
}
}

impl IdentityStrategy {
/**
* Initialize an identity from the given strategy. If a stored identity is found in the database,
Expand All @@ -83,6 +114,8 @@ impl IdentityStrategy {
store: &EncryptedMessageStore,
scw_signature_verifier: impl SmartContractSignatureVerifier,
) -> Result<Identity, IdentityError> {
use IdentityStrategy::*;

info!("Initializing identity");
let conn = store.conn()?;
let provider = XmtpOpenMlsProvider::new(conn);
Expand All @@ -94,15 +127,13 @@ impl IdentityStrategy {

debug!("identity in store: {:?}", stored_identity);
match self {
IdentityStrategy::CachedOnly => {
stored_identity.ok_or(IdentityError::RequiredIdentityNotFound)
}
IdentityStrategy::CreateIfNotFound(
CachedOnly => stored_identity.ok_or(IdentityError::RequiredIdentityNotFound),
CreateIfNotFound {
inbox_id,
address,
nonce,
legacy_signed_private_key,
) => {
} => {
if let Some(stored_identity) = stored_identity {
if inbox_id != stored_identity.inbox_id {
return Err(IdentityError::InboxIdMismatch {
Expand All @@ -126,7 +157,7 @@ impl IdentityStrategy {
}
}
#[cfg(test)]
IdentityStrategy::ExternalIdentity(identity) => Ok(identity),
ExternalIdentity(identity) => Ok(identity),
}
}
}
Expand Down
Loading

0 comments on commit 7f64078

Please sign in to comment.