Skip to content

Commit

Permalink
Fix UTXO rpc clients dead lock on balance/unspent requests. (#887)
Browse files Browse the repository at this point in the history
* Try removing ElectrumClientImpl::list_unspent_subs/get_balance_subs.

* Add ConcurrentRequestState and use it for scripthash_get_balance.

* Prevent long holding of my_orders mutex during setprice.

* Add ConcurrentRequestMap and use it for balance/unspent requests.

* Use list_unspent_concurrent_map in list_unspent_impl.

* Put back holding my_orders during balance check.
We can have a race condition if 2 concurrent setprice calls are made with cancel_previous: true.
So both orders will be created while it should create only 1 of them.

* Fixes after review.

* Avoid holding my_maker_orders mutex during balance checks on setprice.

* Explicitly cancel previous orders on balance check errors.
  • Loading branch information
artemii235 authored Apr 7, 2021
1 parent 5933e5a commit 6c23530
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 155 deletions.
7 changes: 3 additions & 4 deletions mm2src/coins/utxo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ use utxo_common::{big_decimal_from_sat, display_address};

pub use chain::Transaction as UtxoTx;

use self::rpc_clients::{ElectrumClient, ElectrumClientImpl, ElectrumRpcRequest, EstimateFeeMethod, EstimateFeeMode,
UnspentInfo, UtxoRpcClientEnum};
use self::rpc_clients::{ConcurrentRequestMap, ElectrumClient, ElectrumClientImpl, ElectrumRpcRequest,
EstimateFeeMethod, EstimateFeeMode, UnspentInfo, UtxoRpcClientEnum};
#[cfg(not(target_arch = "wasm32"))]
use self::rpc_clients::{NativeClient, NativeClientImpl};
use super::{CoinTransportMetrics, CoinsContext, FeeApproxStage, FoundSwapTxSpend, HistorySyncState, MarketCoinOps,
Expand Down Expand Up @@ -1153,8 +1153,7 @@ pub trait UtxoCoinBuilder {
auth: format!("Basic {}", base64_encode(&auth_str, URL_SAFE)),
event_handlers,
request_id: 0u64.into(),
list_unspent_in_progress: false.into(),
list_unspent_subs: AsyncMutex::new(Vec::new()),
list_unspent_concurrent_map: ConcurrentRequestMap::new(),
});

Ok(NativeClient(client))
Expand Down
235 changes: 112 additions & 123 deletions mm2src/coins/utxo/rpc_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use common::custom_futures::select_ok_sequential;
use common::executor::{spawn, Timer};
use common::jsonrpc_client::{JsonRpcClient, JsonRpcError, JsonRpcMultiClient, JsonRpcRemoteAddr, JsonRpcRequest,
JsonRpcResponse, JsonRpcResponseFut, RpcRes};
use common::log::warn;
use common::mm_number::MmNumber;
use common::wio::slurp_req;
use common::{median, OrdRange, StringError};
Expand Down Expand Up @@ -45,7 +46,7 @@ use std::num::NonZeroU64;
use std::ops::Deref;
#[cfg(target_arch = "wasm32")] use std::os::raw::c_char;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
Expand Down Expand Up @@ -408,6 +409,13 @@ pub struct VerboseBlock {

pub type RpcReqSub<T> = async_oneshot::Sender<Result<T, JsonRpcError>>;

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ListUnspentArgs {
min_conf: i32,
max_conf: i32,
addresses: Vec<String>,
}

/// RPC client for UTXO based coins
/// https://developer.bitcoin.org/reference/rpc/index.html - Bitcoin RPC API reference
/// Other coins have additional methods or miss some of these
Expand All @@ -423,8 +431,7 @@ pub struct NativeClientImpl {
/// Transport event handlers
pub event_handlers: Vec<RpcTransportEventHandlerShared>,
pub request_id: AtomicU64,
pub list_unspent_in_progress: AtomicBool,
pub list_unspent_subs: AsyncMutex<Vec<RpcReqSub<Vec<NativeUnspent>>>>,
pub list_unspent_concurrent_map: ConcurrentRequestMap<ListUnspentArgs, Vec<NativeUnspent>>,
}

#[cfg(test)]
Expand All @@ -436,8 +443,7 @@ impl Default for NativeClientImpl {
auth: "".to_string(),
event_handlers: vec![],
request_id: Default::default(),
list_unspent_in_progress: Default::default(),
list_unspent_subs: Default::default(),
list_unspent_concurrent_map: ConcurrentRequestMap::new(),
}
}
}
Expand Down Expand Up @@ -657,32 +663,15 @@ impl NativeClient {
max_conf: i32,
addresses: Vec<String>,
) -> RpcRes<Vec<NativeUnspent>> {
let request_fut = rpc_func!(self, "listunspent", &min_conf, &max_conf, &addresses);
let arc = self.clone();
if self
.list_unspent_in_progress
.compare_and_swap(false, true, AtomicOrdering::Relaxed)
{
let fut = async move {
let (tx, rx) = async_oneshot::channel();
arc.list_unspent_subs.lock().await.push(tx);
rx.await.unwrap()
};
Box::new(fut.boxed().compat())
} else {
let fut = async move {
let unspents_res = rpc_func!(arc, "listunspent", min_conf, max_conf, addresses)
.compat()
.await;
for sub in arc.list_unspent_subs.lock().await.drain(..) {
if sub.send(unspents_res.clone()).is_err() {
log!("list_unspent_sub is dropped");
}
}
arc.list_unspent_in_progress.store(false, AtomicOrdering::Relaxed);
unspents_res
};
Box::new(fut.boxed().compat())
}
let args = ListUnspentArgs {
min_conf,
max_conf,
addresses,
};
let fut = async move { arc.list_unspent_concurrent_map.wrap_request(args, request_fut).await };
Box::new(fut.boxed().compat())
}
}

