Skip to content

Commit

Permalink
Electrum server version #661 (#681)
Browse files Browse the repository at this point in the history
* Add server.version call on electrum connect
* Add the way to send json request exact to one of remote electrum
* Add on_connect callback on electrum connect

* Complete server.version checking
* Add remove_server on server.version error or invalid response
* Change version from [1.2, 1.4] to 1.4
* Remove bch0.kister.net:51001 and testnet.imaginary.cash:50001 from cashaddress test, because they doesn't support custom client_name

* Add comments and refactor

* Solve PR issues:
* Change required protocol version from 1.4 to [1.2, 1.4]
* Return the proper error upon electrum call and not activate the coin if there are not Electrums with a required protocol version
  • Loading branch information
sergeyboyko0791 authored Jun 22, 2020
1 parent 72073d3 commit 1bbffa4
Show file tree
Hide file tree
Showing 7 changed files with 436 additions and 82 deletions.
19 changes: 19 additions & 0 deletions mm2src/coins/lp_coins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ pub trait RpcTransportEventHandler {
fn on_outgoing_request(&self, data: &[u8]);

fn on_incoming_response(&self, data: &[u8]);

fn on_connected(&self, address: String) -> Result<(), String>;
}

impl fmt::Debug for dyn RpcTransportEventHandler + Send + Sync {
Expand All @@ -496,6 +498,10 @@ impl RpcTransportEventHandler for RpcTransportEventHandlerShared {
fn on_incoming_response(&self, data: &[u8]) {
self.as_ref().on_incoming_response(data)
}

fn on_connected(&self, address: String) -> Result<(), String> {
self.as_ref().on_connected(address)
}
}

impl<T: RpcTransportEventHandler> RpcTransportEventHandler for Vec<T> {
Expand All @@ -515,6 +521,13 @@ impl<T: RpcTransportEventHandler> RpcTransportEventHandler for Vec<T> {
handler.on_incoming_response(data)
}
}

fn on_connected(&self, address: String) -> Result<(), String> {
for handler in self {
try_s!(handler.on_connected(address.clone()))
}
Ok(())
}
}

pub enum RpcClientType {
Expand Down Expand Up @@ -571,6 +584,12 @@ impl RpcTransportEventHandler for CoinTransportMetrics {
mm_counter!(self.metrics, "rpc_client.response.count", 1,
"coin" => self.ticker.clone(), "client" => self.client.clone());
}

fn on_connected(&self, _address: String) -> Result<(), String> {
// Handle a new connected endpoint if necessary.
// Now just return the Ok
Ok(())
}
}

/// Adds a new currency into the list of currencies configured.
Expand Down
186 changes: 151 additions & 35 deletions mm2src/coins/utxo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@ use bigdecimal::BigDecimal;
pub use bitcrypto::{dhash160, ChecksumType, sha256};
use chain::{TransactionOutput, TransactionInput, OutPoint};
use chain::constants::{SEQUENCE_FINAL};
use common::{first_char_to_upper, small_rng};
use common::{first_char_to_upper, small_rng, MM_VERSION};
use common::executor::{spawn, Timer};
use common::jsonrpc_client::{JsonRpcError, JsonRpcErrorType};
use common::mm_ctx::MmArc;
#[cfg(feature = "native")]
use dirs::home_dir;
use futures01::{Future};
use futures01::future::Either;
use futures::channel::mpsc;
use futures::compat::Future01CompatExt;
use futures::future::{FutureExt, TryFutureExt};
use futures::lock::{Mutex as AsyncMutex};
use futures::stream::StreamExt;
use gstuff::{now_ms};
use keys::{KeyPair, Private, Public, Address, Secret, Type};
use keys::bytes::Bytes;
Expand All @@ -56,7 +58,7 @@ use std::num::NonZeroU64;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, Weak};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrderding};
use std::thread;
use std::time::Duration;
Expand All @@ -65,8 +67,8 @@ pub use chain::Transaction as UtxoTx;

use self::rpc_clients::{electrum_script_hash, ElectrumClient, ElectrumClientImpl,
EstimateFeeMethod, EstimateFeeMode, NativeClient, UtxoRpcClientEnum, UnspentInfo};
use super::{CoinsContext, CoinTransportMetrics, FoundSwapTxSpend, HistorySyncState, MarketCoinOps, MmCoin, RpcClientType, RpcTransportEventHandlerShared,
SwapOps, TradeFee, Transaction, TransactionEnum, TransactionFut, TransactionDetails, WithdrawFee, WithdrawRequest};
use super::{CoinsContext, CoinTransportMetrics, FoundSwapTxSpend, HistorySyncState, MarketCoinOps, MmCoin, RpcClientType, RpcTransportEventHandler,
RpcTransportEventHandlerShared, SwapOps, TradeFee, Transaction, TransactionEnum, TransactionFut, TransactionDetails, WithdrawFee, WithdrawRequest};
use crate::utxo::rpc_clients::{NativeClientImpl, UtxoRpcClientOps, ElectrumRpcRequest};

