From a2c7c719e7cbaaaeb6b38b5751db47582d692892 Mon Sep 17 00:00:00 2001 From: Samuel Onoja Date: Thu, 12 Dec 2024 20:20:28 +0100 Subject: [PATCH] nits --- .../kdf_walletconnect/src/inbound_message.rs | 39 ++++++++++++------- mm2src/kdf_walletconnect/src/lib.rs | 22 +++++++---- .../src/session/rpc/delete.rs | 5 +-- 3 files changed, 39 insertions(+), 27 deletions(-) diff --git a/mm2src/kdf_walletconnect/src/inbound_message.rs b/mm2src/kdf_walletconnect/src/inbound_message.rs index 5febdd9e7c..72b8676d93 100644 --- a/mm2src/kdf_walletconnect/src/inbound_message.rs +++ b/mm2src/kdf_walletconnect/src/inbound_message.rs @@ -10,7 +10,7 @@ use crate::{error::WalletConnectError, WalletConnectCtxImpl}; use common::log::info; -use mm2_err_handle::prelude::{MapToMmResult, MmError, MmResult}; +use mm2_err_handle::prelude::*; use relay_rpc::domain::{MessageId, Topic}; use relay_rpc::rpc::{params::ResponseParamsSuccess, Params, Request, Response}; @@ -63,25 +63,34 @@ pub(crate) async fn process_inbound_request( pub(crate) async fn process_inbound_response(ctx: &WalletConnectCtxImpl, response: Response, topic: &Topic) { let message_id = response.id(); let result = match &response { - Response::Success(value) => { - let data = serde_json::from_value::(value.result.clone()); - if let Ok(ResponseParamsSuccess::SessionPropose(propose)) = &data { - let mut pending_requests = ctx.pending_requests.lock().await; - pending_requests.remove(&message_id); - let _ = process_session_propose_response(ctx, topic, propose).await; + Response::Success(value) => match serde_json::from_value::(value.result.clone()) { + Ok(ResponseParamsSuccess::SessionPropose(propose)) => { + // If this is a session propose response, process it right away and return. + // Session proposal responses are not waited for since it might take a long time + // for the proposal to be accepted (user interaction). So they are handled in async fashion. + ctx.pending_requests + .lock() + .expect("pending request lock shouldn't fail!") + .remove(&message_id); + if let Err(err) = process_session_propose_response(ctx, topic, &propose).await { + common::log::error!("Failed to process session propose response: {err:?}"); + } return; - }; - data.map_to_mm(|err| WalletConnectError::SerdeError(err.to_string())) - .map(|data| SessionMessage { - message_id, - topic: topic.clone(), - data, - }) + }, + Ok(data) => Ok(SessionMessage { + message_id, + topic: topic.clone(), + data, + }), + Err(err) => MmError::err(WalletConnectError::SerdeError(err.to_string())), }, Response::Error(err) => MmError::err(WalletConnectError::UnSuccessfulResponse(format!("{err:?}"))), }; - let mut pending_requests = ctx.pending_requests.lock().await; + let mut pending_requests = ctx + .pending_requests + .lock() + .expect("pending request lock shouldn't fail!"); if let Some(tx) = pending_requests.remove(&message_id) { tx.send(result).ok(); } else { diff --git a/mm2src/kdf_walletconnect/src/lib.rs b/mm2src/kdf_walletconnect/src/lib.rs index b2c6ee8d83..77510a1f47 100644 --- a/mm2src/kdf_walletconnect/src/lib.rs +++ b/mm2src/kdf_walletconnect/src/lib.rs @@ -19,7 +19,6 @@ use common::{executor::SpawnFuture, log::error}; use connection_handler::Handler; use error::WalletConnectError; use futures::channel::mpsc::{unbounded, UnboundedReceiver}; -use futures::lock::Mutex; use futures::StreamExt; use inbound_message::{process_inbound_request, process_inbound_response, SessionMessageType}; use metadata::{generate_metadata, AUTH_TOKEN_DURATION, AUTH_TOKEN_SUB, PROJECT_ID, RELAY_ADDRESS}; @@ -41,7 +40,8 @@ use session::Session; use session::{key::SymKeyPair, SessionManager}; use std::collections::{BTreeSet, HashMap}; use std::ops::Deref; -use std::{sync::Arc, time::Duration}; +use std::{sync::{Arc, Mutex as SyncMutex}, + time::Duration}; use storage::SessionStorageDb; use storage::WalletConnectStorageOps; use tokio::sync::oneshot; @@ -75,11 +75,12 @@ pub trait WalletConnectOps { pub struct WalletConnectCtxImpl { pub(crate) client: Client, pub(crate) pairing: PairingClient, - pub session_manager: SessionManager, pub(crate) key_pair: SymKeyPair, + pub session_manager: SessionManager, relay: Relay, metadata: Metadata, - pending_requests: Mutex>>, + message_id_generator: MessageIdGenerator, + pending_requests: SyncMutex>>, abortable_system: AbortableQueue, } @@ -108,6 +109,7 @@ impl WalletConnectCtx { |r, h| abortable_system.weak_spawner().spawn(client_event_loop(r, h)), ); + let message_id_generator = MessageIdGenerator::new(); let context = Arc::new(WalletConnectCtxImpl { client, pairing, @@ -116,6 +118,7 @@ impl WalletConnectCtx { key_pair: SymKeyPair::new(), session_manager: SessionManager::new(storage), pending_requests: Default::default(), + message_id_generator, abortable_system, }); @@ -342,14 +345,17 @@ impl WalletConnectCtxImpl { ) -> MmResult<(oneshot::Receiver, Duration), WalletConnectError> { let irn_metadata = param.irn_metadata(); let ttl = irn_metadata.ttl; - let message_id = MessageIdGenerator::new().next(); + let message_id = self.message_id_generator.next(); let request = Request::new(message_id, param.into()); self.publish_payload(topic, irn_metadata, Payload::Request(request)) .await?; let (tx, rx) = oneshot::channel(); - let mut pending_requests = self.pending_requests.lock().await; + let mut pending_requests = self + .pending_requests + .lock() + .expect("pending request lock shouldn't fail!"); pending_requests.insert(message_id, tx); Ok((rx, Duration::from_secs(ttl))) @@ -585,12 +591,12 @@ impl WalletConnectCtxImpl { let request = RequestParams::SessionRequest(request); let (rx, ttl) = self.publish_request(&active_topic, request).await?; - let maybe_response = rx + let response = rx .timeout(ttl) .await .map_to_mm(|_| WalletConnectError::TimeoutError)? .map_to_mm(|err| WalletConnectError::InternalError(err.to_string()))??; - match maybe_response.data { + match response.data { ResponseParamsSuccess::Arbitrary(data) => callback(serde_json::from_value::(data)?), _ => MmError::err(WalletConnectError::PayloadError("Unexpected response type".to_string())), } diff --git a/mm2src/kdf_walletconnect/src/session/rpc/delete.rs b/mm2src/kdf_walletconnect/src/session/rpc/delete.rs index c8600a7bc8..aa004cc437 100644 --- a/mm2src/kdf_walletconnect/src/session/rpc/delete.rs +++ b/mm2src/kdf_walletconnect/src/session/rpc/delete.rs @@ -40,9 +40,7 @@ pub(crate) async fn send_session_delete_request( } async fn session_delete_cleanup(ctx: &WalletConnectCtxImpl, topic: &Topic) -> MmResult<(), WalletConnectError> { - { - ctx.client.unsubscribe(topic.clone()).await?; - }; + ctx.client.unsubscribe(topic.clone()).await?; if let Some(session) = ctx.session_manager.delete_session(topic) { debug!( @@ -53,7 +51,6 @@ async fn session_delete_cleanup(ctx: &WalletConnectCtxImpl, topic: &Topic) -> Mm ctx.client.unsubscribe(session.pairing_topic.clone()).await?; // Attempt to delete/disconnect the pairing ctx.pairing.delete(&session.pairing_topic); - // delete session from storage as well. ctx.session_manager .storage()