Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/prospective parachains multi candidates #1996

Merged
merged 16 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/log/configurator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,15 @@ namespace kagome::log {
- name: warp_sync_protocol
- name: parachain_protocols
children:
- name: collation_protocol
- name: validation_protocol
- name: collation_protocol_vstaging
- name: validation_protocol_vstaging
- name: req_collation_protocol
- name: req_chunk_protocol
- name: req_available_data_protocol
- name: req_statement_protocol
- name: req_pov_protocol
- name: dispute_protocol
- name: req_attested_candidate_protocol
- name: changes_trie
- name: storage
children:
Expand Down
78 changes: 39 additions & 39 deletions core/network/impl/peer_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,15 @@ namespace kagome::network {
return it->second;
}

std::optional<std::reference_wrapper<const PeerState>>
PeerManagerImpl::getPeerState(const PeerId &peer_id) const {
auto it = peer_states_.find(peer_id);
if (it == peer_states_.end()) {
return std::nullopt;
}
return it->second;
}

void PeerManagerImpl::processDiscoveredPeer(const PeerId &peer_id) {
// Ignore himself
if (isSelfPeer(peer_id)) {
Expand Down Expand Up @@ -726,41 +735,29 @@ namespace kagome::network {

log_->trace("Try to open outgoing validation protocol.(peer={})",
peer_info.id);
openOutgoing(
stream_engine_,
validation_protocol,
peer_info,
[validation_protocol, peer_info, wptr{weak_from_this()}](
outcome::result<std::shared_ptr<Stream>> stream_result) {
auto self = wptr.lock();
if (not self) {
return;
}

auto &peer_id = peer_info.id;
if (!stream_result.has_value()) {
SL_TRACE(self->log_,
"Unable to create stream {} with {}: {}",
validation_protocol->protocolName(),
peer_id,
stream_result.error().message());
auto ps = self->getPeerState(peer_info.id);
if (ps) {
self->tryOpenValidationProtocol(
peer_info, ps->get(), network::CollationVersion::V1);
} else {
SL_TRACE(
self->log_,
"No peer state to open V1 validation protocol {} with {}",
validation_protocol->protocolName(),
peer_id);
}
return;
}

self->stream_engine_->addOutgoing(stream_result.value(),
validation_protocol);
});
openOutgoing(stream_engine_,
validation_protocol,
peer_info,
[validation_protocol, peer_info, wptr{weak_from_this()}](
outcome::result<std::shared_ptr<Stream>> stream_result) {
auto self = wptr.lock();
if (not self) {
return;
}

auto &peer_id = peer_info.id;
if (!stream_result.has_value()) {
SL_TRACE(self->log_,
"Unable to create stream {} with {}: {}",
validation_protocol->protocolName(),
peer_id,
stream_result.error().message());
return;
}

self->stream_engine_->addOutgoing(stream_result.value(),
validation_protocol);
});
}
}

Expand Down Expand Up @@ -839,11 +836,14 @@ namespace kagome::network {
}

void PeerManagerImpl::reserveStatusStreams(const PeerId &peer_id) const {
auto proto_val_vstaging = router_->getValidationProtocolVStaging();
BOOST_ASSERT_MSG(proto_val_vstaging,
"Router did not provide validation protocol vstaging");
if (auto ps = getPeerState(peer_id);
ps && ps->get().roles.flags.authority) {
auto proto_val_vstaging = router_->getValidationProtocolVStaging();
BOOST_ASSERT_MSG(proto_val_vstaging,
"Router did not provide validation protocol vstaging");

stream_engine_->reserveStreams(peer_id, proto_val_vstaging);
stream_engine_->reserveStreams(peer_id, proto_val_vstaging);
}
}

void PeerManagerImpl::reserveStreams(const PeerId &peer_id) const {
Expand Down
2 changes: 2 additions & 0 deletions core/network/impl/peer_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ namespace kagome::network {
/** @see PeerManager::getPeerState */
std::optional<std::reference_wrapper<PeerState>> getPeerState(
const PeerId &peer_id) override;
std::optional<std::reference_wrapper<const PeerState>> getPeerState(
const PeerId &peer_id) const override;

private:
/// Right way to check self peer as it takes into account dev mode
Expand Down
4 changes: 2 additions & 2 deletions core/network/impl/router_libp2p.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ namespace kagome::network {
// lazyStart(collation_protocol_);
// lazyStart(validation_protocol_);

lazyStart(collation_protocol_);
lazyStart(validation_protocol_);
lazyStart(collation_protocol_vstaging_);
lazyStart(validation_protocol_vstaging_);
lazyStart(req_collation_protocol_);
lazyStart(req_pov_protocol_);
lazyStart(fetch_chunk_protocol_);
Expand Down
2 changes: 2 additions & 0 deletions core/network/peer_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ namespace kagome::network {
*/
virtual std::optional<std::reference_wrapper<PeerState>> getPeerState(
const PeerId &peer_id) = 0;
virtual std::optional<std::reference_wrapper<const PeerState>> getPeerState(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps, non-const method is not used and might be deleted

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Нужен для изменения версии и стауса коллатора.

const PeerId &peer_id) const = 0;

/**
* @returns number of active peers
Expand Down
116 changes: 97 additions & 19 deletions core/parachain/validator/fragment_tree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,47 +533,125 @@ namespace kagome::parachain::fragment {
return res;
}

/**
* @brief Select `count` candidates after the given `required_path` which
* pass the predicate and have not already been backed on chain.
* Does an exhaustive search into the tree starting after `required_path`.
* If there are multiple possibilities of size `count`, this will select the
* first one. If there is no chain of size `count` that matches the
* criteria, this will return the largest chain it could find with the
* criteria. If there are no candidates meeting those criteria,
* returns an empty `Vec`. Cycles are accepted, see module docs for the
* `Cycles` section. The intention of the `required_path` is
* to allow queries on the basis of one or more candidates which were
* previously pending availability becoming available and opening up more
* room on the core.
*/

template <typename Func>
std::optional<CandidateHash> selectChild(
const std::vector<CandidateHash> &required_path, Func &&pred) const {
std::vector<CandidateHash> selectChildren(
const std::vector<CandidateHash> &required_path,
uint32_t count,
Func &&pred) const {
NodePointer base_node{NodePointerRoot{}};
for (const CandidateHash &required_step : required_path) {
if (auto node = nodeCandidateChild(base_node, required_step)) {
base_node = *node;
} else {
return std::nullopt;
return {};
}
}

return visit_in_place(
std::vector<CandidateHash> accum;
return selectChildrenInner(
std::move(base_node), count, count, std::forward<Func>(pred), accum);
}

/**
* @brief Try finding a candidate chain starting from `base_node` of length
* `expected_count`. If not possible, return the longest one we
* could find. Does a depth-first search, since we're optimistic that
* there won't be more than one such chains (parachains shouldn't
* usually have forks). So in the usual case, this will conclude in
* `O(expected_count)`. Cycles are accepted, but this doesn't allow for
* infinite execution time, because the maximum depth we'll reach is
* `expected_count`. Worst case performance is `O(num_forks
* ^ expected_count)`. Although an exponential function, this is
* actually a constant that can only be altered via sudo/governance,
* because: 1. `num_forks` at a given level is at most `max_candidate_depth
* * max_validators_per_core` (because each validator in the
* assigned group can second `max_candidate_depth` candidates). The
* prospective-parachains subsystem assumes that the number of para forks is
* limited by collator-protocol and backing subsystems. In practice, this is
* a constant which can only be altered by sudo or governance. 2.
* `expected_count` is equal to the number of cores a para is scheduled on
* (in an elastic scaling scenario). For non-elastic-scaling, this is
* just 1. In practice, this should be a small number (1-3), capped
* by the total number of available cores (a constant alterable only
* via governance/sudo).
*/
template <typename Func>
std::vector<CandidateHash> selectChildrenInner(
NodePointer base_node,
uint32_t expected_count,
uint32_t remaining_count,
const Func &pred,
std::vector<CandidateHash> &accumulator) const {
if (remaining_count == 0) {
return accumulator;
}

auto children = visit_in_place(
base_node,
[&](const NodePointerRoot &) -> std::optional<CandidateHash> {
for (const FragmentNode &n : nodes) {
[&](const NodePointerRoot &)
-> std::vector<std::pair<NodePointer, CandidateHash>> {
std::vector<std::pair<NodePointer, CandidateHash>> tmp;
for (size_t ptr = 0; ptr < nodes.size(); ++ptr) {
const FragmentNode &n = nodes[ptr];
if (!is_type<NodePointerRoot>(n.parent)) {
return std::nullopt;
continue;
}
if (scope.getPendingAvailability(n.candidate_hash)) {
return std::nullopt;
continue;
}
if (!pred(n.candidate_hash)) {
return std::nullopt;
continue;
}
return n.candidate_hash;
tmp.emplace_back(NodePointerStorage{ptr}, n.candidate_hash);
}
return std::nullopt;
return tmp;
},
[&](const NodePointerStorage &ptr) -> std::optional<CandidateHash> {
for (const auto &[_, h] : nodes[ptr].children) {
if (scope.getPendingAvailability(h)) {
return std::nullopt;
[&](const NodePointerStorage &base_node_ptr)
-> std::vector<std::pair<NodePointer, CandidateHash>> {
std::vector<std::pair<NodePointer, CandidateHash>> tmp;
const auto &bn = nodes[base_node_ptr];
for (const auto &[ptr, hash] : bn.children) {
if (scope.getPendingAvailability(hash)) {
continue;
}
if (!pred(h)) {
return std::nullopt;
if (!pred(hash)) {
continue;
}
return h;
tmp.emplace_back(ptr, hash);
}
return std::nullopt;
return tmp;
});

auto best_result = accumulator;
for (const auto &[child_ptr, child_hash] : children) {
accumulator.emplace_back(child_hash);
auto result = selectChildrenInner(
child_ptr, expected_count, remaining_count - 1, pred, accumulator);
accumulator.pop_back();

if (result.size() == size_t(expected_count)) {
return result;
} else if (best_result.size() < result.size()) {
best_result = result;
}
}

return best_result;
}

static FragmentTree populate(const std::shared_ptr<crypto::Hasher> &hasher,
Expand Down
50 changes: 35 additions & 15 deletions core/parachain/validator/impl/parachain_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,19 @@ namespace kagome::parachain {
return;
}

[[maybe_unused]] const auto _ =
prospective_parachains_->onActiveLeavesUpdate(network::ExViewRef{
.new_head = {event.new_head},
.lost = event.lost,
});
if (const auto r =
prospective_parachains_->onActiveLeavesUpdate(network::ExViewRef{
.new_head = {event.new_head},
.lost = event.lost,
});
r.has_error()) {
SL_WARN(
logger_,
"Prospective parachains leaf update failed. (relay_parent={}, error={})",
relay_parent,
r.error().message());
}

backing_store_->onActivateLeaf(relay_parent);
createBackingTask(relay_parent);
SL_TRACE(logger_,
Expand Down Expand Up @@ -521,7 +529,7 @@ namespace kagome::parachain {
void ParachainProcessorImpl::broadcastViewExcept(
const libp2p::peer::PeerId &peer_id, const network::View &view) const {
auto msg = std::make_shared<
network::WireMessage<network::ValidatorProtocolMessage>>(
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::ViewUpdate{.view = view});
pm_->getStreamEngine()->broadcast(
router_->getValidationProtocolVStaging(),
Expand Down Expand Up @@ -562,7 +570,8 @@ namespace kagome::parachain {
BOOST_ASSERT(se);

auto message = std::make_shared<
network::WireMessage<network::ValidatorProtocolMessage>>(msg);
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
msg);
SL_TRACE(
logger_,
"Broadcasting view update to group.(relay_parent={}, group_size={})",
Expand All @@ -581,7 +590,7 @@ namespace kagome::parachain {

void ParachainProcessorImpl::broadcastView(const network::View &view) const {
auto msg = std::make_shared<
network::WireMessage<network::ValidatorProtocolMessage>>(
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::ViewUpdate{.view = view});
pm_->getStreamEngine()->broadcast(router_->getCollationProtocolVStaging(),
msg);
Expand Down Expand Up @@ -2255,18 +2264,28 @@ namespace kagome::parachain {
core,
[&](const network::ScheduledCore &scheduled_core)
-> std::optional<std::pair<CandidateHash, Hash>> {
return prospective_parachains_->answerGetBackableCandidate(
relay_parent, scheduled_core.para_id, {});
if (auto i = prospective_parachains_->answerGetBackableCandidates(
relay_parent, scheduled_core.para_id, 1, {});
!i.empty()) {
return i[0];
}
return std::nullopt;
},
[&](const runtime::OccupiedCore &occupied_core)
-> std::optional<std::pair<CandidateHash, Hash>> {
/// TODO(iceseer): do https://github.com/qdrvm/kagome/issues/1888
/// `bitfields_indicate_availability` check
if (occupied_core.next_up_on_available) {
return prospective_parachains_->answerGetBackableCandidate(
relay_parent,
occupied_core.next_up_on_available->para_id,
{occupied_core.candidate_hash});
if (auto i =
prospective_parachains_->answerGetBackableCandidates(
relay_parent,
occupied_core.next_up_on_available->para_id,
1,
{occupied_core.candidate_hash});
!i.empty()) {
return i[0];
}
return std::nullopt;
}
return std::nullopt;
},
Expand Down Expand Up @@ -2810,7 +2829,7 @@ namespace kagome::parachain {
peer_id,
protocol,
std::make_shared<
network::WireMessage<network::ValidatorProtocolMessage>>(
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::ViewUpdate{.view = my_view->get().view}));
}

Expand Down Expand Up @@ -3553,6 +3572,7 @@ namespace kagome::parachain {
*our_current_state_.implicit_view,
our_current_state_.active_leaves,
peer_data.collator_state->para_id)) {
SL_TRACE(logger_, "Out of view. (relay_parent={})", on_relay_parent);
return Error::OUT_OF_VIEW;
}

Expand Down
Loading
Loading