#[cfg(test)]
Expand Down Expand Up @@ -1948,15 +1950,30 @@ fn read_native_mode_conf(_filename: &dyn AsRef<Path>) -> Result<(Option<u16>, St
unimplemented!()
}

fn rpc_event_handlers_for_client_transport(
ctx: &MmArc,
ticker: String,
client: RpcClientType,
) -> Vec<RpcTransportEventHandlerShared> {
let metrics = ctx.metrics.weak();
vec![
CoinTransportMetrics::new(metrics, ticker, client).into_shared(),
]
/// Electrum protocol version verifier.
/// The structure is used to handle the `on_connected` event and notify `electrum_version_loop`.
struct ElectrumProtoVerifier {
on_connect_tx: mpsc::UnboundedSender<String>,
}

impl ElectrumProtoVerifier {
fn into_shared(self) -> RpcTransportEventHandlerShared {
Arc::new(self)
}
}

impl RpcTransportEventHandler for ElectrumProtoVerifier {
fn debug_info(&self) -> String {
"ElectrumProtoVerifier".into()
}

fn on_outgoing_request(&self, _data: &[u8]) {}

fn on_incoming_response(&self, _data: &[u8]) {}

fn on_connected(&self, address: String) -> Result<(), String> {
Ok(try_s!(self.on_connect_tx.unbounded_send(address)))
}
}

pub async fn utxo_coin_from_conf_and_request(
Expand Down Expand Up @@ -2008,7 +2025,7 @@ pub async fn utxo_coin_from_conf_and_request(
Some(p) => p,
None => try_s!(conf["rpcport"].as_u64().ok_or(ERRL!("Rpc port is not set neither in `coins` file nor in native daemon config"))) as u16,
};
let event_handlers = rpc_event_handlers_for_client_transport(ctx, ticker.to_string(), RpcClientType::Native);
let event_handlers = vec![CoinTransportMetrics::new(ctx.metrics.weak(), ticker.to_owned(), RpcClientType::Native).into_shared()];
let client = Arc::new(NativeClientImpl {
coin_ticker: ticker.to_string(),
uri: fomat!("http://127.0.0.1:"(rpc_port)),
Expand All @@ -2022,19 +2039,24 @@ pub async fn utxo_coin_from_conf_and_request(
}
},
Some("electrum") => {
let (on_connect_tx, on_connect_rx) = mpsc::unbounded();
let event_handlers = vec![
CoinTransportMetrics::new(ctx.metrics.weak(), ticker.to_owned(), RpcClientType::Electrum).into_shared(),
ElectrumProtoVerifier { on_connect_tx }.into_shared(),
];

let mut servers: Vec<ElectrumRpcRequest> = try_s!(json::from_value(req["servers"].clone()));
let mut rng = small_rng();
servers.as_mut_slice().shuffle(&mut rng);
let event_handlers = rpc_event_handlers_for_client_transport(ctx, ticker.to_string(), RpcClientType::Electrum);
let mut client = ElectrumClientImpl::new(ticker.to_string(), event_handlers);
let client = ElectrumClientImpl::new(ticker.to_string(), event_handlers);
for server in servers.iter() {
match client.add_server(server) {
match client.add_server(server).await {
Ok(_) => (),
Err(e) => log!("Error " (e) " connecting to " [server] ". Address won't be used")
};
}

let mut attempts = 0;
let mut attempts = 0i32;
while !client.is_connected().await {
if attempts >= 10 {
return ERR!("Failed to connect to at least 1 of {:?} in 5 seconds.", servers);
Expand All @@ -2045,24 +2067,15 @@ pub async fn utxo_coin_from_conf_and_request(
}

let client = Arc::new(client);
// ping the electrum servers every 30 seconds to prevent them from disconnecting us.
// according to docs server can do it if there are no messages in ~10 minutes.
// https://electrumx.readthedocs.io/en/latest/protocol-methods.html?highlight=keep#server-ping
// weak reference will allow to stop the thread if client is dropped

let weak_client = Arc::downgrade(&client);
spawn(async move {
loop {
if let Some(client) = weak_client.upgrade() {
if let Err(e) = ElectrumClient(client).server_ping().compat().await {
log!("Electrum servers " [servers] " ping error " [e]);
}
} else {
log!("Electrum servers " [servers] " ping loop stopped");
break;
}
Timer::sleep(30.).await
}
});
spawn_electrum_ping_loop(weak_client, servers);

let weak_client = Arc::downgrade(&client);
let client_name = format!("{} GUI/MM2 {}", ctx.gui().unwrap_or("UNKNOWN"), MM_VERSION);
spawn_electrum_version_loop(weak_client, on_connect_rx, client_name);

try_s!(wait_for_protocol_version_checked(&client).await);
UtxoRpcClientEnum::Electrum(ElectrumClient(client))
},
_ => return ERR!("utxo_coin_from_conf_and_request should be called only by enable or electrum requests"),
Expand Down Expand Up @@ -2174,6 +2187,109 @@ pub async fn utxo_coin_from_conf_and_request(
Ok(UtxoCoin(Arc::new(coin)))
}

/// Ping the electrum servers every 30 seconds to prevent them from disconnecting us.
/// According to docs server can do it if there are no messages in ~10 minutes.
/// https://electrumx.readthedocs.io/en/latest/protocol-methods.html?highlight=keep#server-ping
/// Weak reference will allow to stop the thread if client is dropped.
fn spawn_electrum_ping_loop(weak_client: Weak<ElectrumClientImpl>, servers: Vec<ElectrumRpcRequest>) {
spawn(async move {
loop {
if let Some(client) = weak_client.upgrade() {
if let Err(e) = ElectrumClient(client).server_ping().compat().await {
log!("Electrum servers " [servers] " ping error " [e]);
}
} else {
log!("Electrum servers " [servers] " ping loop stopped");
break;
}
Timer::sleep(30.).await
}
});
}

/// Follow the `on_connect_rx` stream and verify the protocol version of each connected electrum server.
/// https://electrumx.readthedocs.io/en/latest/protocol-methods.html?highlight=keep#server-version
/// Weak reference will allow to stop the thread if client is dropped.
fn spawn_electrum_version_loop(
weak_client: Weak<ElectrumClientImpl>,
mut on_connect_rx: mpsc::UnboundedReceiver<String>,
client_name: String,
) {
// client.remove_server() is called too often
async fn remove_server(client: ElectrumClient, electrum_addr: &str) {
if let Err(e) = client.remove_server(electrum_addr).await {
log!("Error on remove server "[e]);
}
}

spawn(async move {
while let Some(electrum_addr) = on_connect_rx.next().await {
let client = match weak_client.upgrade() {
Some(c) => ElectrumClient(c),
_ => break,
};

let available_protocols = client.protocol_version();
let version = match client.server_version(&electrum_addr, &client_name, available_protocols).compat().await {
Ok(version) => version,
Err(e) => {
log!("Electrum " (electrum_addr) " server.version error \"" [e] "\". Remove the connection");
remove_server(client, &electrum_addr).await;
continue
}
};

// check if the version is allowed
let actual_version = match version.protocol_version.parse::<f32>() {
Ok(v) => v,
Err(e) => {
log!("Error on parse protocol_version "[e]);
remove_server(client, &electrum_addr).await;
continue
},
};

if !available_protocols.contains(&actual_version) {
log!("Received unsupported protocol version " [actual_version] " from " [electrum_addr] ". Remove the connection");
remove_server(client, &electrum_addr).await;
continue
}

if let Err(e) = client.set_protocol_version(&electrum_addr, actual_version).await {
log!("Error on set protocol_version "[e]);
};

log!("Use protocol version " [actual_version] " for Electrum " [electrum_addr]);
}

log!("Electrum server.version loop stopped");
});
}

/// Wait until the protocol version of at least one client's Electrum is checked.
async fn wait_for_protocol_version_checked(client: &ElectrumClientImpl) -> Result<(), String> {
let mut attempts = 0;
loop {
if attempts >= 10 {
return ERR!("Failed protocol version verifying of at least 1 of Electrums in 5 seconds.");
}

if client.count_connections().await == 0 {
// All of the connections were removed because of server.version checking
return ERR!("There are no Electrums with the required protocol version {:?}", client.protocol_version());
}

if client.is_protocol_version_checked().await {
break;
}

Timer::sleep(0.5).await;
attempts += 1;
}

Ok(())
}

/// Function calculating KMD interest
/// https://komodoplatform.atlassian.net/wiki/spaces/KPSD/pages/71729215/What+is+the+5+Komodo+Stake+Reward
/// https://github.com/KomodoPlatform/komodo/blob/master/src/komodo_interest.h
Expand Down
Loading

0 comments on commit 1bbffa4

Please sign in to comment.