Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into feature/pvf-clone
Browse files Browse the repository at this point in the history
  • Loading branch information
turuslan committed Dec 12, 2024
2 parents 5858be5 + 6acdc45 commit 308a8d7
Show file tree
Hide file tree
Showing 15 changed files with 102 additions and 59 deletions.
29 changes: 15 additions & 14 deletions core/network/impl/protocols/beefy_justification_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@

namespace kagome::network {

BeefyJustificationProtocol::BeefyJustificationProtocol(libp2p::Host &host,
const blockchain::GenesisBlockHash &genesis,
common::MainThreadPool &main_thread_pool,
std::shared_ptr<PeerManager> peer_manager,
std::shared_ptr<Beefy> beefy)
: RequestResponseProtocolImpl{
kName,
host,
make_protocols(kBeefyJustificationProtocol, genesis),
log::createLogger(kName),
},
main_pool_handler_{main_thread_pool.handlerStarted()},
peer_manager_{std::move(peer_manager)},
beefy_{std::move(beefy)} {}
BeefyJustificationProtocol::BeefyJustificationProtocol(
libp2p::Host &host,
const blockchain::GenesisBlockHash &genesis,
common::MainThreadPool &main_thread_pool,
std::shared_ptr<PeerManager> peer_manager,
std::shared_ptr<Beefy> beefy)
: RequestResponseProtocolImpl{kName,
host,
make_protocols(kBeefyJustificationProtocol,
genesis),
log::createLogger(kName),
main_thread_pool},
main_pool_handler_{main_thread_pool.handlerStarted()},
peer_manager_{std::move(peer_manager)},
beefy_{std::move(beefy)} {}

std::optional<outcome::result<BeefyJustificationProtocol::ResponseType>>
BeefyJustificationProtocol::onRxRequest(RequestType block,
Expand Down
6 changes: 4 additions & 2 deletions core/network/impl/protocols/fetch_attested_candidate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ namespace kagome::network {
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<
parachain::statement_distribution::StatementDistribution>
statement_distribution)
statement_distribution,
common::MainThreadPool &main_thread_pool)
: RequestResponseProtocolImpl<
vstaging::AttestedCandidateRequest,
vstaging::AttestedCandidateResponse,
Expand All @@ -48,7 +49,8 @@ namespace kagome::network {
kProtocolPrefixPolkadot),
log::createLogger(
kFetchAttestedCandidateProtocolName,
"req_attested_candidate_protocol")},
"req_attested_candidate_protocol"),
main_thread_pool},
statement_distribution_(std::move(statement_distribution)) {
BOOST_ASSERT(statement_distribution_);
}
Expand Down
23 changes: 12 additions & 11 deletions core/network/impl/protocols/light.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ namespace kagome::network {
std::shared_ptr<blockchain::BlockHeaderRepository> repository,
std::shared_ptr<storage::trie::TrieStorage> storage,
std::shared_ptr<runtime::ModuleRepository> module_repo,
std::shared_ptr<runtime::Executor> executor)
: RequestResponseProtocolImpl{
kName,
host,
make_protocols(kLightProtocol, genesis, chain_spec),
log::createLogger(kName),
},
repository_{std::move(repository)},
storage_{std::move(storage)},
module_repo_{std::move(module_repo)},
executor_{std::move(executor)} {}
std::shared_ptr<runtime::Executor> executor,
common::MainThreadPool &main_thread_pool)
: RequestResponseProtocolImpl{kName,
host,
make_protocols(
kLightProtocol, genesis, chain_spec),
log::createLogger(kName),
main_thread_pool},
repository_{std::move(repository)},
storage_{std::move(storage)},
module_repo_{std::move(module_repo)},
executor_{std::move(executor)} {}

std::optional<outcome::result<LightProtocol::ResponseType>>
LightProtocol::onRxRequest(RequestType req, std::shared_ptr<Stream>) {
Expand Down
3 changes: 2 additions & 1 deletion core/network/impl/protocols/light.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ namespace kagome::network {
std::shared_ptr<blockchain::BlockHeaderRepository> repository,
std::shared_ptr<storage::trie::TrieStorage> storage,
std::shared_ptr<runtime::ModuleRepository> module_repo,
std::shared_ptr<runtime::Executor> executor);
std::shared_ptr<runtime::Executor> executor,
common::MainThreadPool &main_thread_pool);

std::optional<outcome::result<ResponseType>> onRxRequest(
RequestType req, std::shared_ptr<Stream>) override;
Expand Down
13 changes: 8 additions & 5 deletions core/network/impl/protocols/protocol_fetch_available_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ namespace kagome::network {
libp2p::Host &host,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::AvailabilityStore> av_store)
std::shared_ptr<parachain::AvailabilityStore> av_store,
common::MainThreadPool &main_thread_pool)
: RequestResponseProtocolImpl<
FetchAvailableDataRequest,
FetchAvailableDataResponse,
Expand All @@ -44,8 +45,8 @@ namespace kagome::network {
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kName,
"req_available_data_protocol")},
kName, "req_available_data_protocol"),
main_thread_pool},
av_store_{std::move(av_store)} {}

