diff --git a/Cargo.lock b/Cargo.lock index b55add5f6..ae4741b8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,6 +45,7 @@ dependencies = [ "cfg-if", "cipher 0.3.0", "cpufeatures", + "ctr 0.8.0", "opaque-debug", ] @@ -73,6 +74,17 @@ dependencies = [ "subtle", ] +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom 0.2.8", + "once_cell", + "version_check", +] + [[package]] name = "ahash" version = "0.8.3" @@ -1402,6 +1414,16 @@ dependencies = [ "syn 1.0.107", ] +[[package]] +name = "delay_map" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4355c25cbf99edcb6b4a0e906f6bdc6956eda149e84455bea49696429b2f8e8" +dependencies = [ + "futures", + "tokio-util", +] + [[package]] name = "der" version = "0.7.4" @@ -1513,6 +1535,39 @@ dependencies = [ "winapi", ] +[[package]] +name = "discv5" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98c05fa26996c6141f78ac4fafbe297a7fa69690565ba4e0d1f2e60bde5ce501" +dependencies = [ + "aes 0.7.5", + "aes-gcm", + "arrayvec", + "delay_map", + "enr 0.9.1", + "fnv", + "futures", + "hashlink", + "hex", + "hkdf", + "lazy_static", + "libp2p-core", + "libp2p-identity", + "lru 0.7.8", + "more-asserts", + "parking_lot 0.11.2", + "rand", + "rlp", + "smallvec", + "socket2 0.4.9", + "tokio", + "tracing", + "tracing-subscriber", + "uint", + "zeroize", +] + [[package]] name = "displaydoc" version = "0.2.4" @@ -1657,6 +1712,25 @@ dependencies = [ "zeroize", ] +[[package]] +name = "enr" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe81b5c06ecfdbc71dd845216f225f53b62a10cb8a16c946836a3467f701d05b" +dependencies = [ + "base64 0.21.0", + "bytes", + "ed25519-dalek", + "hex", + "k256", + "log", + "rand", + "rlp", + "serde", + "sha3", + "zeroize", +] + [[package]] name = "enum-as-inner" version = "0.5.1" @@ -1681,6 +1755,19 @@ dependencies = [ "syn 2.0.32", ] +[[package]] +name = "env_logger" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -2007,7 +2094,7 @@ dependencies = [ "auto_impl", "base64 0.21.0", "bytes", - "enr", + "enr 0.8.1", "ethers-core", "futures-core", "futures-timer", @@ -2478,11 +2565,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +dependencies = [ + "ahash 0.7.6", +] + [[package]] name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash 0.7.6", +] [[package]] name = "hashbrown" @@ -2490,7 +2589,7 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" dependencies = [ - "ahash", + "ahash 0.8.3", ] [[package]] @@ -2508,6 +2607,15 @@ dependencies = [ "fxhash", ] +[[package]] +name = "hashlink" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +dependencies = [ + "hashbrown 0.11.2", +] + [[package]] name = "heck" version = "0.4.1" @@ -2647,6 +2755,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.27" @@ -3026,7 +3140,7 @@ dependencies = [ "futures-util", "hyper", "jsonrpsee-types", - "parking_lot", + "parking_lot 0.12.1", "rand", "rustc-hash", "serde", @@ -3240,6 +3354,7 @@ dependencies = [ "libp2p-mdns", "libp2p-metrics", "libp2p-noise", + "libp2p-ping", "libp2p-quic", "libp2p-swarm", "libp2p-tcp", @@ -3274,9 +3389,9 @@ dependencies = [ [[package]] name = "libp2p-core" -version = "0.40.1" +version = "0.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd44289ab25e4c9230d9246c475a22241e301b23e8f4061d3bdef304a1a99713" +checksum = "ef7dd7b09e71aac9271c60031d0e558966cdb3253ba0308ab369bb2de80630d0" dependencies = [ "either", "fnv", @@ -3289,7 +3404,7 @@ dependencies = [ "multihash", "multistream-select", "once_cell", - "parking_lot", + "parking_lot 0.12.1", "pin-project", "quick-protobuf", "rand", @@ -3310,7 +3425,7 @@ dependencies = [ "libp2p-core", "libp2p-identity", "log", - "parking_lot", + "parking_lot 0.12.1", "smallvec", "trust-dns-resolver", ] @@ -3329,7 +3444,7 @@ dependencies = [ "libp2p-identity", "libp2p-swarm", "log", - "lru", + "lru 0.10.1", "quick-protobuf", "quick-protobuf-codec", "smallvec", @@ -3339,14 +3454,13 @@ dependencies = [ [[package]] name = "libp2p-identity" -version = "0.2.5" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57bf6e730ec5e7022958da53ffb03b326e681b7316939012ae9b3c7449a812d4" +checksum = "686e73aff5e23efbb99bc85340ea6fd8686986aa7b283a881ba182cfca535ca9" dependencies = [ "asn1_der", "bs58 0.5.0", "ed25519-dalek", - "hkdf", "libsecp256k1", "log", "multihash", @@ -3388,11 +3502,31 @@ dependencies = [ "libp2p-core", "libp2p-identify", "libp2p-identity", + "libp2p-ping", "libp2p-swarm", "once_cell", "prometheus-client", ] +[[package]] +name = "libp2p-mplex" +version = "0.40.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93959ed08b6caf9810e067655e25f1362098797fef7c44d3103e63dcb6f0fabe" +dependencies = [ + "asynchronous-codec", + "bytes", + "futures", + "libp2p-core", + "libp2p-identity", + "log", + "nohash-hasher", + "parking_lot 0.12.1", + "rand", + "smallvec", + "unsigned-varint", +] + [[package]] name = "libp2p-noise" version = "0.43.1" @@ -3418,6 +3552,24 @@ dependencies = [ "zeroize", ] +[[package]] +name = "libp2p-ping" +version = "0.43.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cd5ee3270229443a2b34b27ed0cb7470ef6b4a6e45e54e89a8771fa683bab48" +dependencies = [ + "either", + "futures", + "futures-timer", + "instant", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "log", + "rand", + "void", +] + [[package]] name = "libp2p-quic" version = "0.9.2" @@ -3432,7 +3584,7 @@ dependencies = [ "libp2p-identity", "libp2p-tls", "log", - "parking_lot", + "parking_lot 0.12.1", "quinn", "rand", "rustls 0.21.7", @@ -3443,9 +3595,9 @@ dependencies = [ [[package]] name = "libp2p-swarm" -version = "0.43.5" +version = "0.43.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab94183f8fc2325817835b57946deb44340c99362cd4606c0a5717299b2ba369" +checksum = "28016944851bd73526d3c146aabf0fa9bbe27c558f080f9e5447da3a1772c01a" dependencies = [ "either", "fnv", @@ -3631,6 +3783,15 @@ dependencies = [ "value-bag", ] +[[package]] +name = "lru" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a" +dependencies = [ + "hashbrown 0.12.3", +] + [[package]] name = "lru" version = "0.10.1" @@ -3726,7 +3887,7 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa8ebbd1a9e57bbab77b9facae7f5136aea44c356943bf9a198f647da64285d6" dependencies = [ - "ahash", + "ahash 0.8.3", "metrics-macros", "portable-atomic", ] @@ -3862,6 +4023,12 @@ dependencies = [ "syn 1.0.107", ] +[[package]] +name = "more-asserts" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7843ec2de400bcbc6a6328c958dc38e5359da6e93e72e37bc5246bf1ae776389" + [[package]] name = "multiaddr" version = "0.18.0" @@ -4236,6 +4403,17 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -4243,7 +4421,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.7", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -4662,7 +4854,7 @@ checksum = "3c99afa9a01501019ac3a14d71d9f94050346f55ca471ce90c799a15c58f61e2" dependencies = [ "dtoa", "itoa", - "parking_lot", + "parking_lot 0.12.1", "prometheus-client-derive-encode", ] @@ -5265,17 +5457,24 @@ dependencies = [ name = "rundler-network" version = "0.1.0-beta" dependencies = [ + "discv5", "ethereum_ssz", "ethereum_ssz_derive", "ethers", + "futures", + "hex", "libp2p", + "libp2p-mplex", "rand", "rundler-types", "snap", "ssz_types", "thiserror", + "tokio", + "tokio-io-timeout", "tokio-util", "tracing", + "tracing-test", "unsigned-varint", ] @@ -5292,7 +5491,7 @@ dependencies = [ "itertools 0.11.0", "metrics", "mockall", - "parking_lot", + "parking_lot 0.12.1", "prost", "rundler-provider", "rundler-sim", @@ -5408,10 +5607,17 @@ version = "0.1.0-beta" dependencies = [ "anyhow", "clap 4.4.4", + "discv5", "dotenv", + "enr 0.9.1", + "env_logger", "ethers", "ethers-signers", + "futures", + "hex", + "libp2p", "rundler-dev", + "rundler-network", "rundler-rpc", "rusoto_core", "rusoto_kms", @@ -6199,7 +6405,7 @@ checksum = "f91138e76242f575eb1d3b38b4f1362f10d3a43f47d182a5b359af488a02293b" dependencies = [ "new_debug_unreachable", "once_cell", - "parking_lot", + "parking_lot 0.12.1", "phf_shared 0.10.0", "precomputed-hash", ] @@ -6508,16 +6714,16 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.33.0" +version = "1.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" +checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" dependencies = [ "backtrace", "bytes", "libc", "mio", "num_cpus", - "parking_lot", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2 0.5.4", @@ -6605,6 +6811,7 @@ dependencies = [ "futures-io", "futures-sink", "pin-project-lite", + "slab", "tokio", "tracing", ] @@ -6855,6 +7062,29 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a2c0ff408fe918a94c428a3f2ad04e4afd5c95bbc08fcf868eff750c15728a4" +dependencies = [ + "lazy_static", + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "258bc1c4f8e2e73a977812ab339d503e6feeb92700f6d07a6de4d321522d5c08" +dependencies = [ + "lazy_static", + "quote", + "syn 1.0.107", +] + [[package]] name = "tree_hash" version = "0.5.2" @@ -6903,7 +7133,7 @@ dependencies = [ "ipconfig", "lazy_static", "lru-cache", - "parking_lot", + "parking_lot 0.12.1", "resolv-conf", "smallvec", "thiserror", @@ -7591,7 +7821,7 @@ dependencies = [ "futures", "log", "nohash-hasher", - "parking_lot", + "parking_lot 0.12.1", "pin-project", "rand", "static_assertions", diff --git a/bin/tools/Cargo.toml b/bin/tools/Cargo.toml index a812724b0..ecab7abba 100644 --- a/bin/tools/Cargo.toml +++ b/bin/tools/Cargo.toml @@ -12,13 +12,24 @@ Rundler tools [dependencies] rundler-dev = { path = "../../crates/dev" } rundler-rpc = { path = "../../crates/rpc" } +rundler-network = { path = "../../crates/network" } anyhow = "1.0.70" clap = { version = "4.2.4", features = ["derive", "env"] } dotenv = "0.15.0" ethers = "2.0.8" ethers-signers = {version = "2.0.8", features = ["aws"] } +hex = "0.4.3" rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] } rusoto_kms = { version = "0.48.0", default-features = false, features = ["rustls"] } serde_json = "1.0.96" tokio.workspace = true +futures.workspace = true +enr = { version = "*", features = ["serde", "k256"] } +discv5 = { version = "0.3.1", features = ["libp2p"] } +env_logger = "0.10.0" + +[dependencies.libp2p] +version = "0.52.3" +default-features = false +features = ["tokio", "identify", "noise", "macros", "ping", "tcp", "identify", "yamux", "secp256k1"] diff --git a/bin/tools/src/bin/network.rs b/bin/tools/src/bin/network.rs new file mode 100644 index 000000000..9dd94dfa6 --- /dev/null +++ b/bin/tools/src/bin/network.rs @@ -0,0 +1,77 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +use std::{net::SocketAddr, time::Duration}; + +use clap::Parser; +use ethers::types::H256; +use rundler_network::{Config as NetworkConfig, ConnectionConfig, Network}; +use tokio::sync::mpsc; + +const PRIVATE_KEY: &str = "b0ddfec7d365b4599ff8367e960f8c4890364f99e2151beac352338cc0cfe1bc"; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + + let cli = Cli::parse(); + + let private_key = if cli.bootnode.is_none() { + println!("Starting as a bootnode"); + PRIVATE_KEY.into() + } else { + println!("Starting as a regular node"); + let enr_key = discv5::enr::CombinedKey::generate_secp256k1(); + hex::encode(enr_key.encode()) + }; + + let bootnodes = cli + .bootnode + .map(|b| vec![b.parse().expect("invalid bootnode")]) + .unwrap_or_default(); + let listen_address: SocketAddr = format!("127.0.0.1:{}", cli.port).parse()?; + + let config = NetworkConfig { + private_key, + listen_address, + bootnodes, + network_config: ConnectionConfig { + max_chunk_size: 1048576, + request_timeout: Duration::from_secs(10), + ttfb_timeout: Duration::from_secs(5), + }, + supported_mempools: vec![H256::random()], + metadata_seq_number: 0, + }; + + let (_, action_recv) = mpsc::unbounded_channel(); + let (event_send, _) = mpsc::unbounded_channel(); + + println!("Config: {:?}", config); + let network = Network::new(config, event_send, action_recv).await?; + println!("ENR: {}", network.enr()); + + network.run().await?; + Ok(()) +} + +#[derive(Debug, Parser)] +#[clap(name = "rundler network tester")] +struct Cli { + /// The port used to listen on all interfaces + #[clap(short)] + port: u16, + + #[clap(short)] + bootnode: Option, +} diff --git a/crates/network/Cargo.toml b/crates/network/Cargo.toml index 82d2c37fa..45a9885e3 100644 --- a/crates/network/Cargo.toml +++ b/crates/network/Cargo.toml @@ -14,6 +14,12 @@ ethereum_ssz_derive = "0.5.3" ethers.workspace = true snap = "1.1.0" ssz_types = "0.5.4" +discv5 = { version = "0.3.1", features = ["libp2p"] } +futures.workspace = true +hex = "0.4.3" +libp2p-mplex = "0.40.0" +tokio.workspace = true +tokio-io-timeout = "1.2.0" tokio-util = { workspace = true, features = ["codec", "compat"] } thiserror.workspace = true tracing.workspace = true @@ -26,3 +32,4 @@ features = ["tokio", "noise", "macros", "tcp", "identify", "yamux", "secp256k1"] [dev-dependencies] rand.workspace = true +tracing-test = { version = "0.2.4", features = ["no-env-filter"] } diff --git a/crates/network/src/behaviour.rs b/crates/network/src/behaviour.rs new file mode 100644 index 000000000..570f1fa2a --- /dev/null +++ b/crates/network/src/behaviour.rs @@ -0,0 +1,29 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +use libp2p::{ + identify, + swarm::{keep_alive, NetworkBehaviour}, +}; + +use crate::rpc; + +#[derive(NetworkBehaviour)] +pub(crate) struct Behaviour { + // TODO(danc): temp, remove when not needed + pub(crate) keep_alive: keep_alive::Behaviour, + // Request/response protocol + pub(crate) rpc: rpc::Behaviour, + // Identity protocol + pub(crate) identify: identify::Behaviour, +} diff --git a/crates/network/src/enr.rs b/crates/network/src/enr.rs new file mode 100644 index 000000000..f6d5ec13f --- /dev/null +++ b/crates/network/src/enr.rs @@ -0,0 +1,98 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +// Adapted from https://github.com/sigp/lighthouse/blob/stable/beacon_node/lighthouse_network/src/discovery/enr_ext.rs + +use discv5::{ + enr::{k256, CombinedKey, CombinedPublicKey, EnrBuilder}, + Enr, +}; +use libp2p::{ + identity::{ed25519, secp256k1, PublicKey}, + multiaddr::Protocol, + Multiaddr, PeerId, +}; + +use crate::Config; + +pub(crate) fn build_enr(config: &Config) -> (Enr, CombinedKey) { + let enr_key: CombinedKey = + k256::ecdsa::SigningKey::from_slice(&hex::decode(&config.private_key).unwrap()) + .unwrap() + .into(); + + let enr = EnrBuilder::new("v4") + .ip(config.listen_address.ip()) + .tcp4(config.listen_address.port()) + .udp4(config.listen_address.port() + 1) + .build(&enr_key) + .unwrap(); + + (enr, enr_key) +} + +pub(crate) trait EnrExt { + fn multiaddr(&self) -> Vec; + + fn peer_id(&self) -> PeerId; +} + +impl EnrExt for Enr { + fn multiaddr(&self) -> Vec { + let mut multiaddrs: Vec = Vec::new(); + if let Some(ip) = self.ip4() { + if let Some(udp) = self.udp4() { + let mut multiaddr: Multiaddr = ip.into(); + multiaddr.push(Protocol::Udp(udp)); + multiaddrs.push(multiaddr); + } + if let Some(tcp) = self.tcp4() { + let mut multiaddr: Multiaddr = ip.into(); + multiaddr.push(Protocol::Tcp(tcp)); + multiaddrs.push(multiaddr); + } + } + multiaddrs + } + + fn peer_id(&self) -> PeerId { + self.public_key().as_peer_id() + } +} + +pub(crate) trait CombinedKeyPublicExt { + /// Converts the publickey into a peer id, without consuming the key. + fn as_peer_id(&self) -> PeerId; +} + +impl CombinedKeyPublicExt for CombinedPublicKey { + /// Converts the publickey into a peer id, without consuming the key. + fn as_peer_id(&self) -> PeerId { + match self { + Self::Secp256k1(pk) => { + let pk_bytes = pk.to_sec1_bytes(); + let libp2p_pk: PublicKey = secp256k1::PublicKey::try_from_bytes(&pk_bytes) + .expect("valid public key") + .into(); + PeerId::from_public_key(&libp2p_pk) + } + Self::Ed25519(pk) => { + let pk_bytes = pk.to_bytes(); + let libp2p_pk: PublicKey = ed25519::PublicKey::try_from_bytes(&pk_bytes) + .expect("valid public key") + .into(); + PeerId::from_public_key(&libp2p_pk) + } + } + } +} diff --git a/crates/network/src/error.rs b/crates/network/src/error.rs new file mode 100644 index 000000000..00b3a88a4 --- /dev/null +++ b/crates/network/src/error.rs @@ -0,0 +1,19 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +/// Network errors +#[derive(thiserror::Error, Debug)] +pub enum Error {} + +/// Network result +pub type Result = std::result::Result; diff --git a/crates/network/src/lib.rs b/crates/network/src/lib.rs index f105462d6..fbd59bbb3 100644 --- a/crates/network/src/lib.rs +++ b/crates/network/src/lib.rs @@ -24,6 +24,23 @@ //! Lots of inspiration for the components of this implementation were taken from the Lighthouse implementation //! of the Ethereum consensus layer p2p protocol. See [here](https://github.com/sigp/lighthouse/) for more details. -// TODO(danc): remove this before release -#[allow(dead_code)] mod rpc; +pub use rpc::{ + message::{ + ErrorKind as ResponseErrorKind, PooledUserOpHashesRequest, PooledUserOpHashesResponse, + PooledUserOpsByHashRequest, PooledUserOpsByHashResponse, MAX_OPS_PER_REQUEST, + }, + ConnectionConfig, +}; + +mod behaviour; + +mod network; +pub use network::{ + Action, AppRequest, AppRequestId, AppResponse, AppResponseResult, Config, Event, Network, +}; + +mod enr; + +mod error; +pub use error::{Error, Result}; diff --git a/crates/network/src/network.rs b/crates/network/src/network.rs new file mode 100644 index 000000000..8968f5433 --- /dev/null +++ b/crates/network/src/network.rs @@ -0,0 +1,487 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +use std::net::SocketAddr; + +use discv5::Enr; +use ethers::types::H256; +use futures::StreamExt; +use libp2p::{ + core, + core::{transport::upgrade, ConnectedPoint}, + identify, + identity::{secp256k1, Keypair}, + noise, + swarm::{keep_alive, SwarmBuilder, SwarmEvent}, + tcp, Multiaddr, PeerId, Swarm, Transport, +}; +use tokio::sync::mpsc; +use tracing::{debug, error, info}; + +use crate::{ + behaviour::{Behaviour, BehaviourEvent}, + enr::{self, EnrExt}, + error::Result, + rpc::{ + self, + message::{ + ErrorKind, Metadata, Ping, Pong, PooledUserOpHashesRequest, PooledUserOpHashesResponse, + PooledUserOpsByHashRequest, PooledUserOpsByHashResponse, Request, Response, + ResponseError, Status, MAX_OPS_PER_REQUEST, + }, + ConnectionConfig, + }, +}; + +/// Configuration for the network +#[derive(Debug)] +pub struct Config { + /// private key in hex for ENR + pub private_key: String, + /// address to listen on + pub listen_address: SocketAddr, + /// bootnodes to connect to + pub bootnodes: Vec, + /// configuration for network connections + pub network_config: ConnectionConfig, + /// supported mempools + pub supported_mempools: Vec, + /// metadata sequence number + pub metadata_seq_number: u64, +} + +impl Default for Config { + fn default() -> Self { + Self { + private_key: String::default(), + listen_address: SocketAddr::new( + std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), + 0, + ), + bootnodes: vec![], + network_config: ConnectionConfig::default(), + supported_mempools: vec![], + metadata_seq_number: 0, + } + } +} + +/// Requests available to application +#[derive(Debug)] +pub enum AppRequest { + /// Pooled user op hashes request + PooledUserOpHashes(PooledUserOpHashesRequest), + /// Pooled user ops by hash request + PooledUserOpsByHash(PooledUserOpsByHashRequest), +} + +/// Responses available to application +#[derive(Debug)] +pub enum AppResponse { + /// Pooled user op hashes response + PooledUserOpHashes(PooledUserOpHashesResponse), + /// Pooled user ops by hash response + PooledUserOpsByHash(PooledUserOpsByHashResponse), +} + +/// Result of application request +pub type AppResponseResult = std::result::Result; + +/// Request id for application +/// +/// Use to associate requests from the application with responses +/// from the network. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +pub struct AppRequestId(pub u64); + +/// Request id for the network +/// +/// Use to associate requests from the network with responses +/// from the application. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +pub struct PeerRequestId(pub u64); + +/// Events emitted by the network +#[derive(Debug)] +pub enum Event { + /// Peer connected, can send requests + PeerConnected(PeerId), + /// Peer disconnected, can no longer send requests + PeerDisconnected(PeerId), + /// Network shutdown complete + ShutdownComplete, + /// Request from peer + RequestReceived(PeerId, PeerRequestId, AppRequest), + /// Response from peer + ResponseReceived(PeerId, AppRequestId, AppResponseResult), + // TODO: gossip messages will go here +} + +/// Actions sent to the network +#[derive(Debug)] +pub enum Action { + /// Shutdown the network, say goodbye to peers + /// + /// This will eventually cause the network to shutdown after goodbye + /// messages are sent to all peers. The network will send a + /// `ShutdownComplete` event when shutdown is complete. + Shutdown, + /// Request to send to peer + Request(PeerId, AppRequestId, AppRequest), + /// Response to send to peer + Response(PeerRequestId, AppResponseResult), + // TODO: gossip messages will go here +} + +// Requests sent by the network +#[derive(Debug)] +pub(crate) enum NetworkRequestId { + // Internal requests from the network + Internal, + // External requests from the application + App(AppRequestId), +} + +/// ERC-4337 P2P network +pub struct Network { + swarm: Swarm, + enr: Enr, + config: Config, + + event_sender: mpsc::UnboundedSender, + action_recv: mpsc::UnboundedReceiver, +} + +impl Network { + /// Create a new network + pub async fn new( + config: Config, + event_sender: mpsc::UnboundedSender, + action_recv: mpsc::UnboundedReceiver, + ) -> Result { + let local_key: Keypair = secp256k1::Keypair::from( + secp256k1::SecretKey::try_from_bytes(&mut hex::decode(&config.private_key).unwrap()) + .unwrap(), + ) + .into(); + let local_peer_id = PeerId::from(local_key.public()); + + let (enr, _enr_key) = enr::build_enr(&config); + + let mplex_config = libp2p_mplex::MplexConfig::new(); + let yamux_config = libp2p::yamux::Config::default(); + + let transport = tcp::tokio::Transport::default() + .upgrade(upgrade::Version::V1Lazy) + .authenticate(noise::Config::new(&local_key).unwrap()) + .multiplex(core::upgrade::SelectUpgrade::new( + yamux_config, + mplex_config, + )) + .boxed(); + + let rpc = rpc::Behaviour::new(config.network_config.clone()); + + let behaviour = Behaviour { + keep_alive: keep_alive::Behaviour, + rpc, + identify: identify::Behaviour::new(identify::Config::new( + "erc4337/0.1.0".into(), + local_key.public(), + )), + }; + + let multi_addr: Multiaddr = match config.listen_address { + SocketAddr::V4(addr) => format!("/ip4/{}/tcp/{}", addr.ip(), addr.port()), + SocketAddr::V6(addr) => format!("/ip6/{}/tcp/{}", addr.ip(), addr.port()), + } + .try_into() + .unwrap(); + + let mut swarm = + SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build(); + + swarm.listen_on(multi_addr).unwrap(); + + for bootnode in &config.bootnodes { + for multi_addr in bootnode.multiaddr() { + if multi_addr.to_string().contains("udp") { + continue; + } + debug!("Dialing bootnode multiaddr {:?}", multi_addr); + swarm.dial(multi_addr).unwrap(); + } + } + + Ok(Self { + swarm, + enr, + config, + event_sender, + action_recv, + }) + } + + /// Run the network. Consumes self. + pub async fn run(mut self) -> Result<()> { + loop { + tokio::select! { + Some(action) = self.action_recv.recv() => { + match action { + Action::Request(peer_id, request_id, req) => { + self.swarm.behaviour_mut().rpc.send_request( + &peer_id, + NetworkRequestId::App(request_id), + req.into(), + ); + }, + Action::Response(request_id, req) => { + self.swarm.behaviour_mut().rpc.send_response( + request_id, + req.map(Into::into), + ); + }, + Action::Shutdown => { + info!("Shutting down network"); + + // TODO: say goodbye to peers, wait for requests to be sent + + self.send_event(Event::ShutdownComplete); + return Ok(()); + } + } + }, + Some(swarm_event) = self.swarm.next() => { + match swarm_event { + SwarmEvent::NewListenAddr { address, .. } => info!("Listening on {address:?}"), + SwarmEvent::Behaviour(BehaviourEvent::Identify(e)) => debug!("Identify: {e:?}"), + SwarmEvent::Behaviour(BehaviourEvent::Rpc(e)) => self.on_rpc_event(e), + SwarmEvent::ConnectionEstablished { + peer_id, endpoint, .. + } => { + self.on_connection_established(peer_id, endpoint); + } + SwarmEvent::ConnectionClosed { + peer_id, endpoint, .. + } => { + debug!("Connection closed with peer {:?} at endpoint {:?}", peer_id, endpoint); + self.send_event(Event::PeerDisconnected(peer_id)); + } + e => debug!("Unhandled event: {:?}", e), + } + }, + } + } + } + + /// Get the ENR of the node after constructing + pub fn enr(&self) -> &Enr { + &self.enr + } + + fn on_connection_established(&mut self, peer: PeerId, endpoint: ConnectedPoint) { + info!( + "Connection established with peer {:?} at endpoint {:?}", + peer, endpoint + ); + match endpoint { + ConnectedPoint::Dialer { .. } => { + self.request_status(peer); + } + ConnectedPoint::Listener { .. } => {} + } + self.send_event(Event::PeerConnected(peer)); + } + + fn on_rpc_event(&mut self, event: rpc::Event) { + match event { + rpc::Event::Request(peer_id, request_id, request) => match request { + Request::Status(status) => { + debug!("Received status: {:?} from peer {:?}", status, peer_id); + self.swarm.behaviour_mut().rpc.send_response( + request_id, + Ok(Response::Status(Status { + supported_mempools: self.config.supported_mempools.clone(), + })), + ); + // Notify peer manager to update status + } + Request::Goodbye(_) => { + // Notify peer manager to remove peer + todo!(); + } + Request::Ping(ping) => { + debug!("Received ping: {:?} from peer {:?}", ping, peer_id); + self.swarm.behaviour_mut().rpc.send_response( + request_id, + Ok(Response::Ping(Pong { + metadata_seq_number: self.config.metadata_seq_number, + })), + ); + // Notify peer manager to update ping time + } + Request::Metadata => { + self.swarm.behaviour_mut().rpc.send_response( + request_id, + Ok(Response::Metadata(Metadata { + seq_number: self.config.metadata_seq_number, + })), + ); + } + Request::PooledUserOpHashes(r) => { + self.send_event(Event::RequestReceived( + peer_id, + request_id, + AppRequest::PooledUserOpHashes(r), + )); + } + Request::PooledUserOpsByHash(r) => { + if r.hashes.len() > MAX_OPS_PER_REQUEST { + self.swarm.behaviour_mut().rpc.send_response( + request_id, + Err(ResponseError { + kind: ErrorKind::InvalidRequest, + message: format!( + "Too many hashes in request. Max: {}, Got: {}", + MAX_OPS_PER_REQUEST, + r.hashes.len() + ), + }), + ); + // Notify peer manager to update metadata + } else { + // Send request to network manager + self.send_event(Event::RequestReceived( + peer_id, + request_id, + AppRequest::PooledUserOpsByHash(r), + )); + } + } + }, + rpc::Event::Response(peer_id, request_id, response) => match response { + Ok(Response::Status(status)) => { + debug!("Received status: {:?} from peer {:?}", status, peer_id); + // Notify peer manager to update status + } + Ok(Response::Ping(pong)) => { + debug!("Received pong: {:?} from peer {:?}", pong, peer_id); + // Notify peer manager to update ping time + } + Ok(Response::Metadata(_)) => { + todo!(); + // Notify peer manager to update metadata + } + Ok(Response::PooledUserOpHashes(r)) => { + let NetworkRequestId::App(id) = request_id else { + error!( + "Received unexpected request id for PooledUserOpHashes: {:?}", + request_id + ); + return; + }; + + // Send response to network manager + self.send_event(Event::ResponseReceived( + peer_id, + id, + Ok(AppResponse::PooledUserOpHashes(r)), + )); + } + Ok(Response::PooledUserOpsByHash(r)) => { + let NetworkRequestId::App(id) = request_id else { + error!( + "Received unexpected request id for PooledUserOpsByHash: {:?}", + request_id + ); + return; + }; + + // Send response to network manager + self.send_event(Event::ResponseReceived( + peer_id, + id, + Ok(AppResponse::PooledUserOpsByHash(r)), + )); + } + Err(e) => { + error!("Received response error: {:?}", e); + // If external request, return error to network manager + if let NetworkRequestId::App(id) = request_id { + self.send_event(Event::ResponseReceived(peer_id, id, Err(e))); + } + } + }, + } + } + + fn send_event(&mut self, event: Event) { + if let Err(e) = self.event_sender.send(event) { + error!("Failed to send event to application: {:?}", e); + } + } + + // A status request is sent in response to a new dialed connection + // and at a regular interval to all peers. + fn request_status(&mut self, peer: PeerId) { + debug!("Requesting status from peer {:?}", peer); + self.swarm.behaviour_mut().rpc.send_request( + &peer, + NetworkRequestId::Internal, + Request::Status(Status { + supported_mempools: self.config.supported_mempools.clone(), + }), + ); + } + + // A metadata request is sent when a ping/pong is received with a different + // metadata sequence number than the local cached version. + fn _request_metadata(&mut self, peer: PeerId) { + debug!("Sending metadata request to peer {:?}", peer); + self.swarm.behaviour_mut().rpc.send_request( + &peer, + NetworkRequestId::Internal, + Request::Metadata, + ); + } + + // A ping request is sent at a regular interval to all peers. + fn _ping(&mut self, peer: PeerId) { + debug!("Sending ping to peer {:?}", peer); + self.swarm.behaviour_mut().rpc.send_request( + &peer, + NetworkRequestId::Internal, + Request::Ping(Ping { + metadata_seq_number: self.config.metadata_seq_number, + }), + ); + } +} + +impl From for Request { + fn from(req: AppRequest) -> Self { + match req { + AppRequest::PooledUserOpHashes(r) => Request::PooledUserOpHashes(r), + AppRequest::PooledUserOpsByHash(r) => Request::PooledUserOpsByHash(r), + } + } +} + +impl From for Response { + fn from(res: AppResponse) -> Self { + match res { + AppResponse::PooledUserOpHashes(r) => Response::PooledUserOpHashes(r), + AppResponse::PooledUserOpsByHash(r) => Response::PooledUserOpsByHash(r), + } + } +} diff --git a/crates/network/src/rpc/behaviour.rs b/crates/network/src/rpc/behaviour.rs new file mode 100644 index 000000000..9dd7b3ee9 --- /dev/null +++ b/crates/network/src/rpc/behaviour.rs @@ -0,0 +1,164 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +use std::{ + collections::{HashMap, VecDeque}, + sync::{atomic::AtomicU64, Arc}, + task::{Context, Poll}, +}; + +use libp2p::{ + core::Endpoint, + swarm::{ + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, PollParameters, + THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + }, + Multiaddr, PeerId, +}; +use tokio::sync::oneshot; +use tracing::error; + +use super::{ + handler::{ConnectionConfig, Event as HandlerEvent, Handler, NotifyEvent}, + message::{Request, Response, ResponseError, ResponseResult}, +}; +use crate::network::{NetworkRequestId, PeerRequestId}; + +#[derive(Debug)] +pub(crate) enum Event { + Request(PeerId, PeerRequestId, Request), + Response(PeerId, NetworkRequestId, Result), +} + +#[derive(Debug)] +pub(crate) struct Behaviour { + network_config: ConnectionConfig, + global_request_id: Arc, + pending_events: VecDeque>, + // TODO time these out + pending_inbound_responses: HashMap>, +} + +impl Behaviour { + /// Create a new RPC behaviour + pub(crate) fn new(network_config: ConnectionConfig) -> Self { + Self { + network_config, + global_request_id: Arc::new(AtomicU64::new(0)), + pending_events: VecDeque::new(), + pending_inbound_responses: HashMap::new(), + } + } + + /// Send a request to a peer + pub(crate) fn send_request(&mut self, peer: &PeerId, id: NetworkRequestId, request: Request) { + self.pending_events.push_back(ToSwarm::NotifyHandler { + peer_id: *peer, + handler: NotifyHandler::Any, + event: NotifyEvent::Request(id, request), + }); + } + + /// Send a response to a peer + pub(crate) fn send_response(&mut self, request_id: PeerRequestId, response: ResponseResult) { + if let Some(sender) = self.pending_inbound_responses.remove(&request_id) { + sender.send(response).unwrap_or_else(|e| { + error!("Failed to send response to peer: {:?}", e); + }); + }; + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = Handler; + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(Handler::new( + self.network_config.clone(), + Arc::clone(&self.global_request_id), + )) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _addr: &Multiaddr, + _role_override: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(Handler::new( + self.network_config.clone(), + Arc::clone(&self.global_request_id), + )) + } + + fn on_swarm_event(&mut self, event: FromSwarm<'_, Self::ConnectionHandler>) { + match event { + FromSwarm::ConnectionEstablished(_) => {} + FromSwarm::ConnectionClosed(_) => {} + FromSwarm::AddressChange(_) => {} + FromSwarm::DialFailure(_) => {} + FromSwarm::ListenFailure(_) => {} + FromSwarm::NewListener(_) => {} + FromSwarm::NewListenAddr(_) => {} + FromSwarm::ExpiredListenAddr(_) => {} + FromSwarm::ListenerError(_) => {} + FromSwarm::ListenerClosed(_) => {} + FromSwarm::NewExternalAddrCandidate(_) => {} + FromSwarm::ExternalAddrExpired(_) => {} + FromSwarm::ExternalAddrConfirmed(_) => {} + } + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + _connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + match event { + HandlerEvent::InboundRequest(request_id, request, sender) => { + self.pending_inbound_responses.insert(request_id, sender); + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::Request( + peer_id, request_id, request, + ))); + } + HandlerEvent::OutboundResponse(request_id, response) => { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::Response( + peer_id, request_id, response, + ))); + } + } + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + _params: &mut impl PollParameters, + ) -> Poll>> { + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(event); + } + + Poll::Pending + } +} diff --git a/crates/network/src/rpc/handler/handler.rs b/crates/network/src/rpc/handler/handler.rs new file mode 100644 index 000000000..353032483 --- /dev/null +++ b/crates/network/src/rpc/handler/handler.rs @@ -0,0 +1,254 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +use std::{ + collections::VecDeque, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt}; +use libp2p::swarm::{ + handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound}, + ConnectionHandler, ConnectionHandlerEvent, KeepAlive, SubstreamProtocol, +}; +use tokio::sync::oneshot; +use tracing::{debug, error}; + +use super::{inbound::InboundProtocol, outbound::OutboundProtocol}; +use crate::{ + network::{NetworkRequestId, PeerRequestId}, + rpc::message::{Request, ResponseResult}, +}; + +const DEFAULT_MAX_CHUNK_SIZE: usize = 1024 * 1024; +const DEFAULT_TTFB_TIMEOUT: Duration = Duration::from_secs(5); +const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + +/// Configuration for the RPC connection +#[derive(Clone, Debug)] +pub struct ConnectionConfig { + /// Maximum chunk size to read in an RPC connection + pub max_chunk_size: usize, + /// Time to first byte timeout + pub ttfb_timeout: Duration, + /// Request timeout, resets between each chunk + pub request_timeout: Duration, +} + +impl Default for ConnectionConfig { + fn default() -> Self { + Self { + max_chunk_size: DEFAULT_MAX_CHUNK_SIZE, + ttfb_timeout: DEFAULT_TTFB_TIMEOUT, + request_timeout: DEFAULT_REQUEST_TIMEOUT, + } + } +} + +type InboundRecvResult = + Result<((PeerRequestId, Request), oneshot::Sender), oneshot::error::RecvError>; + +pub(crate) struct Handler { + network_config: ConnectionConfig, + global_request_id: Arc, + pending_inbound: FuturesUnordered>, + outbound_queue: VecDeque>, + pending_events: VecDeque, +} + +/// RPC handler error +#[derive(thiserror::Error, Debug)] +pub(crate) enum Error {} + +#[derive(Debug)] +pub(crate) enum Event { + InboundRequest(PeerRequestId, Request, oneshot::Sender), + OutboundResponse(NetworkRequestId, ResponseResult), +} + +#[derive(Debug)] +pub(crate) enum NotifyEvent { + Request(NetworkRequestId, Request), +} + +impl ConnectionHandler for Handler { + type FromBehaviour = NotifyEvent; + type ToBehaviour = Event; + type Error = Error; + type InboundProtocol = InboundProtocol; + type OutboundProtocol = OutboundProtocol; + type InboundOpenInfo = PeerRequestId; + type OutboundOpenInfo = NetworkRequestId; + + fn listen_protocol(&self) -> SubstreamProtocol { + // TODO this method is called in various places in the swarm to get the underlying supported protocols + // the constructed protocol is quickly thrown away. + // + // We should implement this such that it has no side-effects. + // This was copied from request-response and is hot garbage. + + let (request_sender, request_receiver) = oneshot::channel(); + let (response_sender, response_receiver) = oneshot::channel(); + + let peer_request_id = self.next_peer_request_id(); + + self.pending_inbound.push( + request_receiver + .map_ok(move |rq| (rq, response_sender)) + .boxed(), + ); + + SubstreamProtocol::new( + InboundProtocol { + request_sender, + response_receiver, + network_config: self.network_config.clone(), + request_id: peer_request_id, + }, + peer_request_id, + ) + } + + fn connection_keep_alive(&self) -> KeepAlive { + // TODO handle shutdowns + KeepAlive::Yes + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::ToBehaviour, + Self::Error, + >, + > { + // Pending events + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + } + + // handle inbound + while let Poll::Ready(Some(result)) = self.pending_inbound.poll_next_unpin(cx) { + if let Ok(((request_id, request), response_sender)) = result { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundRequest(request_id, request, response_sender), + )); + } + // errors are handled in on_listen_upgrade_error + } + + // handle outbound + if let Some(substream) = self.outbound_queue.pop_front() { + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: substream, + }); + } + + Poll::Pending + } + + fn on_behaviour_event(&mut self, event: NotifyEvent) { + match event { + NotifyEvent::Request(request_id, request) => { + let substream = SubstreamProtocol::new( + OutboundProtocol { + request, + network_config: self.network_config.clone(), + }, + request_id, + ); + self.outbound_queue.push_back(substream); + } + } + } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + '_, + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + info: request_id, + .. + }) => self.on_fully_negotiated_inbound(request_id), + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol: result, + info: request_id, + }) => self.on_fully_negotiated_outbound(request_id, result), + ConnectionEvent::DialUpgradeError(e) => { + // TODO handle this with peer manager + error!("Dial upgrade error"); + panic!("Dial upgrade error unhandled {:?}", e.error) + } + ConnectionEvent::ListenUpgradeError(e) => { + // TODO handle this with peer manager + // could be timeouts on our end, so deal with error types + error!("Listen upgrade error {:?}", e.error); + panic!("Listen upgrade error unhandled") + } + ConnectionEvent::AddressChange(_) => {} + ConnectionEvent::LocalProtocolsChange(_) => {} + ConnectionEvent::RemoteProtocolsChange(_) => {} + } + } +} + +impl Handler { + pub(crate) fn new(network_config: ConnectionConfig, global_request_id: Arc) -> Self { + Self { + network_config, + global_request_id, + pending_inbound: FuturesUnordered::new(), + outbound_queue: VecDeque::new(), + pending_events: VecDeque::new(), + } + } + + fn next_peer_request_id(&self) -> PeerRequestId { + PeerRequestId(self.global_request_id.fetch_add(1, Ordering::Relaxed)) + } + + fn on_fully_negotiated_inbound(&mut self, request_id: PeerRequestId) { + debug!( + "Inbound protocol fully negotiated. Request ID: {:?}", + request_id + ); + } + + fn on_fully_negotiated_outbound( + &mut self, + request_id: NetworkRequestId, + result: ResponseResult, + ) { + debug!( + "Outbound protocol fully negotiated. Request ID: {:?}", + request_id + ); + self.pending_events + .push_back(Event::OutboundResponse(request_id, result)); + } +} diff --git a/crates/network/src/rpc/handler/inbound.rs b/crates/network/src/rpc/handler/inbound.rs new file mode 100644 index 000000000..c2792bafd --- /dev/null +++ b/crates/network/src/rpc/handler/inbound.rs @@ -0,0 +1,107 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +use futures::{future::BoxFuture, FutureExt, SinkExt, StreamExt}; +use libp2p::{core::UpgradeInfo, InboundUpgrade, Stream}; +use tokio::sync::oneshot; +use tokio_io_timeout::TimeoutStream; +use tokio_util::{codec::Framed, compat::FuturesAsyncReadCompatExt}; +use tracing::info; + +use super::{codec, serde, ConnectionConfig}; +use crate::{ + network::PeerRequestId, + rpc::{ + message::{Request, ResponseResult}, + protocol::{self, Encoding, Protocol, ProtocolError}, + }, +}; + +#[derive(Debug)] +pub(crate) struct InboundProtocol { + pub(crate) request_sender: oneshot::Sender<(PeerRequestId, Request)>, + pub(crate) response_receiver: oneshot::Receiver, + pub(crate) network_config: ConnectionConfig, + pub(crate) request_id: PeerRequestId, +} + +impl UpgradeInfo for InboundProtocol { + type Info = Protocol; + type InfoIter = Vec; + + fn protocol_info(&self) -> Self::InfoIter { + protocol::supported_protocols() + } +} + +impl InboundUpgrade for InboundProtocol { + type Output = (); + type Error = ProtocolError; + type Future = BoxFuture<'static, Result>; + + fn upgrade_inbound(self, socket: Stream, info: Self::Info) -> Self::Future { + let codec = match info.encoding { + Encoding::SSZSnappy => { + codec::InboundCodec::new(info.clone(), self.network_config.max_chunk_size) + } + }; + + let socket = socket.compat(); + let mut timed_socket = TimeoutStream::new(socket); + timed_socket.set_read_timeout(Some(self.network_config.ttfb_timeout)); + let stream = Framed::new(Box::pin(timed_socket), codec); + + async move { + let (request, mut stream) = match info.schema { + // metadata requests don't have a body + protocol::ProtocolSchema::MetadataV1 => (Request::Metadata, stream), + _ => { + match tokio::time::timeout( + self.network_config.request_timeout, + stream.into_future(), + ) + .await + { + Ok((Some(Ok(request)), stream)) => (request, stream), + Ok((Some(Err(e)), _)) => { + info!("Error decoding inbound request: {:?}", e); + return Err(ProtocolError::from(e)); + } + Ok((None, _)) => return Err(ProtocolError::IncompleteStream), + Err(_) => return Err(ProtocolError::StreamTimeout), + } + } + }; + + self.request_sender + .send((self.request_id, request)) + .map_err(|_| { + ProtocolError::Internal("Failed to send request up to handler".into()) + })?; + + match self.response_receiver.await { + Ok(response) => { + let chunks = serde::chunks_from_response(response)?; + for chunk in chunks { + stream.send(chunk).await?; + } + stream.close().await?; + } + Err(_) => return Err(ProtocolError::Internal("Failed to receive response".into())), + } + + Ok(()) + } + .boxed() + } +} diff --git a/crates/network/src/rpc/handler/mod.rs b/crates/network/src/rpc/handler/mod.rs index a8dc973e0..5b0eec28f 100644 --- a/crates/network/src/rpc/handler/mod.rs +++ b/crates/network/src/rpc/handler/mod.rs @@ -13,5 +13,12 @@ mod codec; pub(crate) use codec::CodecError; + +mod inbound; +mod outbound; mod serde; mod snappy; + +#[allow(clippy::module_inception)] +mod handler; +pub use handler::*; diff --git a/crates/network/src/rpc/handler/outbound.rs b/crates/network/src/rpc/handler/outbound.rs new file mode 100644 index 000000000..1d10f2508 --- /dev/null +++ b/crates/network/src/rpc/handler/outbound.rs @@ -0,0 +1,101 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +use futures::{future::BoxFuture, FutureExt, SinkExt, StreamExt}; +use libp2p::{core::UpgradeInfo, OutboundUpgrade, Stream}; +use tokio_io_timeout::TimeoutStream; +use tokio_util::{codec::Framed, compat::FuturesAsyncReadCompatExt}; + +use super::{codec, serde, ConnectionConfig}; +use crate::rpc::{ + message::{Request, ResponseResult}, + protocol::{self, Encoding, Protocol, ProtocolError, ProtocolSchema}, +}; + +#[derive(Debug)] +pub(crate) struct OutboundProtocol { + pub(crate) request: Request, + pub(crate) network_config: ConnectionConfig, +} + +impl UpgradeInfo for OutboundProtocol { + type Info = Protocol; + type InfoIter = Vec; + + fn protocol_info(&self) -> Self::InfoIter { + protocol::request_protocols(&self.request) + } +} + +impl OutboundUpgrade for OutboundProtocol { + type Output = ResponseResult; + type Error = ProtocolError; + type Future = BoxFuture<'static, Result>; + + fn upgrade_outbound(self, socket: Stream, info: Self::Info) -> Self::Future { + let codec = match info.encoding { + Encoding::SSZSnappy => { + codec::OutboundCodec::new(info.clone(), self.network_config.max_chunk_size) + } + }; + + let socket = socket.compat(); + let mut timed_socket = TimeoutStream::new(socket); + // TODO this should be set after the request is sent. + timed_socket.set_read_timeout(Some(self.network_config.ttfb_timeout)); + + let mut socket = Framed::new(Box::pin(timed_socket), codec); + + async move { + let num_expected_response_chunks = self.request.num_expected_response_chunks(); + + match info.schema { + // nothing to send for metadata requests, just close + ProtocolSchema::MetadataV1 => {} + _ => socket.send(self.request).await?, + } + + // close the sink portion of the socket + socket.close().await?; + + let mut chunks = vec![]; + let mut socket = Some(socket); + for _ in 0..num_expected_response_chunks { + let to_consume = socket.take().unwrap(); + + let (chunk, tail) = match tokio::time::timeout( + self.network_config.request_timeout, + to_consume.into_future(), + ) + .await + { + Ok((Some(Ok(item)), tail)) => (item, tail), + Ok((Some(Err(e)), _)) => return Err(ProtocolError::from(e)), + Ok((None, _)) => return Err(ProtocolError::IncompleteStream), + Err(_) => return Err(ProtocolError::StreamTimeout), + }; + + // short circuit on error + if chunk.is_err() { + return Ok(serde::response_from_chunks(info.schema, vec![chunk])); + } + + chunks.push(chunk); + socket = Some(tail); + } + + Ok(serde::response_from_chunks(info.schema, chunks)) + } + .boxed() + } +} diff --git a/crates/network/src/rpc/handler/serde.rs b/crates/network/src/rpc/handler/serde.rs index 6dc5004eb..7fc76d3cb 100644 --- a/crates/network/src/rpc/handler/serde.rs +++ b/crates/network/src/rpc/handler/serde.rs @@ -91,6 +91,10 @@ impl SszChunkResult { Self(Err(error)) } + pub(crate) fn is_err(&self) -> bool { + self.0.is_err() + } + /// The code associated with a response chunk pub(crate) fn code(&self) -> u8 { match self.0 { diff --git a/crates/network/src/rpc/message.rs b/crates/network/src/rpc/message.rs index 791c02d45..82a85a127 100644 --- a/crates/network/src/rpc/message.rs +++ b/crates/network/src/rpc/message.rs @@ -15,8 +15,10 @@ use ethers::types::H256; use rundler_types::UserOperation; use ssz_derive::{Decode, Encode}; +/// Maximum length of an error message pub(crate) const MAX_ERROR_MESSAGE_LEN: usize = 256; -pub(crate) const MAX_OPS_PER_REQUEST: usize = 4096; +/// Maximum number of user ops per request +pub const MAX_OPS_PER_REQUEST: usize = 4096; /// Request types #[derive(Clone, Debug, PartialEq)] @@ -61,22 +63,23 @@ pub(crate) type ResponseResult = Result; /// Error for a response #[derive(Clone, Debug, PartialEq)] -pub(crate) struct ResponseError { +pub struct ResponseError { /// Kind of error - pub(crate) kind: ErrorKind, + pub kind: ErrorKind, /// Error message, may be empty if could not decode as UTF8 string - pub(crate) message: String, + pub message: String, } /// Error kinds #[derive(Clone, Debug, PartialEq)] -pub(crate) enum ErrorKind { +pub enum ErrorKind { /// Request is invalid InvalidRequest, /// Server error ServerError, /// Resource unavailable ResourceUnavailable, + /// Unknown error Unknown, } @@ -183,32 +186,32 @@ impl From for u64 { /// Pooled user op hashes request #[derive(Clone, Debug, PartialEq, Encode, Decode)] -pub(crate) struct PooledUserOpHashesRequest { +pub struct PooledUserOpHashesRequest { /// Hash id of the mempool - pub(crate) mempool: H256, + pub mempool: H256, /// Offset into the mempool - pub(crate) offset: u64, + pub offset: u64, } /// Pooled user op hashes response #[derive(Clone, Debug, PartialEq, Encode, Decode)] -pub(crate) struct PooledUserOpHashesResponse { +pub struct PooledUserOpHashesResponse { /// More user ops are available - pub(crate) more_flag: bool, + pub more_flag: bool, /// Hashes of the user ops - pub(crate) hashes: Vec, + pub hashes: Vec, } /// Pooled user ops by hash request #[derive(Clone, Debug, PartialEq, Encode, Decode)] -pub(crate) struct PooledUserOpsByHashRequest { +pub struct PooledUserOpsByHashRequest { /// Hashes of the user ops - pub(crate) hashes: Vec, + pub hashes: Vec, } /// Pooled user ops by hash response #[derive(Clone, Debug, PartialEq)] -pub(crate) struct PooledUserOpsByHashResponse { +pub struct PooledUserOpsByHashResponse { /// User ops - pub(crate) user_ops: Vec, + pub user_ops: Vec, } diff --git a/crates/network/src/rpc/mod.rs b/crates/network/src/rpc/mod.rs index 7db612ff8..8af789824 100644 --- a/crates/network/src/rpc/mod.rs +++ b/crates/network/src/rpc/mod.rs @@ -11,6 +11,12 @@ // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. +mod behaviour; +pub(crate) use behaviour::{Behaviour, Event}; + mod handler; -mod message; +pub use handler::ConnectionConfig; + +pub(crate) mod message; + mod protocol; diff --git a/crates/network/tests/test_integration.rs b/crates/network/tests/test_integration.rs new file mode 100644 index 000000000..718966368 --- /dev/null +++ b/crates/network/tests/test_integration.rs @@ -0,0 +1,287 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +use std::net::{Ipv4Addr, SocketAddr, TcpListener}; + +use discv5::Enr; +use ethers::types::H256; +use libp2p::PeerId; +use rundler_network::{ + Action, AppRequest, AppRequestId, AppResponse, Config, Event, Network, + PooledUserOpHashesRequest, PooledUserOpHashesResponse, PooledUserOpsByHashRequest, + PooledUserOpsByHashResponse, ResponseErrorKind, Result, MAX_OPS_PER_REQUEST, +}; +use rundler_types::UserOperation; +use tokio::{sync::mpsc, task::JoinHandle}; +use tracing_test::traced_test; + +struct TestNetworkContext { + handle: JoinHandle>, + action_sender: mpsc::UnboundedSender, + event_receiver: mpsc::UnboundedReceiver, + enr: Enr, +} + +fn unused_port() -> u16 { + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); + let listener = TcpListener::bind(addr).unwrap(); + listener.local_addr().unwrap().port() +} + +async fn setup_network(bootnodes: Vec, supported_mempools: Vec) -> TestNetworkContext { + let enr_key = discv5::enr::CombinedKey::generate_secp256k1(); + let private_key = hex::encode(enr_key.encode()); + + let config = Config { + bootnodes, + private_key, + supported_mempools, + listen_address: SocketAddr::new(Ipv4Addr::LOCALHOST.into(), unused_port()), + ..Default::default() + }; + + let (action_sender, action_receiver) = mpsc::unbounded_channel(); + let (event_sender, event_receiver) = mpsc::unbounded_channel(); + + let network = Network::new(config, event_sender, action_receiver) + .await + .unwrap(); + let enr = network.enr().clone(); + + let handle = tokio::spawn(async move { network.run().await }); + + TestNetworkContext { + handle, + action_sender, + event_receiver, + enr, + } +} + +async fn setup_node_pair() -> (TestNetworkContext, TestNetworkContext) { + let bootnode = setup_network(vec![], vec![]).await; + let node = setup_network(vec![bootnode.enr.clone()], vec![]).await; + (bootnode, node) +} + +async fn shutdown_node_pair(mut node0: TestNetworkContext, node1: TestNetworkContext) { + shutdown(node1).await; + + match node0.event_receiver.recv().await { + Some(Event::PeerDisconnected(_)) => {} + _ => panic!("Expected peer disconnected event"), + } + + shutdown(node0).await; +} + +async fn wait_for_pair_connect( + node0: &mut TestNetworkContext, + node1: &mut TestNetworkContext, +) -> (PeerId, PeerId) { + let peer0 = match node0.event_receiver.recv().await { + Some(Event::PeerConnected(peer_id)) => peer_id, + _ => panic!("Expected peer connected event"), + }; + + let peer1 = match node1.event_receiver.recv().await { + Some(Event::PeerConnected(peer_id)) => peer_id, + _ => panic!("Expected peer connected event"), + }; + + (peer0, peer1) +} + +async fn shutdown(mut context: TestNetworkContext) { + let _ = context.action_sender.send(Action::Shutdown); + match context.event_receiver.recv().await { + Some(Event::ShutdownComplete) => {} + _ => panic!("Expected shutdown event"), + } + let _ = context.handle.await.unwrap(); +} + +#[tokio::test] +async fn test_shutdown() { + let context = setup_network(vec![], vec![]).await; + shutdown(context).await; +} + +#[tokio::test] +#[traced_test] +async fn test_peer_connect() { + let (mut bootnode, mut node) = setup_node_pair().await; + + match node.event_receiver.recv().await { + Some(Event::PeerConnected(_)) => {} + _ => panic!("Expected peer connected event"), + } + + match bootnode.event_receiver.recv().await { + Some(Event::PeerConnected(_)) => {} + _ => panic!("Expected peer connected event"), + } + + shutdown_node_pair(bootnode, node).await; +} + +#[tokio::test] +#[traced_test] +async fn test_req_resp_op_hashes() { + let (mut bootnode, mut node) = setup_node_pair().await; + let (node_peer_id, bootnode_peer_id) = wait_for_pair_connect(&mut bootnode, &mut node).await; + + let mempool = H256::random(); + + bootnode + .action_sender + .send(Action::Request( + node_peer_id, + AppRequestId(0), + AppRequest::PooledUserOpHashes(PooledUserOpHashesRequest { mempool, offset: 0 }), + )) + .unwrap(); + + let request_id = match node.event_receiver.recv().await { + Some(Event::RequestReceived(peer_id, request_id, request)) => match request { + AppRequest::PooledUserOpHashes(r) => { + assert_eq!(peer_id, bootnode_peer_id); + assert_eq!(r.mempool, mempool); + assert_eq!(r.offset, 0); + request_id + } + _ => panic!("Expected pooled user op hashes request"), + }, + _ => panic!("Expected request received event"), + }; + + let hashes = vec![H256::random(), H256::random()]; + + node.action_sender + .send(Action::Response( + request_id, + Ok(AppResponse::PooledUserOpHashes( + PooledUserOpHashesResponse { + more_flag: true, + hashes: hashes.clone(), + }, + )), + )) + .unwrap(); + + match bootnode.event_receiver.recv().await { + Some(Event::ResponseReceived(peer_id, request_id, request)) => match request { + Ok(AppResponse::PooledUserOpHashes(r)) => { + assert_eq!(peer_id, node_peer_id); + assert_eq!(request_id, AppRequestId(0)); + assert!(r.more_flag); + assert_eq!(r.hashes, hashes); + } + _ => panic!("Expected pooled user op hashes response"), + }, + _ => panic!("Expected response received event"), + } + + shutdown_node_pair(bootnode, node).await; +} + +#[tokio::test] +#[traced_test] +async fn test_req_resp_ops_by_hashes() { + let (mut bootnode, mut node) = setup_node_pair().await; + let (node_peer_id, bootnode_peer_id) = wait_for_pair_connect(&mut bootnode, &mut node).await; + + let hashes = vec![H256::random(), H256::random()]; + + bootnode + .action_sender + .send(Action::Request( + node_peer_id, + AppRequestId(0), + AppRequest::PooledUserOpsByHash(PooledUserOpsByHashRequest { + hashes: hashes.clone(), + }), + )) + .unwrap(); + + let request_id = match node.event_receiver.recv().await { + Some(Event::RequestReceived(peer_id, request_id, request)) => match request { + AppRequest::PooledUserOpsByHash(r) => { + assert_eq!(peer_id, bootnode_peer_id); + assert_eq!(r.hashes, hashes); + request_id + } + _ => panic!("Expected pooled user op hashes request"), + }, + _ => panic!("Expected request received event"), + }; + + node.action_sender + .send(Action::Response( + request_id, + Ok(AppResponse::PooledUserOpsByHash( + PooledUserOpsByHashResponse { + user_ops: vec![UserOperation::default(), UserOperation::default()], + }, + )), + )) + .unwrap(); + + match bootnode.event_receiver.recv().await { + Some(Event::ResponseReceived(peer_id, request_id, request)) => match request { + Ok(AppResponse::PooledUserOpsByHash(r)) => { + assert_eq!(peer_id, node_peer_id); + assert_eq!(request_id, AppRequestId(0)); + assert_eq!(r.user_ops.len(), 2); + } + _ => panic!("Expected pooled user op hashes response"), + }, + _ => panic!("Expected response received event"), + } + + shutdown_node_pair(bootnode, node).await; +} + +#[tokio::test] +#[traced_test] +async fn test_req_resp_ops_by_hashes_too_many() { + let (mut bootnode, mut node) = setup_node_pair().await; + let (node_peer_id, _) = wait_for_pair_connect(&mut bootnode, &mut node).await; + + let hashes = vec![H256::random(); MAX_OPS_PER_REQUEST + 1]; + + bootnode + .action_sender + .send(Action::Request( + node_peer_id, + AppRequestId(0), + AppRequest::PooledUserOpsByHash(PooledUserOpsByHashRequest { + hashes: hashes.clone(), + }), + )) + .unwrap(); + + match bootnode.event_receiver.recv().await { + Some(Event::ResponseReceived(peer_id, request_id, response)) => match response { + Err(e) => { + assert_eq!(peer_id, node_peer_id); + assert_eq!(request_id, AppRequestId(0)); + assert_eq!(e.kind, ResponseErrorKind::InvalidRequest); + } + _ => panic!("Expected pooled user op hashes response error"), + }, + _ => panic!("Expected response received event"), + }; + + shutdown_node_pair(bootnode, node).await; +} diff --git a/deny.toml b/deny.toml index c6cb720f9..7f7aeaa0b 100644 --- a/deny.toml +++ b/deny.toml @@ -45,6 +45,7 @@ allow = [ "Apache-2.0", "BSD-2-Clause", "BSD-3-Clause", + "CC0-1.0", "ISC", "Unlicense", "MPL-2.0", @@ -54,7 +55,6 @@ allow = [ # Allow 1 or more licenses on a per-crate basis, so that particular licenses # aren't accepted for every possible crate as with the normal allow list exceptions = [ - { allow = ["CC0-1.0"], name = "tiny-keccak" }, { allow = ["Unicode-DFS-2016"], name = "unicode-ident" }, { allow = ["OpenSSL"], name = "ring" }, ]