Skip to content

Commit

Permalink
Feature/import manifest tests (#2238)
Browse files Browse the repository at this point in the history
* import mafifest tests

Signed-off-by: iceseer <[email protected]>

* deactivate leaf after allowed ancestry len elapsed

Signed-off-by: iceseer <[email protected]>

* per session/relay parent

Signed-off-by: iceseer <[email protected]>

* statement distribution refactoring

Signed-off-by: iceseer <[email protected]>

* network bridge

Signed-off-by: iceseer <[email protected]>

* prune storage

Signed-off-by: iceseer <[email protected]>

* [trace] additional logging to determine validators list size calculation

Signed-off-by: iceseer <[email protected]>

---------

Signed-off-by: iceseer <[email protected]>
  • Loading branch information
iceseer authored Nov 12, 2024
1 parent 18889a9 commit 1d7f2ee
Show file tree
Hide file tree
Showing 36 changed files with 4,615 additions and 3,122 deletions.
10 changes: 8 additions & 2 deletions core/common/ref_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <map>
#include <memory>
#include <optional>
#include "outcome/outcome.hpp"

namespace kagome {

Expand Down Expand Up @@ -91,14 +92,15 @@ namespace kagome {
* @param f is a functor to create RefObj
*/
template <typename F>
SRefObj get_or_insert(const Key &k, F &&f) {
outcome::result<SRefObj> get_or_insert(const Key &k, F &&f) {
[[likely]] if (auto it = items_.find(k); it != items_.end()) {
auto obj = it->second.lock();
BOOST_ASSERT(obj);
return obj;
}

auto obj = std::make_shared<RefObj>(std::forward<F>(f)());
OUTCOME_TRY(o, std::forward<F>(f)());
auto obj = std::make_shared<RefObj>(std::move(o));
auto [it, inserted] = items_.emplace(k, obj);
BOOST_ASSERT(inserted);

Expand All @@ -107,6 +109,10 @@ namespace kagome {
return obj;
}

size_t size() const {
return items_.size();
}

private:
RefCache() = default;

Expand Down
9 changes: 8 additions & 1 deletion core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
#include "parachain/pvf/workers.hpp"
#include "parachain/validator/impl/parachain_observer_impl.hpp"
#include "parachain/validator/parachain_processor.hpp"
#include "parachain/validator/statement_distribution/statement_distribution.hpp"
#include "runtime/binaryen/binaryen_memory_provider.hpp"
#include "runtime/binaryen/instance_environment_factory.hpp"
#include "runtime/binaryen/module/module_factory_impl.hpp"
Expand Down Expand Up @@ -788,7 +789,7 @@ namespace {
di::bind<parachain::BitfieldStore>.template to<parachain::BitfieldStoreImpl>(),
di::bind<parachain::BackingStore>.template to<parachain::BackingStoreImpl>(),
di::bind<parachain::BackedCandidatesSource>.template to<parachain::ParachainProcessorImpl>(),
di::bind<network::CanDisconnect>.template to<parachain::ParachainProcessorImpl>(),
di::bind<network::CanDisconnect>.template to<parachain::statement_distribution::StatementDistribution>(),
di::bind<parachain::Pvf>.template to<parachain::PvfImpl>(),
di::bind<network::CollationObserver>.template to<parachain::ParachainObserverImpl>(),
di::bind<network::ValidationObserver>.template to<parachain::ParachainObserverImpl>(),
Expand Down Expand Up @@ -1011,6 +1012,12 @@ namespace kagome::injector {
.template create<sptr<parachain::ParachainProcessorImpl>>();
}

std::shared_ptr<parachain::statement_distribution::StatementDistribution>
KagomeNodeInjector::injectStatementDistribution() {
return pimpl_->injector_.template create<
sptr<parachain::statement_distribution::StatementDistribution>>();
}

std::shared_ptr<parachain::ApprovalDistribution>
KagomeNodeInjector::injectApprovalDistribution() {
return pimpl_->injector_
Expand Down
6 changes: 6 additions & 0 deletions core/injector/application_injector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ namespace kagome {
class ParachainObserver;
struct ParachainProcessorImpl;
struct ApprovalDistribution;

namespace statement_distribution {
struct StatementDistribution;
}
} // namespace parachain

namespace runtime {
Expand Down Expand Up @@ -131,6 +135,8 @@ namespace kagome::injector {
std::shared_ptr<parachain::ParachainObserver> injectParachainObserver();
std::shared_ptr<parachain::ParachainProcessorImpl>
injectParachainProcessor();
std::shared_ptr<parachain::statement_distribution::StatementDistribution>
injectStatementDistribution();
std::shared_ptr<parachain::ApprovalDistribution>
injectApprovalDistribution();
std::shared_ptr<network::DisputeRequestObserver>
Expand Down
2 changes: 1 addition & 1 deletion core/network/can_disconnect.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ namespace kagome::network {
public:
virtual ~CanDisconnect() = default;

virtual bool canDisconnect(const libp2p::PeerId &) const = 0;
virtual bool can_disconnect(const libp2p::PeerId &) const = 0;
};
} // namespace kagome::network
18 changes: 11 additions & 7 deletions core/network/impl/peer_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ namespace kagome::network {
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<ReputationRepository> reputation_repository,
LazySPtr<CanDisconnect> can_disconnect,
std::shared_ptr<PeerView> peer_view)
std::shared_ptr<PeerView> peer_view,
primitives::events::PeerSubscriptionEnginePtr peer_event_engine)
: log_{log::createLogger("PeerManager", "network")},
host_(host),
main_pool_handler_{poolHandlerReadyMake(
Expand All @@ -111,7 +112,8 @@ namespace kagome::network {
hasher_{std::move(hasher)},
reputation_repository_{std::move(reputation_repository)},
can_disconnect_{can_disconnect},
peer_view_{std::move(peer_view)} {
peer_view_{std::move(peer_view)},
peer_event_engine_(std::move(peer_event_engine)) {
BOOST_ASSERT(identify_ != nullptr);
BOOST_ASSERT(kademlia_ != nullptr);
BOOST_ASSERT(scheduler_ != nullptr);
Expand All @@ -122,6 +124,7 @@ namespace kagome::network {
BOOST_ASSERT(peer_view_);
BOOST_ASSERT(reputation_repository_ != nullptr);
BOOST_ASSERT(peer_view_ != nullptr);
BOOST_ASSERT(peer_event_engine_);

// Register metrics
registry_->registerGaugeFamily(syncPeerMetricName,
Expand Down Expand Up @@ -188,6 +191,9 @@ namespace kagome::network {
self->peer_view_->removePeer(peer_id);
self->sync_peer_num_->set(self->active_peers_.size());
self->peers_count_metric_->set(self->active_peers_.size());

self->peer_event_engine_->notify(
primitives::events::PeerEventType::kDisconnected, peer_id);
SL_DEBUG(self->log_,
"Remained {} active peers",
self->active_peers_.size());
Expand Down Expand Up @@ -331,7 +337,7 @@ namespace kagome::network {

for (const auto &[peer_id, desc] : active_peers_) {
// Skip peer having immunity
if (not can_disconnect_.get()->canDisconnect(peer_id)) {
if (not can_disconnect_.get()->can_disconnect(peer_id)) {
continue;
}

Expand Down Expand Up @@ -858,11 +864,9 @@ namespace kagome::network {
}
}

self->peer_event_engine_->notify(
primitives::events::PeerEventType::kConnected, peer_info.id);
self->tryOpenGrandpaProtocol(peer_info, peer_state.value().get());
self->tryOpenValidationProtocol(
peer_info,
peer_state.value().get(),
network::CollationVersion::VStaging);
auto beefy_protocol = std::static_pointer_cast<BeefyProtocolImpl>(
self->router_->getBeefyProtocol());
openOutgoing(
Expand Down
4 changes: 3 additions & 1 deletion core/network/impl/peer_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ namespace kagome::network {
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<ReputationRepository> reputation_repository,
LazySPtr<CanDisconnect> can_disconnect,
std::shared_ptr<PeerView> peer_view);
std::shared_ptr<PeerView> peer_view,
primitives::events::PeerSubscriptionEnginePtr peer_event_engine);

/** @see poolHandlerReadyMake */
bool tryStart();
Expand Down Expand Up @@ -222,6 +223,7 @@ namespace kagome::network {
std::unordered_map<PeerId, PeerState> peer_states_;
libp2p::basic::Scheduler::Handle align_timer_;
std::set<PeerId> recently_active_peers_;
primitives::events::PeerSubscriptionEnginePtr peer_event_engine_;

// metrics
metrics::RegistryPtr registry_ = metrics::createRegistry();
Expand Down
24 changes: 10 additions & 14 deletions core/network/impl/protocols/fetch_attested_candidate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "network/impl/protocols/request_response_protocol.hpp"
#include "network/impl/stream_engine.hpp"
#include "parachain/validator/parachain_processor.hpp"
#include "parachain/validator/statement_distribution/statement_distribution.hpp"
#include "utils/non_copyable.hpp"

namespace kagome::network {
Expand All @@ -34,7 +35,9 @@ namespace kagome::network {
libp2p::Host &host,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::ParachainProcessorImpl> pp)
std::shared_ptr<
parachain::statement_distribution::StatementDistribution>
statement_distribution)
: RequestResponseProtocolImpl<
vstaging::AttestedCandidateRequest,
vstaging::AttestedCandidateResponse,
Expand All @@ -47,8 +50,8 @@ namespace kagome::network {
log::createLogger(
kFetchAttestedCandidateProtocolName,
"req_attested_candidate_protocol")},
pp_{std::move(pp)} {
BOOST_ASSERT(pp_);
statement_distribution_(std::move(statement_distribution)) {
BOOST_ASSERT(statement_distribution_);
}

private:
Expand All @@ -57,16 +60,8 @@ namespace kagome::network {
base().logger()->info(
"Fetching attested candidate request.(candidate={})",
request.candidate_hash);
auto res = pp_->OnFetchAttestedCandidateRequest(
std::move(request), stream->remotePeerId().value());
if (res.has_error()) {
base().logger()->error(
"Fetching attested candidate response failed.(error={})",
res.error());
} else {
SL_TRACE(base().logger(), "Fetching attested candidate response.");
}
return res;
statement_distribution_->OnFetchAttestedCandidateRequest(request, stream);
return std::nullopt;
}

void onTxRequest(const RequestType &request) override {
Expand All @@ -76,7 +71,8 @@ namespace kagome::network {

inline static const auto kFetchAttestedCandidateProtocolName =
"FetchAttestedCandidateProtocol"s;
std::shared_ptr<parachain::ParachainProcessorImpl> pp_;
std::shared_ptr<parachain::statement_distribution::StatementDistribution>
statement_distribution_;
};

} // namespace kagome::network
5 changes: 5 additions & 0 deletions core/network/impl/protocols/request_response_protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ namespace kagome::network {
});
}

void writeResponseAsync(std::shared_ptr<Stream> stream,
ResponseType response) {
writeResponse(std::move(stream), std::move(response));
}

protected:
virtual std::optional<outcome::result<ResponseType>> onRxRequest(
RequestType request, std::shared_ptr<Stream> stream) = 0;
Expand Down
57 changes: 0 additions & 57 deletions core/network/peer_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,66 +60,9 @@ namespace kagome::network {

/// @brief parachain peer state
std::optional<CollatingPeerState> collator_state = std::nullopt;
View view;
std::unordered_set<common::Hash256> implicit_view;
std::optional<CollationVersion> collation_version;
std::optional<ReqChunkVersion> req_chunk_version;

/// Update the view, returning a vector of implicit relay-parents which
/// weren't previously part of the view.
std::vector<common::Hash256> update_view(
const View &new_view, const parachain::ImplicitView &local_implicit) {
std::unordered_set<common::Hash256> next_implicit;
for (const auto &x : new_view.heads_) {
auto t =
local_implicit.known_allowed_relay_parents_under(x, std::nullopt);
if (t) {
next_implicit.insert(t->begin(), t->end());
}
}

std::vector<common::Hash256> fresh_implicit;
for (const auto &x : next_implicit) {
if (implicit_view.find(x) == implicit_view.end()) {
fresh_implicit.emplace_back(x);
}
}

view = new_view;
implicit_view = next_implicit;
return fresh_implicit;
}

/// Whether we know that a peer knows a relay-parent. The peer knows the
/// relay-parent if it is either implicit or explicit in their view.
/// However, if it is implicit via an active-leaf we don't recognize, we
/// will not accurately be able to recognize them as 'knowing' the
/// relay-parent.
bool knows_relay_parent(const common::Hash256 &relay_parent) {
return implicit_view.contains(relay_parent)
|| view.contains(relay_parent);
}

/// Attempt to reconcile the view with new information about the implicit
/// relay parents under an active leaf.
std::vector<common::Hash256> reconcile_active_leaf(
const common::Hash256 &leaf_hash,
std::span<const common::Hash256> implicit) {
if (!view.contains(leaf_hash)) {
return {};
}

std::vector<common::Hash256> v;
v.reserve(implicit.size());
for (const auto &i : implicit) {
auto [_, inserted] = implicit_view.insert(i);
if (inserted) {
v.emplace_back(i);
}
}
return v;
}

/// Whether the peer has advertised the given collation.
bool hasAdvertised(
const RelayHash &relay_parent,
Expand Down
52 changes: 0 additions & 52 deletions core/network/types/collator_messages.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,41 +88,6 @@ namespace kagome::network {
using RequestPov = CandidateHash;
using ResponsePov = boost::variant<ParachainBlock, Empty>;

/**
* Contains information about the candidate and a proof of the results of its
* execution.
*/
struct CandidateReceipt {
CandidateDescriptor descriptor; /// Candidate descriptor
Hash commitments_hash; /// Hash of candidate commitments

const Hash &hash(const crypto::Hasher &hasher) const {
if (not hash_.has_value()) {
hash_.emplace(hasher.blake2b_256(
::scale::encode(std::tie(descriptor, commitments_hash)).value()));
}
return hash_.value();
}

inline bool operator==(const CandidateReceipt &other) const {
return descriptor == other.descriptor
and commitments_hash == other.commitments_hash;
}

friend inline scale::ScaleDecoderStream &operator>>(
scale::ScaleDecoderStream &s, CandidateReceipt &cr) {
return s >> cr.descriptor >> cr.commitments_hash;
}

friend inline scale::ScaleEncoderStream &operator<<(
scale::ScaleEncoderStream &s, const CandidateReceipt &cr) {
return s << cr.descriptor << cr.commitments_hash;
}

private:
mutable std::optional<Hash> hash_{};
};

struct CollationResponse {
SCALE_TIE(2);

Expand Down Expand Up @@ -212,23 +177,6 @@ namespace kagome::network {
using FetchAvailableDataResponse =
boost::variant<runtime::AvailableData, Empty>;

struct CommittedCandidateReceipt {
SCALE_TIE(2);

CandidateDescriptor descriptor; /// Candidate descriptor
CandidateCommitments commitments; /// commitments retrieved from validation
/// result and produced by the execution
/// and validation parachain candidate

CandidateReceipt to_plain(const crypto::Hasher &hasher) const {
CandidateReceipt receipt;
receipt.descriptor = descriptor,
receipt.commitments_hash =
hasher.blake2b_256(scale::encode(commitments).value());
return receipt;
}
};

struct FetchStatementRequest {
SCALE_TIE(2);
RelayHash relay_parent;
Expand Down
1 change: 1 addition & 0 deletions core/parachain/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ add_library(validator_parachain
backing/store_impl.cpp
backing/cluster.cpp
validator/backing_implicit_view.cpp
validator/statement_distribution/statement_distribution.cpp
)

target_link_libraries(validator_parachain
Expand Down
7 changes: 7 additions & 0 deletions core/parachain/approval/approval_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,11 @@ namespace kagome::parachain {
ApprovalThreadPool(std::shared_ptr<Watchdog> watchdog)
: ThreadPool(std::move(watchdog), "approval", 1, std::nullopt) {}
};

class StatementDistributionThreadPool final : public ThreadPool {
public:
StatementDistributionThreadPool(std::shared_ptr<Watchdog> watchdog)
: ThreadPool(
std::move(watchdog), "statement-distribution", 1, std::nullopt) {}
};
} // namespace kagome::parachain
Loading

0 comments on commit 1d7f2ee

Please sign in to comment.