diff --git a/CHANGELOG.md b/CHANGELOG.md index e2c930c96..0c48cb79e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +- CLI: Buffer received transfer proofs for later processing if we're currently running a different swap + ## [0.13.1] - 2024-06-10 - Add retry logic to monero-wallet-rpc wallet refresh diff --git a/swap/migrations/20210903050345_create_swaps_table.sql b/swap/migrations/20210903050345_create_swaps_table.sql index 741a45e65..84a97d0f0 100644 --- a/swap/migrations/20210903050345_create_swaps_table.sql +++ b/swap/migrations/20210903050345_create_swaps_table.sql @@ -22,4 +22,10 @@ CREATE TABLE if NOT EXISTS peer_addresses ( peer_id TEXT NOT NULL, address TEXT NOT NULL +); + +CREATE TABLE if NOT EXISTS buffered_transfer_proofs +( + swap_id TEXT PRIMARY KEY NOT NULL, + proof TEXT NOT NULL ); \ No newline at end of file diff --git a/swap/sqlx-data.json b/swap/sqlx-data.json index f24a50e6f..6ab0f14a9 100644 --- a/swap/sqlx-data.json +++ b/swap/sqlx-data.json @@ -195,5 +195,33 @@ } }, "query": "\n SELECT state\n FROM swap_states\n WHERE swap_id = ?\n " + }, + "e36c287aa98ae80ad4b6bb6f7e4b59cced041406a9db71da827b09f0d3bacfd6": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Right": 2 + } + }, + "query": "\n INSERT INTO buffered_transfer_proofs (\n swap_id,\n proof\n ) VALUES (?, ?);\n " + }, + "e9d422daf774d099fcbde6c4cda35821da948bd86cc57798b4d8375baf0b51ae": { + "describe": { + "columns": [ + { + "name": "proof", + "ordinal": 0, + "type_info": "Text" + } + ], + "nullable": [ + false + ], + "parameters": { + "Right": 1 + } + }, + "query": "\n SELECT proof\n FROM buffered_transfer_proofs\n WHERE swap_id = ?\n " } } \ No newline at end of file diff --git a/swap/src/api/request.rs b/swap/src/api/request.rs index 338432c35..800e1c06d 100644 --- a/swap/src/api/request.rs +++ b/swap/src/api/request.rs @@ -376,7 +376,7 @@ impl Request { }, result = async { let (event_loop, mut event_loop_handle) = - EventLoop::new(swap_id, swarm, seller_peer_id)?; + EventLoop::new(swap_id, swarm, seller_peer_id, context.db.clone())?; let event_loop = tokio::spawn(event_loop.run().in_current_span()); let bid_quote = event_loop_handle.request_quote().await?; @@ -522,7 +522,7 @@ impl Request { } let (event_loop, event_loop_handle) = - EventLoop::new(swap_id, swarm, seller_peer_id)?; + EventLoop::new(swap_id, swarm, seller_peer_id, context.db.clone())?; let monero_receive_address = context.db.get_monero_address(swap_id).await?; let swap = Swap::from_db( Arc::clone(&context.db), diff --git a/swap/src/cli/event_loop.rs b/swap/src/cli/event_loop.rs index befe1dc5a..fd7b43084 100644 --- a/swap/src/cli/event_loop.rs +++ b/swap/src/cli/event_loop.rs @@ -5,6 +5,7 @@ use crate::network::encrypted_signature; use crate::network::quote::BidQuote; use crate::network::swap_setup::bob::NewSwap; use crate::protocol::bob::State2; +use crate::protocol::Database; use anyhow::{Context, Result}; use futures::future::{BoxFuture, OptionFuture}; use futures::{FutureExt, StreamExt}; @@ -13,6 +14,7 @@ use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; use std::collections::HashMap; +use std::sync::Arc; use std::time::Duration; use uuid::Uuid; @@ -21,6 +23,7 @@ pub struct EventLoop { swap_id: Uuid, swarm: libp2p::Swarm, alice_peer_id: PeerId, + db: Arc, // these streams represents outgoing requests that we have to make quote_requests: bmrng::RequestReceiverStream<(), BidQuote>, @@ -51,6 +54,7 @@ impl EventLoop { swap_id: Uuid, swarm: Swarm, alice_peer_id: PeerId, + db: Arc, ) -> Result<(Self, EventLoopHandle)> { let execution_setup = bmrng::channel_with_timeout(1, Duration::from_secs(60)); let transfer_proof = bmrng::channel_with_timeout(1, Duration::from_secs(60)); @@ -69,6 +73,7 @@ impl EventLoop { inflight_swap_setup: None, inflight_encrypted_signature_requests: HashMap::default(), pending_transfer_proof: OptionFuture::from(None), + db, }; let handle = EventLoopHandle { @@ -108,38 +113,63 @@ impl EventLoop { SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel, peer }) => { let swap_id = msg.swap_id; - if peer != self.alice_peer_id { - tracing::warn!( - %swap_id, - "Ignoring malicious transfer proof from {}, expected to receive it from {}", - peer, - self.alice_peer_id); - continue; - } + if swap_id == self.swap_id { + if peer != self.alice_peer_id { + tracing::warn!( + %swap_id, + "Ignoring malicious transfer proof from {}, expected to receive it from {}", + peer, + self.alice_peer_id); + continue; + } - if swap_id != self.swap_id { + let mut responder = match self.transfer_proof.send(msg.tx_lock_proof).await { + Ok(responder) => responder, + Err(e) => { + tracing::warn!("Failed to pass on transfer proof: {:#}", e); + continue; + } + }; - // TODO: Save unexpected transfer proofs in the database and check for messages in the database when handling swaps - tracing::warn!("Received unexpected transfer proof for swap {} while running swap {}. This transfer proof will be ignored", swap_id, self.swap_id); + self.pending_transfer_proof = OptionFuture::from(Some(async move { + let _ = responder.recv().await; - // When receiving a transfer proof that is unexpected we still have to acknowledge that it was received - let _ = self.swarm.behaviour_mut().transfer_proof.send_response(channel, ()); - continue; - } - - let mut responder = match self.transfer_proof.send(msg.tx_lock_proof).await { - Ok(responder) => responder, - Err(e) => { - tracing::warn!("Failed to pass on transfer proof: {:#}", e); - continue; + channel + }.boxed())); + }else { + // Check if the transfer proof is sent from the correct peer and if we have a record of the swap + match self.db.get_peer_id(swap_id).await { + // We have a record of the swap + Ok(buffer_swap_alice_peer_id) => { + if buffer_swap_alice_peer_id == self.alice_peer_id { + // Save transfer proof in the database such that we can process it later when we resume the swap + match self.db.insert_buffered_transfer_proof(swap_id, msg.tx_lock_proof).await { + Ok(_) => { + tracing::info!("Received transfer proof for swap {} while running swap {}. Buffering this transfer proof in the database for later retrieval", swap_id, self.swap_id); + let _ = self.swarm.behaviour_mut().transfer_proof.send_response(channel, ()); + } + Err(e) => { + tracing::error!("Failed to buffer transfer proof for swap {}: {:#}", swap_id, e); + } + }; + }else { + tracing::warn!( + %swap_id, + "Ignoring malicious transfer proof from {}, expected to receive it from {}", + self.swap_id, + buffer_swap_alice_peer_id); + } + }, + // We do not have a record of the swap or an error occurred while retrieving the peer id of Alice + Err(e) => { + if let Some(sqlx::Error::RowNotFound) = e.downcast_ref::() { + tracing::warn!("Ignoring transfer proof for swap {} while running swap {}. We do not have a record of this swap", swap_id, self.swap_id); + } else { + tracing::error!("Ignoring transfer proof for swap {} while running swap {}. Failed to retrieve the peer id of Alice for the corresponding swap: {:#}", swap_id, self.swap_id, e); + } + } } - }; - - self.pending_transfer_proof = OptionFuture::from(Some(async move { - let _ = responder.recv().await; - - channel - }.boxed())); + } } SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged { id }) => { if let Some(responder) = self.inflight_encrypted_signature_requests.remove(&id) { diff --git a/swap/src/database/sqlite.rs b/swap/src/database/sqlite.rs index 751f76aa9..6cddd50ed 100644 --- a/swap/src/database/sqlite.rs +++ b/swap/src/database/sqlite.rs @@ -1,5 +1,5 @@ use crate::database::Swap; -use crate::monero::Address; +use crate::monero::{Address, TransferProof}; use crate::protocol::{Database, State}; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; @@ -303,6 +303,56 @@ impl Database for SqliteDatabase { result } + async fn insert_buffered_transfer_proof( + &self, + swap_id: Uuid, + proof: TransferProof, + ) -> Result<()> { + let mut conn = self.pool.acquire().await?; + let swap_id = swap_id.to_string(); + let proof = serde_json::to_string(&proof)?; + + sqlx::query!( + r#" + INSERT INTO buffered_transfer_proofs ( + swap_id, + proof + ) VALUES (?, ?); + "#, + swap_id, + proof + ) + .execute(&mut conn) + .await?; + + Ok(()) + } + + async fn get_buffered_transfer_proof(&self, swap_id: Uuid) -> Result> { + let mut conn = self.pool.acquire().await?; + let swap_id = swap_id.to_string(); + + let row = sqlx::query!( + r#" + SELECT proof + FROM buffered_transfer_proofs + WHERE swap_id = ? + "#, + swap_id + ) + .fetch_all(&mut conn) + .await?; + + if row.is_empty() { + return Ok(None); + } + + let proof_str = &row[0].proof; + let proof = serde_json::from_str(proof_str)?; + + Ok(Some(proof)) + } + async fn raw_all(&self) -> Result>> { let mut conn = self.pool.acquire().await?; let rows = sqlx::query!( diff --git a/swap/src/network/test.rs b/swap/src/network/test.rs index 5a3243859..f27c38249 100644 --- a/swap/src/network/test.rs +++ b/swap/src/network/test.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; use futures::stream::FusedStream; -use futures::{future, Future, Stream, StreamExt}; +use futures::{future, Future, StreamExt}; use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::upgrade::Version; use libp2p::core::transport::MemoryTransport; @@ -75,8 +75,8 @@ async fn get_local_tcp_address() -> Multiaddr { } pub async fn await_events_or_timeout( - swarm_1: &mut (impl Stream> + FusedStream + Unpin), - swarm_2: &mut (impl Stream> + FusedStream + Unpin), + swarm_1: &mut (impl FusedStream> + FusedStream + Unpin), + swarm_2: &mut (impl FusedStream> + FusedStream + Unpin), ) -> (SwarmEvent, SwarmEvent) where SwarmEvent: Debug, diff --git a/swap/src/protocol.rs b/swap/src/protocol.rs index 0e15f89af..676a03f45 100644 --- a/swap/src/protocol.rs +++ b/swap/src/protocol.rs @@ -146,4 +146,13 @@ pub trait Database { async fn get_states(&self, swap_id: Uuid) -> Result>; async fn all(&self) -> Result>; async fn raw_all(&self) -> Result>>; + async fn insert_buffered_transfer_proof( + &self, + swap_id: Uuid, + proof: monero::TransferProof, + ) -> Result<()>; + async fn get_buffered_transfer_proof( + &self, + swap_id: Uuid, + ) -> Result>; } diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index 3900a17bf..7a702d778 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -1,10 +1,11 @@ use crate::bitcoin::{ExpiredTimelocks, TxCancel, TxRefund}; use crate::cli::EventLoopHandle; use crate::network::swap_setup::bob::NewSwap; -use crate::protocol::bob; use crate::protocol::bob::state::*; +use crate::protocol::{bob, Database}; use crate::{bitcoin, monero}; use anyhow::{bail, Context, Result}; +use std::sync::Arc; use tokio::select; use uuid::Uuid; @@ -34,6 +35,7 @@ pub async fn run_until( swap.id, current_state.clone(), &mut swap.event_loop_handle, + swap.db.clone(), swap.bitcoin_wallet.as_ref(), swap.monero_wallet.as_ref(), swap.monero_receive_address, @@ -52,6 +54,7 @@ async fn next_state( swap_id: Uuid, state: BobState, event_loop_handle: &mut EventLoopHandle, + db: Arc, bitcoin_wallet: &bitcoin::Wallet, monero_wallet: &monero::Wallet, monero_receive_address: monero::Address, @@ -118,12 +121,28 @@ async fn next_state( let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; if let ExpiredTimelocks::None { .. } = state3.expired_timelock(bitcoin_wallet).await? { + tracing::info!("Waiting for Alice to lock Monero"); + + let buffered_transfer_proof = db + .get_buffered_transfer_proof(swap_id) + .await + .context("Failed to get buffered transfer proof")?; + + if let Some(transfer_proof) = buffered_transfer_proof { + tracing::debug!(txid = %transfer_proof.tx_hash(), "Found buffered transfer proof"); + tracing::info!(txid = %transfer_proof.tx_hash(), "Alice locked Monero"); + + return Ok(BobState::XmrLockProofReceived { + state: state3, + lock_transfer_proof: transfer_proof, + monero_wallet_restore_blockheight, + }); + } + let transfer_proof_watcher = event_loop_handle.recv_transfer_proof(); let cancel_timelock_expires = tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock); - tracing::info!("Waiting for Alice to lock Monero"); - select! { transfer_proof = transfer_proof_watcher => { let transfer_proof = transfer_proof?; diff --git a/swap/tests/concurrent_bobs_before_xmr_lock_proof_sent.rs b/swap/tests/concurrent_bobs_before_xmr_lock_proof_sent.rs index d76b3b5f8..bf8adb060 100644 --- a/swap/tests/concurrent_bobs_before_xmr_lock_proof_sent.rs +++ b/swap/tests/concurrent_bobs_before_xmr_lock_proof_sent.rs @@ -32,8 +32,7 @@ async fn concurrent_bobs_before_xmr_lock_proof_sent() { let alice_swap_2 = tokio::spawn(alice::run(alice_swap_2, FixedRate::default())); // The 2nd swap ALWAYS finish successfully in this - // scenario, but will receive an "unwanted" transfer proof that is ignored in - // the event loop. + // scenario, but will receive an "unwanted" transfer proof that is buffered until the 1st swap is resumed let bob_state_2 = bob_swap_2.await??; assert!(matches!(bob_state_2, BobState::XmrRedeemed { .. })); @@ -46,15 +45,13 @@ async fn concurrent_bobs_before_xmr_lock_proof_sent() { .await; assert!(matches!(bob_state_1, BobState::BtcLocked { .. })); - // The 1st (paused) swap is expected to refund, because the transfer - // proof is delivered to the wrong swap, and we currently don't store it in the - // database for the other swap. + // The 1st (paused) swap is expected to finish successfully because the transfer proof is buffered when it is receives while another swap is running. let bob_state_1 = bob::run(bob_swap_1).await?; - assert!(matches!(bob_state_1, BobState::BtcRefunded { .. })); + assert!(matches!(bob_state_1, BobState::XmrRedeemed { .. })); let alice_state_1 = alice_swap_1.await??; - assert!(matches!(alice_state_1, AliceState::XmrRefunded { .. })); + assert!(matches!(alice_state_1, AliceState::BtcRedeemed { .. })); Ok(()) }) diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index 24083a713..028b09352 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -427,8 +427,6 @@ impl BobParams { } pub async fn new_swap_from_db(&self, swap_id: Uuid) -> Result<(bob::Swap, cli::EventLoop)> { - let (event_loop, handle) = self.new_eventloop(swap_id).await?; - if let Some(parent_dir) = self.db_path.parent() { ensure_directory_exists(parent_dir)?; } @@ -437,8 +435,10 @@ impl BobParams { } let db = Arc::new(SqliteDatabase::open(&self.db_path).await?); + let (event_loop, handle) = self.new_eventloop(swap_id, db.clone()).await?; + let swap = bob::Swap::from_db( - db, + db.clone(), swap_id, self.bitcoin_wallet.clone(), self.monero_wallet.clone(), @@ -457,8 +457,6 @@ impl BobParams { ) -> Result<(bob::Swap, cli::EventLoop)> { let swap_id = Uuid::new_v4(); - let (event_loop, handle) = self.new_eventloop(swap_id).await?; - if let Some(parent_dir) = self.db_path.parent() { ensure_directory_exists(parent_dir)?; } @@ -467,6 +465,8 @@ impl BobParams { } let db = Arc::new(SqliteDatabase::open(&self.db_path).await?); + let (event_loop, handle) = self.new_eventloop(swap_id, db.clone()).await?; + db.insert_peer_id(swap_id, self.alice_peer_id).await?; let swap = bob::Swap::new( @@ -487,6 +487,7 @@ impl BobParams { pub async fn new_eventloop( &self, swap_id: Uuid, + db: Arc, ) -> Result<(cli::EventLoop, cli::EventLoopHandle)> { let tor_socks5_port = get_port() .expect("We don't care about Tor in the tests so we get a free port to disable it."); @@ -503,7 +504,7 @@ impl BobParams { .behaviour_mut() .add_address(self.alice_peer_id, self.alice_address.clone()); - cli::EventLoop::new(swap_id, swarm, self.alice_peer_id) + cli::EventLoop::new(swap_id, swarm, self.alice_peer_id, db.clone()) } }