Skip to content

Commit

Permalink
nits
Browse files Browse the repository at this point in the history
  • Loading branch information
borngraced committed Dec 12, 2024
1 parent 89756db commit a2c7c71
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 27 deletions.
39 changes: 24 additions & 15 deletions mm2src/kdf_walletconnect/src/inbound_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{error::WalletConnectError,
WalletConnectCtxImpl};

use common::log::info;
use mm2_err_handle::prelude::{MapToMmResult, MmError, MmResult};
use mm2_err_handle::prelude::*;
use relay_rpc::domain::{MessageId, Topic};
use relay_rpc::rpc::{params::ResponseParamsSuccess, Params, Request, Response};

Expand Down Expand Up @@ -63,25 +63,34 @@ pub(crate) async fn process_inbound_request(
pub(crate) async fn process_inbound_response(ctx: &WalletConnectCtxImpl, response: Response, topic: &Topic) {
let message_id = response.id();
let result = match &response {
Response::Success(value) => {
let data = serde_json::from_value::<ResponseParamsSuccess>(value.result.clone());
if let Ok(ResponseParamsSuccess::SessionPropose(propose)) = &data {
let mut pending_requests = ctx.pending_requests.lock().await;
pending_requests.remove(&message_id);
let _ = process_session_propose_response(ctx, topic, propose).await;
Response::Success(value) => match serde_json::from_value::<ResponseParamsSuccess>(value.result.clone()) {
Ok(ResponseParamsSuccess::SessionPropose(propose)) => {
// If this is a session propose response, process it right away and return.
// Session proposal responses are not waited for since it might take a long time
// for the proposal to be accepted (user interaction). So they are handled in async fashion.
ctx.pending_requests
.lock()
.expect("pending request lock shouldn't fail!")
.remove(&message_id);
if let Err(err) = process_session_propose_response(ctx, topic, &propose).await {
common::log::error!("Failed to process session propose response: {err:?}");
}
return;
};
data.map_to_mm(|err| WalletConnectError::SerdeError(err.to_string()))
.map(|data| SessionMessage {
message_id,
topic: topic.clone(),
data,
})
},
Ok(data) => Ok(SessionMessage {
message_id,
topic: topic.clone(),
data,
}),
Err(err) => MmError::err(WalletConnectError::SerdeError(err.to_string())),
},
Response::Error(err) => MmError::err(WalletConnectError::UnSuccessfulResponse(format!("{err:?}"))),
};

let mut pending_requests = ctx.pending_requests.lock().await;
let mut pending_requests = ctx
.pending_requests
.lock()
.expect("pending request lock shouldn't fail!");
if let Some(tx) = pending_requests.remove(&message_id) {
tx.send(result).ok();
} else {
Expand Down
22 changes: 14 additions & 8 deletions mm2src/kdf_walletconnect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use common::{executor::SpawnFuture, log::error};
use connection_handler::Handler;
use error::WalletConnectError;
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::lock::Mutex;
use futures::StreamExt;
use inbound_message::{process_inbound_request, process_inbound_response, SessionMessageType};
use metadata::{generate_metadata, AUTH_TOKEN_DURATION, AUTH_TOKEN_SUB, PROJECT_ID, RELAY_ADDRESS};
Expand All @@ -41,7 +40,8 @@ use session::Session;
use session::{key::SymKeyPair, SessionManager};
use std::collections::{BTreeSet, HashMap};
use std::ops::Deref;
use std::{sync::Arc, time::Duration};
use std::{sync::{Arc, Mutex as SyncMutex},
time::Duration};
use storage::SessionStorageDb;
use storage::WalletConnectStorageOps;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -75,11 +75,12 @@ pub trait WalletConnectOps {
pub struct WalletConnectCtxImpl {
pub(crate) client: Client,
pub(crate) pairing: PairingClient,
pub session_manager: SessionManager,
pub(crate) key_pair: SymKeyPair,
pub session_manager: SessionManager,
relay: Relay,
metadata: Metadata,
pending_requests: Mutex<HashMap<MessageId, oneshot::Sender<SessionMessageType>>>,
message_id_generator: MessageIdGenerator,
pending_requests: SyncMutex<HashMap<MessageId, oneshot::Sender<SessionMessageType>>>,
abortable_system: AbortableQueue,
}

Expand Down Expand Up @@ -108,6 +109,7 @@ impl WalletConnectCtx {
|r, h| abortable_system.weak_spawner().spawn(client_event_loop(r, h)),
);

let message_id_generator = MessageIdGenerator::new();
let context = Arc::new(WalletConnectCtxImpl {
client,
pairing,
Expand All @@ -116,6 +118,7 @@ impl WalletConnectCtx {
key_pair: SymKeyPair::new(),
session_manager: SessionManager::new(storage),
pending_requests: Default::default(),
message_id_generator,
abortable_system,
});

Expand Down Expand Up @@ -342,14 +345,17 @@ impl WalletConnectCtxImpl {
) -> MmResult<(oneshot::Receiver<SessionMessageType>, Duration), WalletConnectError> {
let irn_metadata = param.irn_metadata();
let ttl = irn_metadata.ttl;
let message_id = MessageIdGenerator::new().next();
let message_id = self.message_id_generator.next();
let request = Request::new(message_id, param.into());

self.publish_payload(topic, irn_metadata, Payload::Request(request))
.await?;

let (tx, rx) = oneshot::channel();
let mut pending_requests = self.pending_requests.lock().await;
let mut pending_requests = self
.pending_requests
.lock()
.expect("pending request lock shouldn't fail!");
pending_requests.insert(message_id, tx);

Ok((rx, Duration::from_secs(ttl)))
Expand Down Expand Up @@ -585,12 +591,12 @@ impl WalletConnectCtxImpl {
let request = RequestParams::SessionRequest(request);
let (rx, ttl) = self.publish_request(&active_topic, request).await?;

let maybe_response = rx
let response = rx
.timeout(ttl)
.await
.map_to_mm(|_| WalletConnectError::TimeoutError)?
.map_to_mm(|err| WalletConnectError::InternalError(err.to_string()))??;
match maybe_response.data {
match response.data {
ResponseParamsSuccess::Arbitrary(data) => callback(serde_json::from_value::<T>(data)?),
_ => MmError::err(WalletConnectError::PayloadError("Unexpected response type".to_string())),
}
Expand Down
5 changes: 1 addition & 4 deletions mm2src/kdf_walletconnect/src/session/rpc/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ pub(crate) async fn send_session_delete_request(
}

async fn session_delete_cleanup(ctx: &WalletConnectCtxImpl, topic: &Topic) -> MmResult<(), WalletConnectError> {
{
ctx.client.unsubscribe(topic.clone()).await?;
};
ctx.client.unsubscribe(topic.clone()).await?;

if let Some(session) = ctx.session_manager.delete_session(topic) {
debug!(
Expand All @@ -53,7 +51,6 @@ async fn session_delete_cleanup(ctx: &WalletConnectCtxImpl, topic: &Topic) -> Mm
ctx.client.unsubscribe(session.pairing_topic.clone()).await?;
// Attempt to delete/disconnect the pairing
ctx.pairing.delete(&session.pairing_topic);

// delete session from storage as well.
ctx.session_manager
.storage()
Expand Down

0 comments on commit a2c7c71

Please sign in to comment.