diff --git a/core/application/app_configuration.hpp b/core/application/app_configuration.hpp index 69c4140716..d77402de77 100644 --- a/core/application/app_configuration.hpp +++ b/core/application/app_configuration.hpp @@ -330,6 +330,8 @@ namespace kagome::application { const = 0; virtual std::optional precompileWasm() const = 0; + + virtual uint32_t maxParallelDownloads() const = 0; }; } // namespace kagome::application diff --git a/core/application/impl/app_configuration_impl.cpp b/core/application/impl/app_configuration_impl.cpp index c4130d05c7..c05c6e737a 100644 --- a/core/application/impl/app_configuration_impl.cpp +++ b/core/application/impl/app_configuration_impl.cpp @@ -116,6 +116,7 @@ namespace { #endif const uint32_t def_db_cache_size = 1024; const uint32_t def_parachain_runtime_instance_cache_size = 100; + const uint32_t def_max_parallel_downloads = 5; /** * Generate once at run random node name if form of UUID @@ -177,12 +178,11 @@ namespace { static constexpr std::array - interpreters { + interpreters{ #if KAGOME_WASM_COMPILER_WASM_EDGE == 1 - "WasmEdge", + "WasmEdge", #endif - "Binaryen" - }; + "Binaryen"}; static const std::string interpreters_str = fmt::format("[{}]", fmt::join(interpreters, ", ")); @@ -845,6 +845,9 @@ namespace kagome::application { ("rpc-methods", po::value(), R"("auto" (default), "unsafe", "safe")") ("no-mdns", po::bool_switch(), "(unused, zombienet stub)") ("prometheus-external", po::bool_switch(), "alias for \"--prometheus-host 0.0.0.0\"") + ("max-parallel-downloads", po::value()->default_value(def_max_parallel_downloads), + "Maximum number of peers from which to ask for the same blocks in parallel." + "This allows downloading announced blocks from multiple peers. Decrease to save traffic and risk increased latency.") ; po::options_description development_desc("Additional options"); @@ -1606,6 +1609,9 @@ namespace kagome::application { runtime_exec_method_ = RuntimeExecutionMethod::Compile; } + max_parallel_downloads_ = + find_argument(vm, "max-parallel-downloads") + .value_or(def_max_parallel_downloads); // if something wrong with config print help message if (not validate_config()) { std::cout << desc << '\n'; diff --git a/core/application/impl/app_configuration_impl.hpp b/core/application/impl/app_configuration_impl.hpp index 5022fb94eb..34e50ee6f7 100644 --- a/core/application/impl/app_configuration_impl.hpp +++ b/core/application/impl/app_configuration_impl.hpp @@ -242,6 +242,10 @@ namespace kagome::application { return precompile_wasm_; } + uint32_t maxParallelDownloads() const override { + return max_parallel_downloads_; + } + private: void parse_general_segment(const rapidjson::Value &val); void parse_blockchain_segment(const rapidjson::Value &val); @@ -386,6 +390,7 @@ namespace kagome::application { std::max(std::thread::hardware_concurrency(), 1)}; bool disable_secure_mode_{false}; std::optional precompile_wasm_; + uint32_t max_parallel_downloads_; }; } // namespace kagome::application diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index b7fe69878b..f5aaef067a 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -138,6 +138,8 @@ namespace kagome::network { BOOST_ASSERT(block_storage_); sync_method_ = app_config.syncMethod(); + max_parallel_downloads_ = app_config.maxParallelDownloads(); + random_gen_ = std::mt19937(std::random_device{}()); // Register metrics metrics_registry_->registerGaugeFamily( @@ -328,11 +330,30 @@ namespace kagome::network { return false; } + std::vector selected_peers = {peer_id}; + std::vector active_peers; + peer_manager_->enumeratePeerState( + [&active_peers, &block_info, &peer_id](const PeerId &p_id, + PeerState &peer_state) { + if (peer_state.best_block >= block_info and p_id != peer_id) { + active_peers.push_back(p_id); + } + return true; + }); + std::ranges::shuffle(active_peers, random_gen_); + for (const auto &p_id : active_peers) { + if (selected_peers.size() >= max_parallel_downloads_) { + break; + } + selected_peers.push_back(p_id); + } // Block is already enqueued if (auto it = known_blocks_.find(block_info.hash); it != known_blocks_.end()) { auto &block_in_queue = it->second; - block_in_queue.peers.emplace(peer_id); + for (const auto &p_id : selected_peers) { + block_in_queue.peers.emplace(p_id); + } return false; } @@ -354,24 +375,30 @@ namespace kagome::network { or block_tree_->has(header.parent_hash); if (parent_is_known) { - loadBlocks(peer_id, block_info, [wp{weak_from_this()}](auto res) { - if (auto self = wp.lock()) { - SL_TRACE(self->log_, "Block(s) enqueued to apply by announce"); - } - }); + for (const auto &p_id : selected_peers) { + loadBlocks(p_id, block_info, [wp{weak_from_this()}](auto res) { + if (auto self = wp.lock()) { + SL_TRACE(self->log_, "Block(s) enqueued to apply by announce"); + } + }); + } return true; } // Otherwise, is using base way to enqueue - return syncByBlockInfo( - block_info, - peer_id, - [wp{weak_from_this()}](auto res) { - if (auto self = wp.lock()) { - SL_TRACE(self->log_, "Block(s) enqueued to load by announce"); - } - }, - false); + auto res = true; + for (const auto &p_id : selected_peers) { + res &= syncByBlockInfo( + block_info, + p_id, + [wp{weak_from_this()}](auto res) { + if (auto self = wp.lock()) { + SL_TRACE(self->log_, "Block(s) enqueued to load by announce"); + } + }, + false); + } + return res; } void SynchronizerImpl::findCommonBlock( diff --git a/core/network/impl/synchronizer_impl.hpp b/core/network/impl/synchronizer_impl.hpp index 6f932976c4..fa485a0e65 100644 --- a/core/network/impl/synchronizer_impl.hpp +++ b/core/network/impl/synchronizer_impl.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -261,6 +262,8 @@ namespace kagome::network { primitives::events::ChainSubscriptionEnginePtr chain_sub_engine_; std::shared_ptr main_pool_handler_; std::shared_ptr block_storage_; + uint32_t max_parallel_downloads_; + std::mt19937 random_gen_; application::SyncMethod sync_method_; diff --git a/test/mock/core/application/app_configuration_mock.hpp b/test/mock/core/application/app_configuration_mock.hpp index fd69d234db..def7c8245c 100644 --- a/test/mock/core/application/app_configuration_mock.hpp +++ b/test/mock/core/application/app_configuration_mock.hpp @@ -199,6 +199,8 @@ namespace kagome::application { precompileWasm, (), (const, override)); + + MOCK_METHOD(uint32_t, maxParallelDownloads, (), (const, override)); }; } // namespace kagome::application