From 6c23530b69d67e461709c27cae72300664ce911c Mon Sep 17 00:00:00 2001 From: Artem Pikulin Date: Wed, 7 Apr 2021 14:08:28 +0700 Subject: [PATCH] Fix UTXO rpc clients dead lock on balance/unspent requests. (#887) * Try removing ElectrumClientImpl::list_unspent_subs/get_balance_subs. * Add ConcurrentRequestState and use it for scripthash_get_balance. * Prevent long holding of my_orders mutex during setprice. * Add ConcurrentRequestMap and use it for balance/unspent requests. * Use list_unspent_concurrent_map in list_unspent_impl. * Put back holding my_orders during balance check. We can have a race condition if 2 concurrent setprice calls are made with cancel_previous: true. So both orders will be created while it should create only 1 of them. * Fixes after review. * Avoid holding my_maker_orders mutex during balance checks on setprice. * Explicitly cancel previous orders on balance check errors. --- mm2src/coins/utxo.rs | 7 +- mm2src/coins/utxo/rpc_clients.rs | 235 +++++++++++++++---------------- mm2src/common/jsonrpc_client.rs | 2 +- mm2src/lp_ordermatch.rs | 103 ++++++++++---- 4 files changed, 192 insertions(+), 155 deletions(-) diff --git a/mm2src/coins/utxo.rs b/mm2src/coins/utxo.rs index ddcaf9a601..ec4aba5883 100644 --- a/mm2src/coins/utxo.rs +++ b/mm2src/coins/utxo.rs @@ -66,8 +66,8 @@ use utxo_common::{big_decimal_from_sat, display_address}; pub use chain::Transaction as UtxoTx; -use self::rpc_clients::{ElectrumClient, ElectrumClientImpl, ElectrumRpcRequest, EstimateFeeMethod, EstimateFeeMode, - UnspentInfo, UtxoRpcClientEnum}; +use self::rpc_clients::{ConcurrentRequestMap, ElectrumClient, ElectrumClientImpl, ElectrumRpcRequest, + EstimateFeeMethod, EstimateFeeMode, UnspentInfo, UtxoRpcClientEnum}; #[cfg(not(target_arch = "wasm32"))] use self::rpc_clients::{NativeClient, NativeClientImpl}; use super::{CoinTransportMetrics, CoinsContext, FeeApproxStage, FoundSwapTxSpend, HistorySyncState, MarketCoinOps, @@ -1153,8 +1153,7 @@ pub trait UtxoCoinBuilder { auth: format!("Basic {}", base64_encode(&auth_str, URL_SAFE)), event_handlers, request_id: 0u64.into(), - list_unspent_in_progress: false.into(), - list_unspent_subs: AsyncMutex::new(Vec::new()), + list_unspent_concurrent_map: ConcurrentRequestMap::new(), }); Ok(NativeClient(client)) diff --git a/mm2src/coins/utxo/rpc_clients.rs b/mm2src/coins/utxo/rpc_clients.rs index b5c069cc94..4d0c99a661 100644 --- a/mm2src/coins/utxo/rpc_clients.rs +++ b/mm2src/coins/utxo/rpc_clients.rs @@ -10,6 +10,7 @@ use common::custom_futures::select_ok_sequential; use common::executor::{spawn, Timer}; use common::jsonrpc_client::{JsonRpcClient, JsonRpcError, JsonRpcMultiClient, JsonRpcRemoteAddr, JsonRpcRequest, JsonRpcResponse, JsonRpcResponseFut, RpcRes}; +use common::log::warn; use common::mm_number::MmNumber; use common::wio::slurp_req; use common::{median, OrdRange, StringError}; @@ -45,7 +46,7 @@ use std::num::NonZeroU64; use std::ops::Deref; #[cfg(target_arch = "wasm32")] use std::os::raw::c_char; use std::pin::Pin; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering}; +use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; @@ -408,6 +409,13 @@ pub struct VerboseBlock { pub type RpcReqSub = async_oneshot::Sender>; +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct ListUnspentArgs { + min_conf: i32, + max_conf: i32, + addresses: Vec, +} + /// RPC client for UTXO based coins /// https://developer.bitcoin.org/reference/rpc/index.html - Bitcoin RPC API reference /// Other coins have additional methods or miss some of these @@ -423,8 +431,7 @@ pub struct NativeClientImpl { /// Transport event handlers pub event_handlers: Vec, pub request_id: AtomicU64, - pub list_unspent_in_progress: AtomicBool, - pub list_unspent_subs: AsyncMutex>>>, + pub list_unspent_concurrent_map: ConcurrentRequestMap>, } #[cfg(test)] @@ -436,8 +443,7 @@ impl Default for NativeClientImpl { auth: "".to_string(), event_handlers: vec![], request_id: Default::default(), - list_unspent_in_progress: Default::default(), - list_unspent_subs: Default::default(), + list_unspent_concurrent_map: ConcurrentRequestMap::new(), } } } @@ -657,32 +663,15 @@ impl NativeClient { max_conf: i32, addresses: Vec, ) -> RpcRes> { + let request_fut = rpc_func!(self, "listunspent", &min_conf, &max_conf, &addresses); let arc = self.clone(); - if self - .list_unspent_in_progress - .compare_and_swap(false, true, AtomicOrdering::Relaxed) - { - let fut = async move { - let (tx, rx) = async_oneshot::channel(); - arc.list_unspent_subs.lock().await.push(tx); - rx.await.unwrap() - }; - Box::new(fut.boxed().compat()) - } else { - let fut = async move { - let unspents_res = rpc_func!(arc, "listunspent", min_conf, max_conf, addresses) - .compat() - .await; - for sub in arc.list_unspent_subs.lock().await.drain(..) { - if sub.send(unspents_res.clone()).is_err() { - log!("list_unspent_sub is dropped"); - } - } - arc.list_unspent_in_progress.store(false, AtomicOrdering::Relaxed); - unspents_res - }; - Box::new(fut.boxed().compat()) - } + let args = ListUnspentArgs { + min_conf, + max_conf, + addresses, + }; + let fut = async move { arc.list_unspent_concurrent_map.wrap_request(args, request_fut).await }; + Box::new(fut.boxed().compat()) } } @@ -1124,26 +1113,61 @@ impl Drop for ElectrumConnection { } #[derive(Debug)] -pub struct RpcRequestState { - is_request_running: Arc, - response_subs: Vec>>, +struct ConcurrentRequestState { + is_running: bool, + subscribers: Vec>, +} + +impl ConcurrentRequestState { + fn new() -> Self { + ConcurrentRequestState { + is_running: false, + subscribers: Vec::new(), + } + } } #[derive(Debug)] -pub struct RpcRequestWrapper { - inner: HashMap>, +pub struct ConcurrentRequestMap { + inner: AsyncMutex>>, } -impl RpcRequestWrapper { - fn new() -> Self { RpcRequestWrapper { inner: HashMap::new() } } +impl Default for ConcurrentRequestMap { + fn default() -> Self { + ConcurrentRequestMap { + inner: AsyncMutex::new(HashMap::new()), + } + } +} + +impl ConcurrentRequestMap { + pub fn new() -> ConcurrentRequestMap { ConcurrentRequestMap::default() } - #[allow(dead_code)] - async fn wrap_request( - &mut self, - _key: Key, - _request: impl Future, - ) -> Result { - unimplemented!() + async fn wrap_request(&self, request_arg: K, request_fut: RpcRes) -> Result { + let mut map = self.inner.lock().await; + let state = map + .entry(request_arg.clone()) + .or_insert_with(ConcurrentRequestState::new); + if state.is_running { + let (tx, rx) = async_oneshot::channel(); + state.subscribers.push(tx); + // drop here to avoid holding the lock during await + drop(map); + rx.await.unwrap() + } else { + // drop here to avoid holding the lock during await + drop(map); + let request_res = request_fut.compat().await; + let mut map = self.inner.lock().await; + let state = map.get_mut(&request_arg).unwrap(); + for sub in state.subscribers.drain(..) { + if sub.send(request_res.clone()).is_err() { + warn!("subscriber is dropped"); + } + } + state.is_running = false; + request_res + } } } @@ -1154,12 +1178,8 @@ pub struct ElectrumClientImpl { next_id: AtomicU64, event_handlers: Vec, protocol_version: OrdRange, - get_balance_wrapper: RpcRequestWrapper, - list_unspent_wrapper: RpcRequestWrapper>, - list_unspent_in_progress: AtomicBool, - list_unspent_subs: AsyncMutex>>>, - get_balance_in_progress: AtomicBool, - get_balance_subs: AsyncMutex>>>, + get_balance_concurrent_map: ConcurrentRequestMap, + list_unspent_concurrent_map: ConcurrentRequestMap>, } #[cfg(not(target_arch = "wasm32"))] @@ -1168,17 +1188,24 @@ async fn electrum_request_multi( request: JsonRpcRequest, ) -> Result<(JsonRpcRemoteAddr, JsonRpcResponse), String> { let mut futures = vec![]; - for connection in client.connections.lock().await.iter() { + let connections = client.connections.lock().await; + for connection in connections.iter() { let connection_addr = connection.addr.clone(); match &*connection.tx.lock().await { Some(tx) => { - let fut = electrum_request(request.clone(), tx.clone(), connection.responses.clone()) - .map(|response| (JsonRpcRemoteAddr(connection_addr), response)); + let fut = electrum_request( + request.clone(), + tx.clone(), + connection.responses.clone(), + ELECTRUM_TIMEOUT / connections.len() as u64, + ) + .map(|response| (JsonRpcRemoteAddr(connection_addr), response)); futures.push(fut) }, None => (), } } + drop(connections); if futures.is_empty() { return ERR!("All electrums are currently disconnected"); } @@ -1223,7 +1250,11 @@ async fn electrum_request_to( (tx, responses) }; - let response = try_s!(electrum_request(request.clone(), tx, responses).compat().await); + let response = try_s!( + electrum_request(request.clone(), tx, responses, ELECTRUM_TIMEOUT) + .compat() + .await + ); Ok((JsonRpcRemoteAddr(to_addr.to_owned()), response)) } @@ -1423,47 +1454,26 @@ impl ElectrumClient { /// It can return duplicates sometimes: https://github.com/artemii235/SuperNET/issues/269 /// We should remove them to build valid transactions fn scripthash_list_unspent(&self, hash: &str) -> RpcRes> { + let request_fut = Box::new(rpc_func!(self, "blockchain.scripthash.listunspent", hash).and_then( + move |unspents: Vec| { + let mut map: HashMap<(H256Json, u32), bool> = HashMap::new(); + let unspents = unspents + .into_iter() + .filter(|unspent| match map.entry((unspent.tx_hash.clone(), unspent.tx_pos)) { + Entry::Occupied(_) => false, + Entry::Vacant(e) => { + e.insert(true); + true + }, + }) + .collect(); + Ok(unspents) + }, + )); let arc = self.clone(); let hash = hash.to_owned(); - if self - .list_unspent_in_progress - .compare_and_swap(false, true, AtomicOrdering::Relaxed) - { - let fut = async move { - let (tx, rx) = async_oneshot::channel(); - arc.list_unspent_subs.lock().await.push(tx); - rx.await.unwrap() - }; - Box::new(fut.boxed().compat()) - } else { - let fut = async move { - let unspents_res = rpc_func!(arc, "blockchain.scripthash.listunspent", hash) - .and_then(move |unspents: Vec| { - let mut map: HashMap<(H256Json, u32), bool> = HashMap::new(); - let unspents = unspents - .into_iter() - .filter(|unspent| match map.entry((unspent.tx_hash.clone(), unspent.tx_pos)) { - Entry::Occupied(_) => false, - Entry::Vacant(e) => { - e.insert(true); - true - }, - }) - .collect(); - Ok(unspents) - }) - .compat() - .await; - for sub in arc.list_unspent_subs.lock().await.drain(..) { - if sub.send(unspents_res.clone()).is_err() { - log!("list_unspent_sub is dropped"); - } - } - arc.list_unspent_in_progress.store(false, AtomicOrdering::Relaxed); - unspents_res - }; - Box::new(fut.boxed().compat()) - } + let fut = async move { arc.list_unspent_concurrent_map.wrap_request(hash, request_fut).await }; + Box::new(fut.boxed().compat()) } /// https://electrumx.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history @@ -1475,29 +1485,11 @@ impl ElectrumClient { fn scripthash_get_balance(&self, hash: &str) -> RpcRes { let arc = self.clone(); let hash = hash.to_owned(); - if self - .get_balance_in_progress - .compare_and_swap(false, true, AtomicOrdering::Relaxed) - { - let fut = async move { - let (tx, rx) = async_oneshot::channel(); - arc.get_balance_subs.lock().await.push(tx); - rx.await.unwrap() - }; - Box::new(fut.boxed().compat()) - } else { - let fut = async move { - let balance_res = rpc_func!(arc, "blockchain.scripthash.get_balance", hash).compat().await; - for sub in arc.get_balance_subs.lock().await.drain(..) { - if sub.send(balance_res.clone()).is_err() { - log!("list_unspent_sub is dropped"); - } - } - arc.get_balance_in_progress.store(false, AtomicOrdering::Relaxed); - balance_res - }; - Box::new(fut.boxed().compat()) - } + let fut = async move { + let request = rpc_func!(arc, "blockchain.scripthash.get_balance", &hash); + arc.get_balance_concurrent_map.wrap_request(hash, request).await + }; + Box::new(fut.boxed().compat()) } /// https://electrumx.readthedocs.io/en/latest/protocol-methods.html#blockchain-headers-subscribe @@ -1672,12 +1664,8 @@ impl ElectrumClientImpl { next_id: 0.into(), event_handlers, protocol_version, - get_balance_wrapper: RpcRequestWrapper::new(), - list_unspent_wrapper: RpcRequestWrapper::new(), - list_unspent_in_progress: Default::default(), - list_unspent_subs: Default::default(), - get_balance_in_progress: Default::default(), - get_balance_subs: Default::default(), + get_balance_concurrent_map: ConcurrentRequestMap::new(), + list_unspent_concurrent_map: ConcurrentRequestMap::new(), } } @@ -2016,6 +2004,7 @@ fn electrum_request( request: JsonRpcRequest, tx: mpsc::Sender>, responses: Arc>>>, + timeout: u64, ) -> Box + Send + 'static> { let send_fut = async move { let mut json = try_s!(json::to_string(&request)); @@ -2033,7 +2022,7 @@ fn electrum_request( .boxed() .compat() .map_err(StringError) - .timeout(Duration::from_secs(ELECTRUM_TIMEOUT)); + .timeout(Duration::from_secs(timeout)); Box::new(send_fut.map_err(|e| ERRL!("{}", e.0))) } diff --git a/mm2src/common/jsonrpc_client.rs b/mm2src/common/jsonrpc_client.rs index adebf0ce5b..a18dd3c5a2 100644 --- a/mm2src/common/jsonrpc_client.rs +++ b/mm2src/common/jsonrpc_client.rs @@ -8,7 +8,7 @@ use std::fmt; /// Generates params vector from input args, builds the request and sends it. #[macro_export] macro_rules! rpc_func { - ($selff:ident, $method:expr $(, $arg_name:ident)*) => {{ + ($selff:ident, $method:expr $(, $arg_name:expr)*) => {{ let mut params = vec![]; $( params.push(json::value::to_value($arg_name).unwrap()); diff --git a/mm2src/lp_ordermatch.rs b/mm2src/lp_ordermatch.rs index 7e731b120b..bf54b17833 100644 --- a/mm2src/lp_ordermatch.rs +++ b/mm2src/lp_ordermatch.rs @@ -30,7 +30,7 @@ use common::log::error; use common::mm_ctx::{from_ctx, MmArc, MmWeak}; use common::mm_number::{Fraction, MmNumber}; use common::{bits256, json_dir_entries, log, new_uuid, now_ms, remove_file, write}; -use futures::{compat::Future01CompatExt, lock::Mutex as AsyncMutex, StreamExt}; +use futures::{compat::Future01CompatExt, lock::Mutex as AsyncMutex, StreamExt, TryFutureExt}; use gstuff::slurp; use hash256_std_hasher::Hash256StdHasher; use hash_db::{Hasher, EMPTY_PREFIX}; @@ -3112,29 +3112,13 @@ impl<'a> From<&'a MakerOrder> for MakerOrderForRpc<'a> { } } -pub async fn set_price(ctx: MmArc, req: Json) -> Result>, String> { - let req: SetPriceReq = try_s!(json::from_value(req)); - - let base_coin: MmCoinEnum = match try_s!(lp_coinfind(&ctx, &req.base).await) { - Some(coin) => coin, - None => return ERR!("Base coin {} is not found", req.base), - }; - - let rel_coin: MmCoinEnum = match try_s!(lp_coinfind(&ctx, &req.rel).await) { - Some(coin) => coin, - None => return ERR!("Rel coin {} is not found", req.rel), - }; - - if base_coin.wallet_only() { - return ERR!("Base coin is wallet only"); - } - if rel_coin.wallet_only() { - return ERR!("Rel coin is wallet only"); - } - - let ordermatch_ctx = try_s!(OrdermatchContext::from_ctx(&ctx)); - let mut my_orders = ordermatch_ctx.my_maker_orders.lock().await; +/// Cancels the orders in case of error on different checks +/// https://github.com/KomodoPlatform/atomicDEX-API/issues/794 +async fn cancel_orders_on_error(ctx: &MmArc, req: &SetPriceReq, error: E) -> Result { if req.cancel_previous { + let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); + let mut my_orders = ordermatch_ctx.my_maker_orders.lock().await; + let mut cancelled = vec![]; // remove the previous orders if there're some to allow multiple setprice call per pair // it's common use case now as `autoprice` doesn't work with new ordermatching and @@ -3157,20 +3141,57 @@ pub async fn set_price(ctx: MmArc, req: Json) -> Result>, Strin maker_order_cancelled_p2p_notify(ctx.clone(), &order).await; } } + Err(error) +} + +pub async fn set_price(ctx: MmArc, req: Json) -> Result>, String> { + let req: SetPriceReq = try_s!(json::from_value(req)); + + let base_coin: MmCoinEnum = match try_s!(lp_coinfind(&ctx, &req.base).await) { + Some(coin) => coin, + None => return ERR!("Base coin {} is not found", req.base), + }; + + let rel_coin: MmCoinEnum = match try_s!(lp_coinfind(&ctx, &req.rel).await) { + Some(coin) => coin, + None => return ERR!("Rel coin {} is not found", req.rel), + }; + + if base_coin.wallet_only() { + return ERR!("Base coin is wallet only"); + } + if rel_coin.wallet_only() { + return ERR!("Rel coin is wallet only"); + } - let my_balance = try_s!(base_coin.my_spendable_balance().compat().await); + let my_balance = try_s!( + base_coin + .my_spendable_balance() + .compat() + .or_else(|e| cancel_orders_on_error(&ctx, &req, e)) + .await + ); let volume = if req.max { // first check if `rel_coin` balance is sufficient let rel_coin_trade_fee = try_s!( rel_coin .get_receiver_trade_fee(FeeApproxStage::OrderIssue) .compat() + .or_else(|e| cancel_orders_on_error(&ctx, &req, e)) + .await + ); + try_s!( + check_other_coin_balance_for_swap(&ctx, &rel_coin, None, rel_coin_trade_fee) + .or_else(|e| cancel_orders_on_error(&ctx, &req, e)) .await ); - try_s!(check_other_coin_balance_for_swap(&ctx, &rel_coin, None, rel_coin_trade_fee).await); // calculate max maker volume // note the `calc_max_maker_vol` returns [`CheckBalanceError::NotSufficientBalance`] error if the balance of `base_coin` is not sufficient - try_s!(calc_max_maker_vol(&ctx, &base_coin, &my_balance, FeeApproxStage::OrderIssue).await) + try_s!( + calc_max_maker_vol(&ctx, &base_coin, &my_balance, FeeApproxStage::OrderIssue) + .or_else(|e| cancel_orders_on_error(&ctx, &req, e)) + .await + ) } else { try_s!( check_balance_for_maker_swap( @@ -3182,11 +3203,39 @@ pub async fn set_price(ctx: MmArc, req: Json) -> Result>, Strin None, FeeApproxStage::OrderIssue ) + .or_else(|e| cancel_orders_on_error(&ctx, &req, e)) .await ); - req.volume + req.volume.clone() }; + let ordermatch_ctx = try_s!(OrdermatchContext::from_ctx(&ctx)); + let mut my_orders = ordermatch_ctx.my_maker_orders.lock().await; + + if req.cancel_previous { + let mut cancelled = vec![]; + // remove the previous orders if there're some to allow multiple setprice call per pair + // it's common use case now as `autoprice` doesn't work with new ordermatching and + // MM2 users request the coins price from aggregators by their own scripts issuing + // repetitive setprice calls with new price + *my_orders = my_orders + .drain() + .filter_map(|(uuid, order)| { + let to_delete = order.base == req.base && order.rel == req.rel; + if to_delete { + delete_my_maker_order(&ctx, &order); + cancelled.push(order); + None + } else { + Some((uuid, order)) + } + }) + .collect(); + for order in cancelled { + maker_order_cancelled_p2p_notify(ctx.clone(), &order).await; + } + } + let conf_settings = OrderConfirmationsSettings { base_confs: req.base_confs.unwrap_or_else(|| base_coin.required_confirmations()), base_nota: req.base_nota.unwrap_or_else(|| base_coin.requires_notarization()),