private:
Expand Down Expand Up @@ -77,7 +78,8 @@ namespace kagome::network {
libp2p::Host &host,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::BackingStore> backing_store)
std::shared_ptr<parachain::BackingStore> backing_store,
common::MainThreadPool &main_thread_pool)
: RequestResponseProtocolImpl<
FetchStatementRequest,
FetchStatementResponse,
Expand All @@ -87,7 +89,8 @@ namespace kagome::network {
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kName, "req_statement_protocol")},
kName, "req_statement_protocol"),
main_thread_pool},
backing_store_{std::move(backing_store)} {}

private:
Expand Down
6 changes: 4 additions & 2 deletions core/network/impl/protocols/protocol_fetch_chunk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ namespace kagome::network {
const application::ChainSpec & /*chain_spec*/,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::ParachainProcessorImpl> pp,
std::shared_ptr<PeerManager> pm)
std::shared_ptr<PeerManager> pm,
common::MainThreadPool &main_thread_pool)
: RequestResponseProtocolImpl<
FetchChunkRequest,
FetchChunkResponse,
Expand All @@ -53,7 +54,8 @@ namespace kagome::network {
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(kFetchChunkProtocolName,
"req_chunk_protocol")},
"req_chunk_protocol"),
main_thread_pool},
pp_{std::move(pp)},
pm_{std::move(pm)} {
BOOST_ASSERT(pp_);
Expand Down
6 changes: 4 additions & 2 deletions core/network/impl/protocols/protocol_fetch_chunk_obsolete.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ namespace kagome::network {
libp2p::Host &host,
const application::ChainSpec & /*chain_spec*/,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::ParachainProcessorImpl> pp)
std::shared_ptr<parachain::ParachainProcessorImpl> pp,
common::MainThreadPool &main_thread_pool)
: RequestResponseProtocolImpl<
FetchChunkRequest,
FetchChunkResponseObsolete,
Expand All @@ -54,7 +55,8 @@ namespace kagome::network {
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(kFetchChunkProtocolName,
"req_chunk_protocol")},
"req_chunk_protocol"),
main_thread_pool},
pp_{std::move(pp)} {
BOOST_ASSERT(pp_);
}
Expand Down
19 changes: 14 additions & 5 deletions core/network/impl/protocols/protocol_req_collation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ namespace kagome::network {
const libp2p::peer::ProtocolName &protoname,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqCollationObserver> observer)
std::shared_ptr<ReqCollationObserver> observer,
common::MainThreadPool &main_thread_pool)
: Base{kReqCollationProtocolName,
host,
make_protocols(protoname, genesis_hash, kProtocolPrefixPolkadot),
log::createLogger(kReqCollationProtocolName,
"req_collation_protocol")},
"req_collation_protocol"),
main_thread_pool},
observer_{std::move(observer)} {}

protected:
Expand All @@ -60,19 +62,26 @@ namespace kagome::network {
libp2p::Host &host,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqCollationObserver> observer)
std::shared_ptr<ReqCollationObserver> observer,
common::MainThreadPool &main_thread_pool)
: v1_impl_{std::make_shared<
ReqCollationProtocolImpl<CollationFetchingRequest,
CollationFetchingResponse>>(
host, kReqCollationProtocol, chain_spec, genesis_hash, observer)},
host,
kReqCollationProtocol,
chain_spec,
genesis_hash,
observer,
main_thread_pool)},
vstaging_impl_{std::make_shared<
ReqCollationProtocolImpl<vstaging::CollationFetchingRequest,
vstaging::CollationFetchingResponse>>(
host,
kReqCollationVStagingProtocol,
chain_spec,
genesis_hash,
observer)} {
observer,
main_thread_pool)} {
BOOST_ASSERT(v1_impl_);
BOOST_ASSERT(vstaging_impl_);
}
Expand Down
4 changes: 3 additions & 1 deletion core/network/impl/protocols/protocol_req_collation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "application/app_configuration.hpp"
#include "application/chain_spec.hpp"
#include "common/main_thread_pool.hpp"
#include "log/logger.hpp"
#include "network/peer_manager.hpp"
#include "network/protocols/req_collation_protocol.hpp"
Expand All @@ -41,7 +42,8 @@ namespace kagome::network {
ReqCollationProtocol(libp2p::Host &host,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqCollationObserver> observer);
std::shared_ptr<ReqCollationObserver> observer,
common::MainThreadPool &main_thread_pool);

const Protocol &protocolName() const override;

Expand Down
16 changes: 11 additions & 5 deletions core/network/impl/protocols/protocol_req_pov.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ namespace kagome::network {
ReqPovProtocolImpl(libp2p::Host &host,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqPovObserver> observer)
std::shared_ptr<ReqPovObserver> observer,
common::MainThreadPool &main_thread_pool)
: RequestResponseProtocolImpl<
RequestPov,
ResponsePov,
Expand All @@ -33,7 +34,8 @@ namespace kagome::network {
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(kReqPovProtocolName,
"req_pov_protocol")},
"req_pov_protocol"),
main_thread_pool},
observer_{std::move(observer)} {}

