Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2304 enhancement announced blocks parallel download #2320

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/application/app_configuration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ namespace kagome::application {
const = 0;

virtual std::optional<PrecompileWasmConfig> precompileWasm() const = 0;

virtual uint32_t maxParallelDownloads() const = 0;
};

} // namespace kagome::application
14 changes: 10 additions & 4 deletions core/application/impl/app_configuration_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ namespace {
#endif
const uint32_t def_db_cache_size = 1024;
const uint32_t def_parachain_runtime_instance_cache_size = 100;
const uint32_t def_max_parallel_downloads = 5;

/**
* Generate once at run random node name if form of UUID
Expand Down Expand Up @@ -177,12 +178,11 @@ namespace {

static constexpr std::array<std::string_view,
1 + KAGOME_WASM_COMPILER_WASM_EDGE>
interpreters {
interpreters{
#if KAGOME_WASM_COMPILER_WASM_EDGE == 1
"WasmEdge",
"WasmEdge",
#endif
"Binaryen"
};
"Binaryen"};

static const std::string interpreters_str =
fmt::format("[{}]", fmt::join(interpreters, ", "));
Expand Down Expand Up @@ -845,6 +845,9 @@ namespace kagome::application {
("rpc-methods", po::value<std::string>(), R"("auto" (default), "unsafe", "safe")")
("no-mdns", po::bool_switch(), "(unused, zombienet stub)")
("prometheus-external", po::bool_switch(), "alias for \"--prometheus-host 0.0.0.0\"")
("max-parallel-downloads", po::value<uint32_t>()->default_value(def_max_parallel_downloads),
"Maximum number of peers from which to ask for the same blocks in parallel."
"This allows downloading announced blocks from multiple peers. Decrease to save traffic and risk increased latency.")
;

po::options_description development_desc("Additional options");
Expand Down Expand Up @@ -1606,6 +1609,9 @@ namespace kagome::application {
runtime_exec_method_ = RuntimeExecutionMethod::Compile;
}

max_parallel_downloads_ =
find_argument<uint32_t>(vm, "max-parallel-downloads")
.value_or(def_max_parallel_downloads);
// if something wrong with config print help message
if (not validate_config()) {
std::cout << desc << '\n';
Expand Down
5 changes: 5 additions & 0 deletions core/application/impl/app_configuration_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ namespace kagome::application {
return precompile_wasm_;
}

uint32_t maxParallelDownloads() const override {
return max_parallel_downloads_;
}

private:
void parse_general_segment(const rapidjson::Value &val);
void parse_blockchain_segment(const rapidjson::Value &val);
Expand Down Expand Up @@ -386,6 +390,7 @@ namespace kagome::application {
std::max<size_t>(std::thread::hardware_concurrency(), 1)};
bool disable_secure_mode_{false};
std::optional<PrecompileWasmConfig> precompile_wasm_;
uint32_t max_parallel_downloads_;
};

} // namespace kagome::application
Expand Down
57 changes: 42 additions & 15 deletions core/network/impl/synchronizer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ namespace kagome::network {
BOOST_ASSERT(block_storage_);

sync_method_ = app_config.syncMethod();
max_parallel_downloads_ = app_config.maxParallelDownloads();
random_gen_ = std::mt19937(std::random_device{}());

// Register metrics
metrics_registry_->registerGaugeFamily(
Expand Down Expand Up @@ -328,11 +330,30 @@ namespace kagome::network {
return false;
}

std::vector<libp2p::peer::PeerId> selected_peers = {peer_id};
std::vector<libp2p::peer::PeerId> active_peers;
peer_manager_->enumeratePeerState(
[&active_peers, &block_info, &peer_id](const PeerId &p_id,
PeerState &peer_state) {
if (peer_state.best_block >= block_info and p_id != peer_id) {
active_peers.push_back(p_id);
}
return true;
});
std::ranges::shuffle(active_peers, random_gen_);
for (const auto &p_id : active_peers) {
if (selected_peers.size() >= max_parallel_downloads_) {
break;
}
selected_peers.push_back(p_id);
}
// Block is already enqueued
if (auto it = known_blocks_.find(block_info.hash);
it != known_blocks_.end()) {
auto &block_in_queue = it->second;
block_in_queue.peers.emplace(peer_id);
for (const auto &p_id : selected_peers) {
block_in_queue.peers.emplace(p_id);
}
return false;
}

Expand All @@ -354,24 +375,30 @@ namespace kagome::network {
or block_tree_->has(header.parent_hash);

if (parent_is_known) {
loadBlocks(peer_id, block_info, [wp{weak_from_this()}](auto res) {
if (auto self = wp.lock()) {
SL_TRACE(self->log_, "Block(s) enqueued to apply by announce");
}
});
for (const auto &p_id : selected_peers) {
loadBlocks(p_id, block_info, [wp{weak_from_this()}](auto res) {
if (auto self = wp.lock()) {
SL_TRACE(self->log_, "Block(s) enqueued to apply by announce");
}
});
}
return true;
}

// Otherwise, is using base way to enqueue
return syncByBlockInfo(
block_info,
peer_id,
[wp{weak_from_this()}](auto res) {
if (auto self = wp.lock()) {
SL_TRACE(self->log_, "Block(s) enqueued to load by announce");
}
},
false);
auto res = true;
for (const auto &p_id : selected_peers) {
res &= syncByBlockInfo(
block_info,
p_id,
[wp{weak_from_this()}](auto res) {
if (auto self = wp.lock()) {
SL_TRACE(self->log_, "Block(s) enqueued to load by announce");
}
},
false);
}
return res;
}

void SynchronizerImpl::findCommonBlock(
Expand Down
3 changes: 3 additions & 0 deletions core/network/impl/synchronizer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <atomic>
#include <mutex>
#include <queue>
#include <random>
#include <unordered_set>

#include <libp2p/basic/scheduler.hpp>
Expand Down Expand Up @@ -261,6 +262,8 @@ namespace kagome::network {
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine_;
std::shared_ptr<PoolHandlerReady> main_pool_handler_;
std::shared_ptr<blockchain::BlockStorage> block_storage_;
uint32_t max_parallel_downloads_;
std::mt19937 random_gen_;

application::SyncMethod sync_method_;

Expand Down
2 changes: 2 additions & 0 deletions test/mock/core/application/app_configuration_mock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ namespace kagome::application {
precompileWasm,
(),
(const, override));

MOCK_METHOD(uint32_t, maxParallelDownloads, (), (const, override));
};

} // namespace kagome::application
Loading