Expand Down Expand Up @@ -1124,26 +1113,61 @@ impl Drop for ElectrumConnection {
}

#[derive(Debug)]
pub struct RpcRequestState<T> {
is_request_running: Arc<AtomicBool>,
response_subs: Vec<oneshot::Sender<Result<T, JsonRpcError>>>,
struct ConcurrentRequestState<V> {
is_running: bool,
subscribers: Vec<RpcReqSub<V>>,
}

impl<V> ConcurrentRequestState<V> {
fn new() -> Self {
ConcurrentRequestState {
is_running: false,
subscribers: Vec::new(),
}
}
}

#[derive(Debug)]
pub struct RpcRequestWrapper<Key, Response> {
inner: HashMap<Key, RpcRequestState<Response>>,
pub struct ConcurrentRequestMap<K, V> {
inner: AsyncMutex<HashMap<K, ConcurrentRequestState<V>>>,
}

impl<Key, Response> RpcRequestWrapper<Key, Response> {
fn new() -> Self { RpcRequestWrapper { inner: HashMap::new() } }
impl<K, V> Default for ConcurrentRequestMap<K, V> {
fn default() -> Self {
ConcurrentRequestMap {
inner: AsyncMutex::new(HashMap::new()),
}
}
}

impl<K: Clone + Eq + std::hash::Hash, V: Clone> ConcurrentRequestMap<K, V> {
pub fn new() -> ConcurrentRequestMap<K, V> { ConcurrentRequestMap::default() }

#[allow(dead_code)]
async fn wrap_request(
&mut self,
_key: Key,
_request: impl Future<Item = Response, Error = JsonRpcError>,
) -> Result<Response, JsonRpcError> {
unimplemented!()
async fn wrap_request(&self, request_arg: K, request_fut: RpcRes<V>) -> Result<V, JsonRpcError> {
let mut map = self.inner.lock().await;
let state = map
.entry(request_arg.clone())
.or_insert_with(ConcurrentRequestState::new);
if state.is_running {
let (tx, rx) = async_oneshot::channel();
state.subscribers.push(tx);
// drop here to avoid holding the lock during await
drop(map);
rx.await.unwrap()
} else {
// drop here to avoid holding the lock during await
drop(map);
let request_res = request_fut.compat().await;
let mut map = self.inner.lock().await;
let state = map.get_mut(&request_arg).unwrap();
for sub in state.subscribers.drain(..) {
if sub.send(request_res.clone()).is_err() {
warn!("subscriber is dropped");
}
}
state.is_running = false;
request_res
}
}
}

Expand All @@ -1154,12 +1178,8 @@ pub struct ElectrumClientImpl {
next_id: AtomicU64,
event_handlers: Vec<RpcTransportEventHandlerShared>,
protocol_version: OrdRange<f32>,
get_balance_wrapper: RpcRequestWrapper<Address, ElectrumBalance>,
list_unspent_wrapper: RpcRequestWrapper<Address, Vec<ElectrumUnspent>>,
list_unspent_in_progress: AtomicBool,
list_unspent_subs: AsyncMutex<Vec<RpcReqSub<Vec<ElectrumUnspent>>>>,
get_balance_in_progress: AtomicBool,
get_balance_subs: AsyncMutex<Vec<async_oneshot::Sender<Result<ElectrumBalance, JsonRpcError>>>>,
get_balance_concurrent_map: ConcurrentRequestMap<String, ElectrumBalance>,
list_unspent_concurrent_map: ConcurrentRequestMap<String, Vec<ElectrumUnspent>>,
}

#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -1168,17 +1188,24 @@ async fn electrum_request_multi(
request: JsonRpcRequest,
) -> Result<(JsonRpcRemoteAddr, JsonRpcResponse), String> {
let mut futures = vec![];
for connection in client.connections.lock().await.iter() {
let connections = client.connections.lock().await;
for connection in connections.iter() {
let connection_addr = connection.addr.clone();
match &*connection.tx.lock().await {
Some(tx) => {
let fut = electrum_request(request.clone(), tx.clone(), connection.responses.clone())
.map(|response| (JsonRpcRemoteAddr(connection_addr), response));
let fut = electrum_request(
request.clone(),
tx.clone(),
connection.responses.clone(),
ELECTRUM_TIMEOUT / connections.len() as u64,
)
.map(|response| (JsonRpcRemoteAddr(connection_addr), response));
futures.push(fut)
},
None => (),
}
}
drop(connections);
if futures.is_empty() {
return ERR!("All electrums are currently disconnected");
}
Expand Down Expand Up @@ -1223,7 +1250,11 @@ async fn electrum_request_to(
(tx, responses)
};

let response = try_s!(electrum_request(request.clone(), tx, responses).compat().await);
let response = try_s!(
electrum_request(request.clone(), tx, responses, ELECTRUM_TIMEOUT)
.compat()
.await
);
Ok((JsonRpcRemoteAddr(to_addr.to_owned()), response))
}

Expand Down Expand Up @@ -1423,47 +1454,26 @@ impl ElectrumClient {
/// It can return duplicates sometimes: https://github.com/artemii235/SuperNET/issues/269
/// We should remove them to build valid transactions
fn scripthash_list_unspent(&self, hash: &str) -> RpcRes<Vec<ElectrumUnspent>> {
let request_fut = Box::new(rpc_func!(self, "blockchain.scripthash.listunspent", hash).and_then(
move |unspents: Vec<ElectrumUnspent>| {
let mut map: HashMap<(H256Json, u32), bool> = HashMap::new();
let unspents = unspents
.into_iter()
.filter(|unspent| match map.entry((unspent.tx_hash.clone(), unspent.tx_pos)) {
Entry::Occupied(_) => false,
Entry::Vacant(e) => {
e.insert(true);
true
},
})
.collect();
Ok(unspents)
},
));
let arc = self.clone();
let hash = hash.to_owned();
if self
.list_unspent_in_progress
.compare_and_swap(false, true, AtomicOrdering::Relaxed)
{
let fut = async move {
let (tx, rx) = async_oneshot::channel();
arc.list_unspent_subs.lock().await.push(tx);
rx.await.unwrap()
};
Box::new(fut.boxed().compat())
} else {
let fut = async move {
let unspents_res = rpc_func!(arc, "blockchain.scripthash.listunspent", hash)
.and_then(move |unspents: Vec<ElectrumUnspent>| {
let mut map: HashMap<(H256Json, u32), bool> = HashMap::new();
let unspents = unspents
.into_iter()
.filter(|unspent| match map.entry((unspent.tx_hash.clone(), unspent.tx_pos)) {
Entry::Occupied(_) => false,
Entry::Vacant(e) => {
e.insert(true);
true
},
})
.collect();
Ok(unspents)
})
.compat()
.await;
for sub in arc.list_unspent_subs.lock().await.drain(..) {
if sub.send(unspents_res.clone()).is_err() {
log!("list_unspent_sub is dropped");
}
}
arc.list_unspent_in_progress.store(false, AtomicOrdering::Relaxed);
unspents_res
};
Box::new(fut.boxed().compat())
}
let fut = async move { arc.list_unspent_concurrent_map.wrap_request(hash, request_fut).await };
Box::new(fut.boxed().compat())
}

/// https://electrumx.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history
Expand All @@ -1475,29 +1485,11 @@ impl ElectrumClient {
fn scripthash_get_balance(&self, hash: &str) -> RpcRes<ElectrumBalance> {
let arc = self.clone();
let hash = hash.to_owned();
if self
.get_balance_in_progress
.compare_and_swap(false, true, AtomicOrdering::Relaxed)
{
let fut = async move {
let (tx, rx) = async_oneshot::channel();
arc.get_balance_subs.lock().await.push(tx);
rx.await.unwrap()
};
Box::new(fut.boxed().compat())
} else {
let fut = async move {
let balance_res = rpc_func!(arc, "blockchain.scripthash.get_balance", hash).compat().await;
for sub in arc.get_balance_subs.lock().await.drain(..) {
if sub.send(balance_res.clone()).is_err() {
log!("list_unspent_sub is dropped");
}
}
arc.get_balance_in_progress.store(false, AtomicOrdering::Relaxed);
balance_res
};
Box::new(fut.boxed().compat())
}
let fut = async move {
let request = rpc_func!(arc, "blockchain.scripthash.get_balance", &hash);
arc.get_balance_concurrent_map.wrap_request(hash, request).await
};
Box::new(fut.boxed().compat())
}

/// https://electrumx.readthedocs.io/en/latest/protocol-methods.html#blockchain-headers-subscribe
Expand Down Expand Up @@ -1672,12 +1664,8 @@ impl ElectrumClientImpl {
next_id: 0.into(),
event_handlers,
protocol_version,
get_balance_wrapper: RpcRequestWrapper::new(),
list_unspent_wrapper: RpcRequestWrapper::new(),
list_unspent_in_progress: Default::default(),
list_unspent_subs: Default::default(),
get_balance_in_progress: Default::default(),
get_balance_subs: Default::default(),
get_balance_concurrent_map: ConcurrentRequestMap::new(),
list_unspent_concurrent_map: ConcurrentRequestMap::new(),
}
}

Expand Down Expand Up @@ -2016,6 +2004,7 @@ fn electrum_request(
request: JsonRpcRequest,
tx: mpsc::Sender<Vec<u8>>,
responses: Arc<AsyncMutex<HashMap<String, async_oneshot::Sender<JsonRpcResponse>>>>,
timeout: u64,
) -> Box<dyn Future<Item = JsonRpcResponse, Error = String> + Send + 'static> {
let send_fut = async move {
let mut json = try_s!(json::to_string(&request));
Expand All @@ -2033,7 +2022,7 @@ fn electrum_request(
.boxed()
.compat()
.map_err(StringError)
.timeout(Duration::from_secs(ELECTRUM_TIMEOUT));
.timeout(Duration::from_secs(timeout));

Box::new(send_fut.map_err(|e| ERRL!("{}", e.0)))
}
2 changes: 1 addition & 1 deletion mm2src/common/jsonrpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::fmt;
/// Generates params vector from input args, builds the request and sends it.
#[macro_export]
macro_rules! rpc_func {
($selff:ident, $method:expr $(, $arg_name:ident)*) => {{
($selff:ident, $method:expr $(, $arg_name:expr)*) => {{
let mut params = vec![];
$(
params.push(json::value::to_value($arg_name).unwrap());
Expand Down
Loading

0 comments on commit 6c23530

Please sign in to comment.