From f5649805d8b2dd8b5d7339e6c0eac974fbec3ed3 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Fri, 20 Dec 2024 16:52:24 +0500 Subject: [PATCH] max_parallel_downloads are passed by cli --- core/application/app_configuration.hpp | 4 ++++ .../impl/app_configuration_impl.cpp | 19 +++++++++++++------ .../impl/app_configuration_impl.hpp | 4 ++++ .../protocols/block_announce_protocol.cpp | 10 ++++------ .../protocols/block_announce_protocol.hpp | 1 + .../application/app_configuration_mock.hpp | 5 +++++ 6 files changed, 31 insertions(+), 12 deletions(-) diff --git a/core/application/app_configuration.hpp b/core/application/app_configuration.hpp index 6a5807553c..e76c40d058 100644 --- a/core/application/app_configuration.hpp +++ b/core/application/app_configuration.hpp @@ -325,6 +325,10 @@ namespace kagome::application { const = 0; virtual std::optional precompileWasm() const = 0; + /** + * @return maximum number of parallel downloads of announced block per peer + */ + virtual uint32_t maxParallelDownloads() const = 0; }; } // namespace kagome::application diff --git a/core/application/impl/app_configuration_impl.cpp b/core/application/impl/app_configuration_impl.cpp index 7e34615377..b3d6166206 100644 --- a/core/application/impl/app_configuration_impl.cpp +++ b/core/application/impl/app_configuration_impl.cpp @@ -115,6 +115,7 @@ namespace { #endif const uint32_t def_db_cache_size = 1024; const uint32_t def_parachain_runtime_instance_cache_size = 100; + constexpr auto max_concurrent_block_announce_validations_per_peer = 5; /** * Generate once at run random node name if form of UUID @@ -176,12 +177,11 @@ namespace { static constexpr std::array - interpreters { + interpreters{ #if KAGOME_WASM_COMPILER_WASM_EDGE == 1 - "WasmEdge", + "WasmEdge", #endif - "Binaryen" - }; + "Binaryen"}; static const std::string interpreters_str = fmt::format("[{}]", fmt::join(interpreters, ", ")); @@ -841,6 +841,9 @@ namespace kagome::application { ("rpc-methods", po::value(), R"("auto" (default), "unsafe", "safe")") ("no-mdns", po::bool_switch(), "(unused, zombienet stub)") ("prometheus-external", po::bool_switch(), "alias for \"--prometheus-host 0.0.0.0\"") + ("max-parallel-downloads", po::value()->default_value(max_concurrent_block_announce_validations_per_peer), + "Maximum number of peers from which to ask for the same blocks in parallel." + "This allows downloading announced blocks from multiple peers. Decrease to save traffic and risk increased latency.") ; po::options_description development_desc("Additional options"); @@ -912,8 +915,8 @@ namespace kagome::application { } if (vm.count("help") > 0) { - std::cout - << "Available subcommands: storage-explorer db-editor benchmark key\n"; + std::cout << "Available subcommands: storage-explorer db-editor " + "benchmark key\n"; std::cout << desc << '\n'; return false; } @@ -1600,6 +1603,10 @@ namespace kagome::application { runtime_exec_method_ = RuntimeExecutionMethod::Compile; } + max_parallel_downloads_ = + find_argument(vm, "max-parallel-downloads") + .value_or(max_concurrent_block_announce_validations_per_peer); + // if something wrong with config print help message if (not validate_config()) { std::cout << desc << '\n'; diff --git a/core/application/impl/app_configuration_impl.hpp b/core/application/impl/app_configuration_impl.hpp index 2421b9f874..a2d52419b3 100644 --- a/core/application/impl/app_configuration_impl.hpp +++ b/core/application/impl/app_configuration_impl.hpp @@ -238,6 +238,9 @@ namespace kagome::application { std::optional precompileWasm() const override { return precompile_wasm_; } + uint32_t maxParallelDownloads() const override { + return max_parallel_downloads_; + } private: void parse_general_segment(const rapidjson::Value &val); @@ -368,6 +371,7 @@ namespace kagome::application { bool prune_discarded_states_ = false; bool enable_thorough_pruning_ = false; std::optional blocks_pruning_; + uint32_t max_parallel_downloads_; bool enable_db_migration_ = false; std::optional dev_mnemonic_phrase_; std::string node_wss_pem_; diff --git a/core/network/impl/protocols/block_announce_protocol.cpp b/core/network/impl/protocols/block_announce_protocol.cpp index 645e207e6e..6c5f6cd600 100644 --- a/core/network/impl/protocols/block_announce_protocol.cpp +++ b/core/network/impl/protocols/block_announce_protocol.cpp @@ -26,8 +26,6 @@ namespace kagome::network { // https://github.com/paritytech/polkadot-sdk/blob/edf79aa972bcf2e043e18065a9bb860ecdbd1a6e/substrate/client/network/sync/src/engine.rs#L86 constexpr size_t kSeenCapacity = 1024; - constexpr auto MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER = 4; - static const struct { void inc(bool inc) const { for (auto &metric : metrics) { @@ -77,7 +75,8 @@ namespace kagome::network { hasher_(std::move(hasher)), telemetry_peer_count_(std::move(telemetry_peer_count)), peer_manager_{std::move(peer_manager)}, - seen_{kSeenCapacity} { + seen_{kSeenCapacity}, + max_parallel_downloads_{app_config.maxParallelDownloads()} { BOOST_ASSERT(block_tree_ != nullptr); BOOST_ASSERT(observer_ != nullptr); BOOST_ASSERT(peer_manager_ != nullptr); @@ -128,8 +127,7 @@ namespace kagome::network { { std::shared_lock lock(active_peers_mutex_); selected_peers.push_back(peer_id); - if (active_peers_.size() - > MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER) { + if (active_peers_.size() > max_parallel_downloads_) { std::vector temp_peers; std::copy_if(active_peers_.begin(), active_peers_.end(), @@ -138,7 +136,7 @@ namespace kagome::network { std::sample(temp_peers.begin(), temp_peers.end(), std::back_inserter(selected_peers), - MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER - 1, + max_parallel_downloads_ ? max_parallel_downloads_ - 1 : 0, std::mt19937{std::random_device{}()}); } else { std::copy_if(active_peers_.begin(), diff --git a/core/network/impl/protocols/block_announce_protocol.hpp b/core/network/impl/protocols/block_announce_protocol.hpp index 9dc6b0de51..e4b1d095e2 100644 --- a/core/network/impl/protocols/block_announce_protocol.hpp +++ b/core/network/impl/protocols/block_announce_protocol.hpp @@ -88,6 +88,7 @@ namespace kagome::network { MapLruSet seen_; std::shared_mutex active_peers_mutex_; std::unordered_set active_peers_; + uint32_t max_parallel_downloads_; }; } // namespace kagome::network diff --git a/test/mock/core/application/app_configuration_mock.hpp b/test/mock/core/application/app_configuration_mock.hpp index 3fa0d2761e..7ff9bce8ac 100644 --- a/test/mock/core/application/app_configuration_mock.hpp +++ b/test/mock/core/application/app_configuration_mock.hpp @@ -197,6 +197,11 @@ namespace kagome::application { precompileWasm, (), (const, override)); + + MOCK_METHOD(std::optional, + maxParallelDownloads, + (), + (const, override)); }; } // namespace kagome::application