From 8c2bf3f9197a828672c6da9c1f3bdfd7e06fff87 Mon Sep 17 00:00:00 2001 From: Artem Pikulin Date: Thu, 23 Jul 2020 13:31:41 +0700 Subject: [PATCH] Refactored seednode loop to tokio and async. (#631) * Exclude mm2-seednodes-optimisation from CI triggers. * Count the number of sent messages and their total size in seednode loop. * Add max_msg_size to seednode_loop metrics. * Log send buffer size when electrum client is connected. * Log send buffer size for seednode listener and every connected client TcpStream. * Log nodelay value of TcpListener and every TcpStream. * Try to set nodelay = true. * Try to set_send_buffer_size to 200000. * Try to refactor seednode_loop to async_std. * Recv messages only and log them. * Drop connection if EOF reached. * Enable rebroadcast back. * Log message send attempts. * Try to increase channel capacity. * Use vec of mpsc senders instead of async_std mpmc. * Remove excessive logging. * Log the invalid JSON message and it's sender. * Skip lines that are not valid JSON. * Log JSON deserialize error only when buffer.len() > 1. * Try tokio TCP implementation instead of async_std. * Log the case when buffer can be probably lost. * Split incoming connections TcpStreams and process them in separate async loops. * Do not send message back to peer from which it was received. * Remove excessive logging. * Enable CI build for this branch back. * Try to spawn read and write processing loops on tokio 0.2 runtime. * Use String as P2PMessage content to avoid non-deterministic serialization issues. The deserialized and then serialized message might produce non-equal string so same message can be serialized to JSON differently causing double processing on client nodes connected to several seeds. * Explicitly disable debug_assertions for release profile. * Update tokio to 0.2.22. * Fix after merge. --- Cargo.lock | 65 +++++++++---- Cargo.toml | 3 + mm2src/common/Cargo.toml | 1 + mm2src/common/common.rs | 58 ++++++------ mm2src/common/mm_ctx.rs | 18 ++-- mm2src/docker_tests.rs | 1 + mm2src/lp_network.rs | 192 +++++++++++++++++++++------------------ mm2src/lp_ordermatch.rs | 20 ++-- mm2src/lp_swap.rs | 9 +- mm2src/mm2_bin.rs | 1 + mm2src/rpc.rs | 4 +- 11 files changed, 211 insertions(+), 161 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a0235b05e0..7935116e20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,7 +69,7 @@ dependencies = [ "kv-log-macro 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "memchr 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", "mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "once_cell 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -313,9 +313,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", - "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "bytes" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "bzip2" version = "0.3.3" @@ -482,6 +487,7 @@ name = "common" version = "0.1.0" dependencies = [ "arrayref 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "async-std 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "atomic 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "backtrace 0.3.32 (git+https://github.com/artemii235/backtrace-rs.git)", "base64 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1395,7 +1401,7 @@ dependencies = [ "http 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "http-body 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "itoa 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1455,11 +1461,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "iovec" -version = "0.1.2" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "libc 0.2.67 (registry+https://github.com/rust-lang/crates.io-index)", - "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1728,12 +1733,13 @@ dependencies = [ [[package]] name = "mio" -version = "0.6.19" +version = "0.6.22" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", "fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.67 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1748,9 +1754,9 @@ name = "mio-uds" version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.67 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1785,6 +1791,7 @@ dependencies = [ "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "futures-preview 0.3.0-alpha.18 (registry+https://github.com/rust-lang/crates.io-index)", "futures-timer 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "gstuff 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1812,6 +1819,7 @@ dependencies = [ "serialization_derive 0.1.0 (git+https://github.com/artemii235/parity-bitcoin.git)", "term 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "testcontainers 0.7.0 (git+https://github.com/artemii235/testcontainers-rs.git)", + "tokio 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "unwrap 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2986,7 +2994,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-current-thread 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3002,6 +3010,21 @@ dependencies = [ "tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tokio" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tokio-buf" version = "0.1.1" @@ -3029,9 +3052,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", "scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3087,7 +3110,7 @@ dependencies = [ "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3123,8 +3146,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-reactor 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -3173,7 +3196,7 @@ dependencies = [ "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-reactor 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3186,10 +3209,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.67 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", "mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3586,6 +3609,7 @@ dependencies = [ "checksum byte-tools 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "980479e6fde23246dfb54d47580d66b4e99202e7579c5eaa9fe10ecb5ebd2182" "checksum byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a019b10a2a7cdeb292db131fc8113e57ea2a908f6e7894b0c3c671893b65dbeb" "checksum bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "40ade3d27603c2cb345eb0912aec461a6dec7e06a4ae48589904e808335c7afa" +"checksum bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" "checksum bzip2 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "42b7c3cbf0fa9c1b82308d57191728ca0256cb821220f4e2fd410a72ade26e3b" "checksum bzip2-sys 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "6584aa36f5ad4c9247f5323b0a42f37802b37a836f0ad87084d7a33961abe25f" "checksum c2-chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7d64d04786e0f528460fc884753cf8dddcc466be308f6026f8e355c41a0e4101" @@ -3695,7 +3719,7 @@ dependencies = [ "checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" "checksum im 12.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "de38d1511a0ce7677538acb1e31b5df605147c458e061b2cdb89858afb1cd182" "checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d" -"checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08" +"checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" "checksum itertools 0.7.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0d47946d458e94a1b7bcabbf6521ea7c037062c81f534615abcad76e84d4970d" "checksum itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5b8467d9c1cebe26feb08c640139247fac215782d35371ade9a2136ed6085358" "checksum itoa 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1306f3464951f30e30d12373d31c79fbd52d236e5e896fd92f96ec7babbbe60b" @@ -3728,7 +3752,7 @@ dependencies = [ "checksum metrics-runtime 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "beb3035626782c533953bcc1a349467543b7f0e9d7b0c92edc61ee2bda7033d6" "checksum metrics-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d11f8090a8886339f9468a04eeea0711e4cf27538b134014664308041307a1c5" "checksum miniz_oxide 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "aa679ff6578b1cddee93d7e82e263b94a575e0bfced07284eb0c037c1d2416a5" -"checksum mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)" = "83f51996a3ed004ef184e16818edc51fadffe8e7ca68be67f9dee67d84d0ff23" +"checksum mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)" = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430" "checksum mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)" = "966257a94e196b11bb43aca423754d87429960a768de9414f3691d6957abf125" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" "checksum mocktopus 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c3ddc2275f8c1c95c016bd7fa23b2debcc6e2f24f05cbbfa250e67ea32428ad5" @@ -3866,6 +3890,7 @@ dependencies = [ "checksum time 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)" = "847da467bf0db05882a9e2375934a8a55cffdc9db0d128af1518200260ba1f6c" "checksum tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e9175261fbdb60781fcd388a4d6cc7e14764a2b629a7ad94abb439aed223a44f" "checksum tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)" = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6" +"checksum tokio 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)" = "5d34ca54d84bf2b5b4d7d31e901a8464f7b60ac145a284fba25ceb801f2ddccd" "checksum tokio-buf 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fb220f46c53859a4b7ec083e41dec9778ff0b1851c0942b211edb89e0ccdc46" "checksum tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f" "checksum tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "aeeffbbb94209023feaef3c196a41cbcdafa06b4a6f893f68779bb5e53796f71" diff --git a/Cargo.toml b/Cargo.toml index 845d7073f9..3aca62edd9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ opt-level = 1 [profile.release] # Due to the "overrides" only affects our workspace crates, as intended. debug = true +debug-assertions = false # For better or worse, might affect the stack traces in our portion of the code. #opt-level = 1 @@ -82,6 +83,7 @@ futures-cpupool = "0.1" # whereas futures-timer works with any reactor and spawns one global timer thread (fits out load better). futures-timer = "0.1" futures = { version = ">=0.3.0-alpha.16, <0.4", package = "futures-preview", features = ["compat", "async-await", "nightly"] } +futures-util = "0.3.4" gstuff = { version = "0.6", features = ["nightly"] } hex = "0.3.2" http = "0.1" @@ -114,6 +116,7 @@ serialization_derive = { git = "https://github.com/artemii235/parity-bitcoin.git term = "=0.5.1" tokio-core = { version = "0.1", optional = true } +tokio = { version = "0.2.22", features = ["io-util", "rt-threaded", "stream", "tcp"] } unwrap = "1.2" uuid = { version = "0.7", features = ["serde", "v4"] } winapi = "0.3" diff --git a/mm2src/common/Cargo.toml b/mm2src/common/Cargo.toml index 2f1c361730..5eff96f85c 100644 --- a/mm2src/common/Cargo.toml +++ b/mm2src/common/Cargo.toml @@ -18,6 +18,7 @@ doctest = false [dependencies] arrayref = "0.3" +async-std = {version = "1.5", features = ["unstable"]} atomic = "0.4" backtrace = "0.3" base64 = "0.10.0" diff --git a/mm2src/common/common.rs b/mm2src/common/common.rs index d60c616e3a..cbebc672b8 100644 --- a/mm2src/common/common.rs +++ b/mm2src/common/common.rs @@ -14,6 +14,7 @@ #![feature(async_closure)] #![feature(hash_raw_entry)] #![feature(optin_builtin_traits)] +#![feature(drain_filter)] #![feature(const_fn)] #![allow(uncommon_codepoints)] #![cfg_attr(not(feature = "native"), allow(unused_imports))] @@ -117,11 +118,12 @@ use std::future::Future as Future03; use std::intrinsics::copy; use std::io::Write; use std::mem::{forget, size_of, zeroed}; +use std::net::SocketAddr; use std::ops::{Add, Deref, Div, RangeInclusive}; use std::os::raw::{c_char, c_void}; use std::path::{Path, PathBuf}; #[cfg(not(feature = "native"))] use std::pin::Pin; -use std::ptr::{null_mut, read_volatile}; +use std::ptr::read_volatile; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::UNIX_EPOCH; @@ -1251,45 +1253,39 @@ impl From for StringError { fn from(e: std::io::Error) -> StringError { StringError(ERRL!("{}", e)) } } +#[derive(Clone, Debug)] +pub struct P2PMessage { + pub from: SocketAddr, + pub content: String, +} + +impl P2PMessage { + pub fn from_string_with_default_addr(content: String) -> P2PMessage { + P2PMessage { + from: SocketAddr::new([0; 4].into(), 0), + content, + } + } + + pub fn from_serialize_with_default_addr(msg: &T) -> P2PMessage { + P2PMessage { + from: SocketAddr::new([0; 4].into(), 0), + content: serde_json::to_string(msg).unwrap(), + } + } +} + #[derive(Debug)] pub struct QueuedCommand { pub response_sock: i32, pub stats_json_only: i32, pub queue_id: u32, - pub msg: String, + pub msg: P2PMessage, // retstrp: *mut *mut c_char, } /// Register an RPC command that came internally or from the peer-to-peer bus. -#[no_mangle] -#[cfg(feature = "native")] -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn lp_queue_command_for_c( - retstrp: *mut *mut c_char, - buf: *mut c_char, - response_sock: i32, - stats_json_only: i32, - queue_id: u32, -) { - if !retstrp.is_null() { - *retstrp = null_mut() - } - - if buf.is_null() { - panic!("!buf") - } - let msg = String::from(unwrap!(CStr::from_ptr(buf).to_str())); - let _cmd = QueuedCommand { - msg, - queue_id, - response_sock, - stats_json_only, - }; - panic!("We need a context ID"); - //unwrap! ((*COMMAND_QUEUE).0.send (cmd)) -} - -pub fn lp_queue_command(ctx: &mm_ctx::MmArc, msg: String) -> Result<(), String> { +pub fn lp_queue_command(ctx: &mm_ctx::MmArc, msg: P2PMessage) -> Result<(), String> { // If we're helping a WASM then leave a copy of the broadcast for them. if let Some(ref mut cq) = *try_s!(ctx.command_queueʰ.lock()) { // Monotonic increment. diff --git a/mm2src/common/mm_ctx.rs b/mm2src/common/mm_ctx.rs index 4d582f5dd1..d4a818285d 100644 --- a/mm2src/common/mm_ctx.rs +++ b/mm2src/common/mm_ctx.rs @@ -24,12 +24,13 @@ use std::sync::{Arc, Mutex, Weak}; use crate::executor::Timer; use crate::log::{self, LogState}; use crate::mm_metrics::{prometheus, MetricsArc}; -use crate::{bits256, small_rng, QueuedCommand}; +use crate::{bits256, block_on, small_rng, P2PMessage, QueuedCommand}; /// Default interval to export and record metrics to log. const EXPORT_METRICS_INTERVAL: f64 = 5. * 60.; type StopListenerCallback = Box Result<(), String>>; +use futures::SinkExt; /// MarketMaker state, shared between the various MarketMaker threads. /// @@ -85,7 +86,7 @@ pub struct MmCtx { /// The context belonging to the `prices` mod: `PricesContext`. pub prices_ctx: Mutex>>, /// Seednode P2P message bus channel. - pub seednode_p2p_channel: (Sender>, Receiver>), + pub seednode_p2p_channel: Mutex>>, /// Standard node P2P message bus channel. pub client_p2p_channel: (Sender>, Receiver>), /// `lp_queue_command` shares messages with `lp_command_q_loop` via this channel. @@ -94,7 +95,7 @@ pub struct MmCtx { /// The end of the `command_queue` channel taken by `lp_command_q_loop`. pub command_queueʳ: Mutex>>, /// Broadcast `lp_queue_command` messages saved for WASM. - pub command_queueʰ: Mutex>>, + pub command_queueʰ: Mutex>>, /// RIPEMD160(SHA256(x)) where x is secp256k1 pubkey derived from passphrase. /// Replacement of `lp::G.LP_myrmd160`. pub rmd160: Constructible, @@ -127,7 +128,7 @@ impl MmCtx { http_fallback_ctx: Mutex::new(None), coins_ctx: Mutex::new(None), prices_ctx: Mutex::new(None), - seednode_p2p_channel: channel::unbounded(), + seednode_p2p_channel: Mutex::new(Vec::with_capacity(1000)), client_p2p_channel: channel::unbounded(), command_queue, command_queueʳ: Mutex::new(Some(command_queueʳ)), @@ -229,12 +230,15 @@ impl MmCtx { /// Sends the P2P message to a processing thread #[cfg(feature = "native")] - pub fn broadcast_p2p_msg(&self, msg: &str) { + pub fn broadcast_p2p_msg(&self, msg: P2PMessage) { let i_am_seed = self.conf["i_am_seed"].as_bool().unwrap_or(false); if i_am_seed { - unwrap!(self.seednode_p2p_channel.0.send(msg.to_owned().into_bytes())); + let mut txs = self.seednode_p2p_channel.lock().unwrap(); + *txs = txs + .drain_filter(|sender| block_on(sender.send(msg.clone())).is_ok()) + .collect(); } else { - unwrap!(self.client_p2p_channel.0.send(msg.to_owned().into_bytes())); + unwrap!(self.client_p2p_channel.0.send(msg.content.into_bytes())); } } diff --git a/mm2src/docker_tests.rs b/mm2src/docker_tests.rs index df72c7b53a..fbec86078f 100644 --- a/mm2src/docker_tests.rs +++ b/mm2src/docker_tests.rs @@ -3,6 +3,7 @@ #![test_runner(docker_tests_runner)] #![feature(drain_filter)] #![feature(non_ascii_idents)] +#![recursion_limit = "512"] #[cfg(test)] use docker_tests::docker_tests_runner; #[cfg(test)] diff --git a/mm2src/lp_network.rs b/mm2src/lp_network.rs index b3cd9aa637..aef019a725 100644 --- a/mm2src/lp_network.rs +++ b/mm2src/lp_network.rs @@ -23,20 +23,23 @@ use bytes::Bytes; use common::executor::{spawn, Timer}; #[cfg(not(feature = "native"))] use common::helperᶜ; use common::mm_ctx::MmArc; -use common::{lp_queue_command, now_float, now_ms, HyRes, QueuedCommand}; +use common::{lp_queue_command, now_ms, HyRes, P2PMessage, QueuedCommand}; use crossbeam::channel; +use futures::channel::mpsc; use futures::compat::Future01CompatExt; use futures::future::FutureExt; +use futures::StreamExt; use futures01::{future, Future}; use primitives::hash::H160; use serde_bencode::de::from_bytes as bdecode; -use serde_bencode::ser::to_bytes as bencode; use serde_json::{self as json, Value as Json}; use std::collections::hash_map::{Entry, HashMap}; use std::io::{BufRead, BufReader, Write}; -use std::net::{IpAddr, TcpListener, TcpStream}; +use std::net::{IpAddr, TcpStream}; use std::thread; use std::time::Duration; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; +use tokio::net::TcpListener as AsyncTcpListener; use crate::mm2::lp_native_dex::lp_command_process; use crate::mm2::lp_ordermatch::lp_post_price_recv; @@ -121,7 +124,6 @@ fn rpc_reply_to_peer(handler: HyRes, cmd: QueuedCommand) { /// The thread processing the peer-to-peer messaging bus. pub async fn lp_command_q_loop(ctx: MmArc) { use futures::future::{select, Either}; - use futures::StreamExt; let mut command_queueʳ = unwrap!(unwrap!(ctx.command_queueʳ.lock()).take().ok_or("!command_queueʳ")); @@ -153,20 +155,21 @@ pub async fn lp_command_q_loop(ctx: MmArc) { .filter(|(_, timestamp)| timestamp + 60000 > now) .collect(); - let msg_hash = ripemd160(cmd.msg.as_bytes()); + let msg_hash = ripemd160(cmd.msg.content.as_bytes()); match processed_messages.entry(msg_hash) { Entry::Vacant(e) => e.insert(now), Entry::Occupied(_) => continue, // skip the messages that we processed previously }; - let json: Json = match json::from_str(&cmd.msg) { + let json: Json = match json::from_str(&cmd.msg.content) { Ok(j) => j, - Err(e) => { - log!("Error " (e) " parsing JSON from msg " (cmd.msg)); + Err(_) => { + if cmd.msg.content.len() > 1 { + log!("Invalid JSON " (cmd.msg.content) " from " (cmd.msg.from)); + } continue; }, }; - let method = json["method"].as_str(); if let Some(m) = method { if m == "swapstatus" { @@ -180,7 +183,7 @@ pub async fn lp_command_q_loop(ctx: MmArc) { // swapstatus is excluded from rebroadcast as the message is big and other nodes might just not need it let i_am_seed = ctx.conf["i_am_seed"].as_bool().unwrap_or(false); if i_am_seed { - ctx.broadcast_p2p_msg(&cmd.msg); + ctx.broadcast_p2p_msg(cmd.msg.clone()); } let json = match dispatcher(json, ctx.clone()) { @@ -197,92 +200,108 @@ pub async fn lp_command_q_loop(ctx: MmArc) { } /// The loop processing seednode activity as message relayer/rebroadcaster -/// Non-blocking mode should be enabled on listener for this to work -pub fn seednode_loop(ctx: MmArc, listener: TcpListener) { - let mut clients = vec![]; - loop { - if ctx.is_stopping() { - break; - } - - match listener.accept() { - Ok((stream, addr)) => { - ctx.log.log( - "😀", - &[&"incoming_connection", &addr.to_string().as_str()], - "New connection...", - ); - match stream.set_nonblocking(true) { - Ok(_) => clients.push((BufReader::new(stream), addr, String::new())), - Err(e) => ctx.log.log( - "😟", - &[&"incoming_connection", &addr.to_string().as_str()], - &format!("Error {} setting nonblocking mode", e), - ), - } - }, - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => (), - Err(e) => panic!("encountered IO error: {}", e), - } - - let mut commands = Vec::new(); - clients = clients - .drain_filter(|(client, addr, buf)| match client.read_line(buf) { - Ok(_) => { - if !buf.is_empty() { - let msgs = buf.split('\n'); - for msg in msgs { - if !msg.is_empty() { - commands.push(msg.to_string()) - } - } - buf.clear(); - } - true +pub fn seednode_loop(ctx: MmArc, listener: std::net::TcpListener) { + let fut = async move { + let mut listener = AsyncTcpListener::from_std(listener).unwrap(); + let mut incoming = listener.incoming(); + while let Some(stream) = futures_util::StreamExt::next(&mut incoming).await { + let stream = match stream { + Ok(s) => s, + Err(e) => { + log!("Error " (e) " on connection accept"); + continue; }, - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => true, + }; + let peer_addr = match stream.peer_addr() { + Ok(a) => a, Err(e) => { - ctx.log.log( - "😟", - &[&"incoming_connection", &addr.to_string().as_str()], - &format!("Error {} reading from socket, dropping connection", e), - ); - false + log!("Could not get peer addr from stream " [stream] ", error " (e)); + continue; }, - }) - .collect(); - for msg in commands { - unwrap!(lp_queue_command(&ctx, msg)); - } - - clients = match ctx.seednode_p2p_channel.1.recv_timeout(Duration::from_millis(1)) { - Ok(mut msg) => clients - .drain_filter(|(client, addr, _)| { - msg.push(b'\n'); - match client.get_mut().write(&msg) { - Ok(_) => true, + }; + ctx.log.log( + "😀", + &[&"incoming_connection", &peer_addr.to_string().as_str()], + "New connection...", + ); + let ctx_read = ctx.clone(); + let ctx_write = ctx.clone(); + let (tx, mut rx) = mpsc::unbounded(); + ctx.seednode_p2p_channel.lock().unwrap().push(tx); + let (read, mut write) = stream.into_split(); + let read_loop = async move { + let mut read = tokio::io::BufReader::new(read); + let mut buffer = String::with_capacity(1024); + loop { + match read.read_line(&mut buffer).await { + Ok(read) => { + if read > 0 && !buffer.is_empty() { + unwrap!(lp_queue_command(&ctx_read, P2PMessage { + from: peer_addr, + content: buffer.clone(), + })); + buffer.clear(); + } else if read == 0 { + ctx_read.log.log( + "😟", + &[&"incoming_connection", &peer_addr.to_string().as_str()], + "Reached EOF, dropping connection", + ); + break; + } + }, Err(e) => { - ctx.log.log( + ctx_read.log.log( "😟", - &[&"incoming_connection", &addr.to_string().as_str()], - &format!("Error {} writing to socket, dropping connection", e), + &[&"incoming_connection", &peer_addr.to_string().as_str()], + &format!("Error {} reading from socket, dropping connection", e), ); - false + break; }, } - }) - .collect(), - Err(channel::RecvTimeoutError::Timeout) => clients, - Err(channel::RecvTimeoutError::Disconnected) => panic!("seednode_p2p_channel is disconnected"), - }; - } + } + }; + let write_loop = async move { + while let Some(mut msg) = rx.next().await { + if msg.from != peer_addr { + if !msg.content.ends_with('\n') { + msg.content.push('\n'); + } + match write.write_all(msg.content.as_bytes()).await { + Ok(_) => (), + Err(e) => { + ctx_write.log.log( + "😟", + &[&"incoming_connection", &peer_addr.to_string().as_str()], + &format!("Error {} writing to socket, dropping connection", e), + ); + break; + }, + }; + } + } + }; + tokio::spawn(async move { + // selecting over the read and write parts processing loops in order to + // drop both parts and close connection in case of errors + futures::select! { + read = Box::pin(read_loop).fuse() => (), + write = Box::pin(write_loop).fuse() => (), + }; + }); + } + }; + // creating separate tokio 0.2 runtime as TcpListener requires it and doesn't work with + // shared tokio 0.1 core + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(fut); } #[cfg(feature = "native")] pub async fn start_seednode_loop(ctx: &MmArc, myipaddr: IpAddr, mypubport: u16) -> Result<(), String> { log! ("i_am_seed at " (myipaddr) ":" (mypubport)); - let listener: TcpListener = try_s!(TcpListener::bind(&fomat!((myipaddr) ":" (mypubport)))); - try_s!(listener.set_nonblocking(true)); + let to_bind = std::net::SocketAddr::new(myipaddr, mypubport); + let listener = try_s!(std::net::TcpListener::bind(to_bind)); try_s!(thread::Builder::new().name("seednode_loop".into()).spawn({ let ctx = ctx.clone(); move || seednode_loop(ctx, listener) @@ -421,6 +440,7 @@ fn start_queue_tap(ctx: MmArc) -> Result<(), String> { Ok(()) } +/* /// Poll the native helpers for messages coming from the seed nodes. #[cfg(feature = "native")] pub async fn p2p_tapʰ(req: Bytes) -> Result, String> { @@ -459,7 +479,7 @@ pub async fn broadcast_p2p_msgʰ(req: Bytes) -> Result, String> { ctx.broadcast_p2p_msg(&args.msg); Ok(Vec::new()) } - +*/ /// Tells the native helpers to start the client_p2p_loop, collecting messages from the seed nodes. #[cfg(feature = "native")] pub async fn start_client_p2p_loopʰ(req: Bytes) -> Result, String> { @@ -528,8 +548,8 @@ fn client_p2p_loop(ctx: MmArc, addrs: Vec) { if !conn.buf.is_empty() { let msgs = conn.buf.split('\n'); for msg in msgs { - if !msg.is_empty() { - commands.push(msg.to_string()) + if msg.len() > 1 { + commands.push(P2PMessage::from_string_with_default_addr(msg.to_owned())); } } conn.buf.clear(); diff --git a/mm2src/lp_ordermatch.rs b/mm2src/lp_ordermatch.rs index 8911c54d36..5d4d6af291 100644 --- a/mm2src/lp_ordermatch.rs +++ b/mm2src/lp_ordermatch.rs @@ -27,7 +27,8 @@ use coins::{lp_coinfind, lp_coinfindᵃ, MmCoinEnum}; use common::executor::{spawn, Timer}; use common::mm_ctx::{from_ctx, MmArc, MmWeak}; use common::mm_number::{from_dec_to_ratio, from_ratio_to_dec, Fraction, MmNumber}; -use common::{bits256, json_dir_entries, new_uuid, now_ms, remove_file, rpc_err_response, rpc_response, write, HyRes}; +use common::{bits256, json_dir_entries, new_uuid, now_ms, remove_file, rpc_err_response, rpc_response, write, HyRes, + P2PMessage}; use futures::compat::Future01CompatExt; use gstuff::slurp; use http::Response; @@ -1021,7 +1022,7 @@ pub fn lp_trade_command(ctx: MmArc, json: Json) -> i32 { taker_order_uuid: reserved_msg.taker_order_uuid, maker_order_uuid: reserved_msg.maker_order_uuid, }; - ctx.broadcast_p2p_msg(&unwrap!(json::to_string(&connect))); + ctx.broadcast_p2p_msg(P2PMessage::from_serialize_with_default_addr(&connect)); let taker_match = TakerMatch { reserved: reserved_msg, connect, @@ -1119,7 +1120,7 @@ pub fn lp_trade_command(ctx: MmArc, json: Json) -> i32 { }) }), }; - ctx.broadcast_p2p_msg(&unwrap!(json::to_string(&reserved))); + ctx.broadcast_p2p_msg(P2PMessage::from_serialize_with_default_addr(&reserved)); let maker_match = MakerMatch { request: taker_request, reserved, @@ -1170,7 +1171,7 @@ pub fn lp_trade_command(ctx: MmArc, json: Json) -> i32 { maker_order_uuid: connect_msg.maker_order_uuid, method: "connected".into(), }; - ctx.broadcast_p2p_msg(&unwrap!(json::to_string(&connected))); + ctx.broadcast_p2p_msg(P2PMessage::from_serialize_with_default_addr(&connected)); order_match.connect = Some(connect_msg); order_match.connected = Some(connected); my_order.started_swaps.push(order_match.request.uuid); @@ -1294,7 +1295,7 @@ pub fn lp_auto_buy( .with_conf_settings(conf_settings) .with_sender_pubkey(H256Json::from(our_public_id.bytes)); let request = try_s!(request_builder.build()); - ctx.broadcast_p2p_msg(&unwrap!(json::to_string(&request))); + ctx.broadcast_p2p_msg(P2PMessage::from_serialize_with_default_addr(&request)); let result = json!({ "result": request }).to_string(); let order = TakerOrder { created_at: now_ms(), @@ -1445,20 +1446,19 @@ pub fn lp_post_price_recv(ctx: &MmArc, req: Json) -> HyRes { } fn lp_send_price_ping(req: &PricePingRequest, ctx: &MmArc) -> Result<(), String> { - let req_string = try_s!(json::to_string(req)); + let msg = P2PMessage::from_serialize_with_default_addr(&req); + let req_value = try_s!(json::to_value(req)); + let ctxʹ = ctx.clone(); // TODO this is required to process the set price message on our own node, it's the easiest way now // there might be a better way of doing this so we should consider refactoring - let req_value = try_s!(json::to_value(req)); - let ctxʹ = ctx.clone(); spawn(async move { let rc = lp_post_price_recv(&ctxʹ, req_value).compat().await; if let Err(err) = rc { log!("!lp_post_price_recv: "(err)) } }); - - ctx.broadcast_p2p_msg(&req_string); + ctx.broadcast_p2p_msg(msg); Ok(()) } diff --git a/mm2src/lp_swap.rs b/mm2src/lp_swap.rs index 2d33695def..109945e63c 100644 --- a/mm2src/lp_swap.rs +++ b/mm2src/lp_swap.rs @@ -64,7 +64,7 @@ use common::{block_on, executor::spawn, mm_ctx::{from_ctx, MmArc}, mm_number::MmNumber, - read_dir, rpc_response, slurp, write, HyRes}; + read_dir, rpc_response, slurp, write, HyRes, P2PMessage}; use http::Response; use primitives::hash::{H160, H256, H264}; use rpc::v1::types::{Bytes as BytesJson, H256 as H256Json}; @@ -640,12 +640,11 @@ fn broadcast_my_swap_status(uuid: &str, ctx: &MmArc) -> Result<(), String> { SavedSwap::Maker(ref mut swap) => swap.hide_secret(), }; try_s!(save_stats_swap(ctx, &status)); - let status_string = json!({ + let status = json!({ "method": "swapstatus", "data": status, - }) - .to_string(); - ctx.broadcast_p2p_msg(&status_string); + }); + ctx.broadcast_p2p_msg(P2PMessage::from_serialize_with_default_addr(&status)); Ok(()) } diff --git a/mm2src/mm2_bin.rs b/mm2src/mm2_bin.rs index 1138e3b01b..fe92a57f6a 100644 --- a/mm2src/mm2_bin.rs +++ b/mm2src/mm2_bin.rs @@ -1,5 +1,6 @@ #![feature(non_ascii_idents)] #![feature(drain_filter)] +#![recursion_limit = "512"] #[macro_use] extern crate common; #[macro_use] extern crate fomat_macros; diff --git a/mm2src/rpc.rs b/mm2src/rpc.rs index b0cc0c1546..7324ebd2df 100644 --- a/mm2src/rpc.rs +++ b/mm2src/rpc.rs @@ -139,8 +139,8 @@ async fn helpers( } let res = match method { - "broadcast_p2p_msg" => try_s!(lp_network::broadcast_p2p_msgʰ(reqᵇ).await), - "p2p_tap" => try_s!(lp_network::p2p_tapʰ(reqᵇ).await), + // "broadcast_p2p_msg" => try_s! (lp_network::broadcast_p2p_msgʰ (reqᵇ) .await), + // "p2p_tap" => try_s! (lp_network::p2p_tapʰ (reqᵇ) .await), "common_wait_for_log_re" => try_s!(common_wait_for_log_re(reqᵇ).await), "ctx2helpers" => try_s!(ctx2helpers(ctx, reqᵇ).await), "peers_initialize" => try_s!(peers::peers_initialize(reqᵇ).await),