protected:
Expand Down Expand Up @@ -71,9 +73,13 @@ namespace kagome::network {
libp2p::Host &host,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqPovObserver> observer)
: impl_{std::make_shared<ReqPovProtocolImpl>(
host, chain_spec, genesis_hash, std::move(observer))} {}
std::shared_ptr<ReqPovObserver> observer,
common::MainThreadPool &main_thread_pool)
: impl_{std::make_shared<ReqPovProtocolImpl>(host,
chain_spec,
genesis_hash,
std::move(observer),
main_thread_pool)} {}

const Protocol &ReqPovProtocol::protocolName() const {
BOOST_ASSERT(impl_ && !!"ReqPovProtocolImpl must be initialized!");
Expand Down
4 changes: 3 additions & 1 deletion core/network/impl/protocols/protocol_req_pov.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "application/app_configuration.hpp"
#include "application/chain_spec.hpp"
#include "common/main_thread_pool.hpp"
#include "log/logger.hpp"
#include "network/peer_manager.hpp"
#include "network/protocols/req_pov_protocol.hpp"
Expand All @@ -36,7 +37,8 @@ namespace kagome::network {
ReqPovProtocol(libp2p::Host &host,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqPovObserver> observer);
std::shared_ptr<ReqPovObserver> observer,
common::MainThreadPool &main_thread_pool);

const Protocol &protocolName() const override;

Expand Down
13 changes: 11 additions & 2 deletions core/network/impl/protocols/request_response_protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "network/impl/protocols/protocol_base_impl.hpp"

#include "common/main_thread_pool.hpp"
#include "protocol_error.hpp"
#include "utils/box.hpp"

Expand Down Expand Up @@ -39,9 +40,11 @@ namespace kagome::network {
libp2p::Host &host,
Protocols protocols,
log::Logger logger,
common::MainThreadPool &main_thread_pool,
std::chrono::milliseconds timeout = std::chrono::seconds(1))
: base_(std::move(name), host, std::move(protocols), std::move(logger)),
timeout_(std::move(timeout)) {}
timeout_(std::move(timeout)),
main_pool_handler_{main_thread_pool.handlerStarted()} {}

bool start() override {
return base_.start(this->weak_from_this());
Expand Down Expand Up @@ -125,7 +128,10 @@ namespace kagome::network {
base_.host().getPeerRepository().getAddressRepository().getAddresses(
peer_id);
if (not addresses_res.has_value()) {
cb(addresses_res.as_failure());
main_pool_handler_->execute(
[cb(std::move(cb)), addresses_res(std::move(addresses_res))] {
cb(addresses_res.as_failure());
});
return;
}

Expand Down Expand Up @@ -374,6 +380,9 @@ namespace kagome::network {

ProtocolBaseImpl base_;
std::chrono::milliseconds timeout_;

private:
std::shared_ptr<PoolHandler> main_pool_handler_;
};

} // namespace kagome::network
6 changes: 4 additions & 2 deletions core/network/impl/protocols/send_dispute_protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ namespace kagome::network {
SendDisputeProtocolImpl(libp2p::Host &host,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<network::DisputeRequestObserver>
dispute_request_observer)
dispute_request_observer,
common::MainThreadPool &main_thread_pool)
: RequestResponseProtocolImpl<
DisputeRequest,
DisputeResponse,
Expand All @@ -56,7 +57,8 @@ namespace kagome::network {
kProtocolPrefixPolkadot),
log::createLogger(
kSendDisputeProtocolName,
"dispute_protocol")},
"dispute_protocol"),
main_thread_pool},
dispute_request_observer_{std::move(dispute_request_observer)} {
BOOST_ASSERT(dispute_request_observer_);
}
Expand Down
12 changes: 7 additions & 5 deletions core/network/warp/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ namespace kagome::network {
WarpProtocolImpl(libp2p::Host &host,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis,
std::shared_ptr<WarpSyncCache> cache)
std::shared_ptr<WarpSyncCache> cache,
common::MainThreadPool &main_thread_pool)
: RequestResponseProtocolImpl(
kName,
host,
make_protocols(kWarpProtocol, genesis, chain_spec),
log::createLogger(kName, "warp_sync_protocol")),
kName,
host,
make_protocols(kWarpProtocol, genesis, chain_spec),
log::createLogger(kName, "warp_sync_protocol"),
main_thread_pool),
cache_{std::move(cache)} {}

std::optional<outcome::result<ResponseType>> onRxRequest(
Expand Down
1 change: 0 additions & 1 deletion core/parachain/availability/recovery/recovery_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ namespace kagome::parachain {
candidate_hash,
validator_index,
peer->id);
lock.unlock();
send_fetch_available_data_request(
peer->id,
candidate_hash,
Expand Down

0 comments on commit 308a8d7

Please sign in to comment.