Skip to content

Commit

Permalink
Refactored seednode loop to tokio and async. (#631)
Browse files Browse the repository at this point in the history
* Exclude mm2-seednodes-optimisation from CI triggers.

* Count the number of sent messages and their total size in seednode loop.

* Add max_msg_size to seednode_loop metrics.

* Log send buffer size when electrum client is connected.

* Log send buffer size for seednode listener and every connected client TcpStream.

* Log nodelay value of TcpListener and every TcpStream.

* Try to set nodelay = true.

* Try to set_send_buffer_size to 200000.

* Try to refactor seednode_loop to async_std.

* Recv messages only and log them.

* Drop connection if EOF reached.

* Enable rebroadcast back.

* Log message send attempts.

* Try to increase channel capacity.

* Use vec of mpsc senders instead of async_std mpmc.

* Remove excessive logging.

* Log the invalid JSON message and it's sender.

* Skip lines that are not valid JSON.

* Log JSON deserialize error only when buffer.len() > 1.

* Try tokio TCP implementation instead of async_std.

* Log the case when buffer can be probably lost.

* Split incoming connections TcpStreams and process them in separate async loops.

* Do not send message back to peer from which it was received.

* Remove excessive logging.

* Enable CI build for this branch back.

* Try to spawn read and write processing loops on tokio 0.2 runtime.

* Use String as P2PMessage content to avoid non-deterministic serialization issues.
The deserialized and then serialized message might produce non-equal string so
same message can be serialized to JSON differently causing double processing on
client nodes connected to several seeds.

* Explicitly disable debug_assertions for release profile.

* Update tokio to 0.2.22.

* Fix after merge.
  • Loading branch information
artemii235 authored Jul 23, 2020
1 parent e51a128 commit 8c2bf3f
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 161 deletions.
65 changes: 45 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ opt-level = 1
[profile.release]
# Due to the "overrides" only affects our workspace crates, as intended.
debug = true
debug-assertions = false
# For better or worse, might affect the stack traces in our portion of the code.
#opt-level = 1

Expand Down Expand Up @@ -82,6 +83,7 @@ futures-cpupool = "0.1"
# whereas futures-timer works with any reactor and spawns one global timer thread (fits out load better).
futures-timer = "0.1"
futures = { version = ">=0.3.0-alpha.16, <0.4", package = "futures-preview", features = ["compat", "async-await", "nightly"] }
futures-util = "0.3.4"
gstuff = { version = "0.6", features = ["nightly"] }
hex = "0.3.2"
http = "0.1"
Expand Down Expand Up @@ -114,6 +116,7 @@ serialization_derive = { git = "https://github.com/artemii235/parity-bitcoin.git
term = "=0.5.1"

tokio-core = { version = "0.1", optional = true }
tokio = { version = "0.2.22", features = ["io-util", "rt-threaded", "stream", "tcp"] }
unwrap = "1.2"
uuid = { version = "0.7", features = ["serde", "v4"] }
winapi = "0.3"
Expand Down
1 change: 1 addition & 0 deletions mm2src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ doctest = false

[dependencies]
arrayref = "0.3"
async-std = {version = "1.5", features = ["unstable"]}
atomic = "0.4"
backtrace = "0.3"
base64 = "0.10.0"
Expand Down
58 changes: 27 additions & 31 deletions mm2src/common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#![feature(async_closure)]
#![feature(hash_raw_entry)]
#![feature(optin_builtin_traits)]
#![feature(drain_filter)]
#![feature(const_fn)]
#![allow(uncommon_codepoints)]
#![cfg_attr(not(feature = "native"), allow(unused_imports))]
Expand Down Expand Up @@ -117,11 +118,12 @@ use std::future::Future as Future03;
use std::intrinsics::copy;
use std::io::Write;
use std::mem::{forget, size_of, zeroed};
use std::net::SocketAddr;
use std::ops::{Add, Deref, Div, RangeInclusive};
use std::os::raw::{c_char, c_void};
use std::path::{Path, PathBuf};
#[cfg(not(feature = "native"))] use std::pin::Pin;
use std::ptr::{null_mut, read_volatile};
use std::ptr::read_volatile;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::UNIX_EPOCH;
Expand Down Expand Up @@ -1251,45 +1253,39 @@ impl From<std::io::Error> for StringError {
fn from(e: std::io::Error) -> StringError { StringError(ERRL!("{}", e)) }
}

#[derive(Clone, Debug)]
pub struct P2PMessage {
pub from: SocketAddr,
pub content: String,
}

impl P2PMessage {
pub fn from_string_with_default_addr(content: String) -> P2PMessage {
P2PMessage {
from: SocketAddr::new([0; 4].into(), 0),
content,
}
}

pub fn from_serialize_with_default_addr<T: serde::Serialize>(msg: &T) -> P2PMessage {
P2PMessage {
from: SocketAddr::new([0; 4].into(), 0),
content: serde_json::to_string(msg).unwrap(),
}
}
}

#[derive(Debug)]
pub struct QueuedCommand {
pub response_sock: i32,
pub stats_json_only: i32,
pub queue_id: u32,
pub msg: String,
pub msg: P2PMessage,
// retstrp: *mut *mut c_char,
}

/// Register an RPC command that came internally or from the peer-to-peer bus.
#[no_mangle]
#[cfg(feature = "native")]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn lp_queue_command_for_c(
retstrp: *mut *mut c_char,
buf: *mut c_char,
response_sock: i32,
stats_json_only: i32,
queue_id: u32,
) {
if !retstrp.is_null() {
*retstrp = null_mut()
}

if buf.is_null() {
panic!("!buf")
}
let msg = String::from(unwrap!(CStr::from_ptr(buf).to_str()));
let _cmd = QueuedCommand {
msg,
queue_id,
response_sock,
stats_json_only,
};
panic!("We need a context ID");
//unwrap! ((*COMMAND_QUEUE).0.send (cmd))
}

pub fn lp_queue_command(ctx: &mm_ctx::MmArc, msg: String) -> Result<(), String> {
pub fn lp_queue_command(ctx: &mm_ctx::MmArc, msg: P2PMessage) -> Result<(), String> {
// If we're helping a WASM then leave a copy of the broadcast for them.
if let Some(ref mut cq) = *try_s!(ctx.command_queueʰ.lock()) {
// Monotonic increment.
Expand Down
Loading

0 comments on commit 8c2bf3f

Please sign in to comment.