Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/prospective parachains multi candidates #1996

Merged
merged 16 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/log/configurator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,15 @@ namespace kagome::log {
- name: warp_sync_protocol
- name: parachain_protocols
children:
- name: collation_protocol
- name: validation_protocol
- name: collation_protocol_vstaging
- name: validation_protocol_vstaging
- name: req_collation_protocol
- name: req_chunk_protocol
- name: req_available_data_protocol
- name: req_statement_protocol
- name: req_pov_protocol
- name: dispute_protocol
- name: req_attested_candidate_protocol
- name: changes_trie
- name: storage
children:
Expand Down
78 changes: 39 additions & 39 deletions core/network/impl/peer_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,15 @@ namespace kagome::network {
return it->second;
}

std::optional<std::reference_wrapper<const PeerState>>
PeerManagerImpl::getPeerState(const PeerId &peer_id) const {
auto it = peer_states_.find(peer_id);
if (it == peer_states_.end()) {
return std::nullopt;
}
return it->second;
}

void PeerManagerImpl::processDiscoveredPeer(const PeerId &peer_id) {
// Ignore himself
if (isSelfPeer(peer_id)) {
Expand Down Expand Up @@ -726,41 +735,29 @@ namespace kagome::network {

log_->trace("Try to open outgoing validation protocol.(peer={})",
peer_info.id);
openOutgoing(
stream_engine_,
validation_protocol,
peer_info,
[validation_protocol, peer_info, wptr{weak_from_this()}](
outcome::result<std::shared_ptr<Stream>> stream_result) {
auto self = wptr.lock();
if (not self) {
return;
}

auto &peer_id = peer_info.id;
if (!stream_result.has_value()) {
SL_TRACE(self->log_,
"Unable to create stream {} with {}: {}",
validation_protocol->protocolName(),
peer_id,
stream_result.error().message());
auto ps = self->getPeerState(peer_info.id);
if (ps) {
self->tryOpenValidationProtocol(
peer_info, ps->get(), network::CollationVersion::V1);
} else {
SL_TRACE(
self->log_,
"No peer state to open V1 validation protocol {} with {}",
validation_protocol->protocolName(),
peer_id);
}
return;
}

self->stream_engine_->addOutgoing(stream_result.value(),
validation_protocol);
});
openOutgoing(stream_engine_,
validation_protocol,
peer_info,
[validation_protocol, peer_info, wptr{weak_from_this()}](
outcome::result<std::shared_ptr<Stream>> stream_result) {
auto self = wptr.lock();
if (not self) {
return;
}

auto &peer_id = peer_info.id;
if (!stream_result.has_value()) {
SL_TRACE(self->log_,
"Unable to create stream {} with {}: {}",
validation_protocol->protocolName(),
peer_id,
stream_result.error().message());
return;
}

self->stream_engine_->addOutgoing(stream_result.value(),
validation_protocol);
});
}
}

Expand Down Expand Up @@ -839,11 +836,14 @@ namespace kagome::network {
}

void PeerManagerImpl::reserveStatusStreams(const PeerId &peer_id) const {
auto proto_val_vstaging = router_->getValidationProtocolVStaging();
BOOST_ASSERT_MSG(proto_val_vstaging,
"Router did not provide validation protocol vstaging");
if (auto ps = getPeerState(peer_id);
ps && ps->get().roles.flags.authority) {
auto proto_val_vstaging = router_->getValidationProtocolVStaging();
BOOST_ASSERT_MSG(proto_val_vstaging,
"Router did not provide validation protocol vstaging");

stream_engine_->reserveStreams(peer_id, proto_val_vstaging);
stream_engine_->reserveStreams(peer_id, proto_val_vstaging);
}
}

void PeerManagerImpl::reserveStreams(const PeerId &peer_id) const {
Expand Down
2 changes: 2 additions & 0 deletions core/network/impl/peer_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ namespace kagome::network {
/** @see PeerManager::getPeerState */
std::optional<std::reference_wrapper<PeerState>> getPeerState(
const PeerId &peer_id) override;
std::optional<std::reference_wrapper<const PeerState>> getPeerState(
const PeerId &peer_id) const override;

private:
/// Right way to check self peer as it takes into account dev mode
Expand Down
4 changes: 2 additions & 2 deletions core/network/impl/router_libp2p.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ namespace kagome::network {
// lazyStart(collation_protocol_);
// lazyStart(validation_protocol_);

lazyStart(collation_protocol_);
lazyStart(validation_protocol_);
lazyStart(collation_protocol_vstaging_);
lazyStart(validation_protocol_vstaging_);
lazyStart(req_collation_protocol_);
lazyStart(req_pov_protocol_);
lazyStart(fetch_chunk_protocol_);
Expand Down
2 changes: 2 additions & 0 deletions core/network/peer_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ namespace kagome::network {
*/
virtual std::optional<std::reference_wrapper<PeerState>> getPeerState(
const PeerId &peer_id) = 0;
virtual std::optional<std::reference_wrapper<const PeerState>> getPeerState(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps, non-const method is not used and might be deleted

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Нужен для изменения версии и стауса коллатора.

const PeerId &peer_id) const = 0;

/**
* @returns number of active peers
Expand Down
78 changes: 59 additions & 19 deletions core/parachain/validator/fragment_tree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,46 +534,86 @@ namespace kagome::parachain::fragment {
}

template <typename Func>
std::optional<CandidateHash> selectChild(
const std::vector<CandidateHash> &required_path, Func &&pred) const {
std::vector<CandidateHash> selectChildren(
const std::vector<CandidateHash> &required_path,
uint32_t count,
Func &&pred) const {
NodePointer base_node{NodePointerRoot{}};
for (const CandidateHash &required_step : required_path) {
if (auto node = nodeCandidateChild(base_node, required_step)) {
base_node = *node;
} else {
return std::nullopt;
return {};
}
}

return visit_in_place(
std::vector<CandidateHash> accum;
return selectChildrenInner(
std::move(base_node), count, count, std::forward<Func>(pred), accum);
}

template <typename Func>
std::vector<CandidateHash> selectChildrenInner(
NodePointer base_node,
uint32_t expected_count,
uint32_t remaining_count,
const Func &pred,
std::vector<CandidateHash> &accumulator) const {
if (remaining_count == 0) {
return accumulator;
}

auto children = visit_in_place(
base_node,
[&](const NodePointerRoot &) -> std::optional<CandidateHash> {
for (const FragmentNode &n : nodes) {
[&](const NodePointerRoot &)
-> std::vector<std::pair<NodePointer, CandidateHash>> {
std::vector<std::pair<NodePointer, CandidateHash>> tmp;
for (size_t ptr = 0; ptr < nodes.size(); ++ptr) {
const FragmentNode &n = nodes[ptr];
if (!is_type<NodePointerRoot>(n.parent)) {
return std::nullopt;
continue;
}
if (scope.getPendingAvailability(n.candidate_hash)) {
return std::nullopt;
continue;
}
if (!pred(n.candidate_hash)) {
return std::nullopt;
continue;
}
return n.candidate_hash;
tmp.emplace_back(NodePointerStorage{ptr}, n.candidate_hash);
}
return std::nullopt;
return tmp;
},
[&](const NodePointerStorage &ptr) -> std::optional<CandidateHash> {
for (const auto &[_, h] : nodes[ptr].children) {
if (scope.getPendingAvailability(h)) {
return std::nullopt;
[&](const NodePointerStorage &base_node_ptr)
-> std::vector<std::pair<NodePointer, CandidateHash>> {
std::vector<std::pair<NodePointer, CandidateHash>> tmp;
const auto &bn = nodes[base_node_ptr];
for (const auto &[ptr, hash] : bn.children) {
if (scope.getPendingAvailability(hash)) {
continue;
}
if (!pred(h)) {
return std::nullopt;
if (!pred(hash)) {
continue;
}
return h;
tmp.emplace_back(ptr, hash);
}
return std::nullopt;
return tmp;
});

auto best_result = accumulator;
for (const auto &[child_ptr, child_hash] : children) {
accumulator.emplace_back(child_hash);
auto result = selectChildrenInner(
child_ptr, expected_count, remaining_count - 1, pred, accumulator);
accumulator.pop_back();

if (result.size() == size_t(expected_count)) {
return result;
} else if (best_result.size() < result.size()) {
best_result = result;
}
}

return best_result;
}

static FragmentTree populate(const std::shared_ptr<crypto::Hasher> &hasher,
Expand Down
50 changes: 35 additions & 15 deletions core/parachain/validator/impl/parachain_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,19 @@ namespace kagome::parachain {
return;
}

[[maybe_unused]] const auto _ =
prospective_parachains_->onActiveLeavesUpdate(network::ExViewRef{
.new_head = {event.new_head},
.lost = event.lost,
});
if (const auto r =
prospective_parachains_->onActiveLeavesUpdate(network::ExViewRef{
.new_head = {event.new_head},
.lost = event.lost,
});
r.has_error()) {
SL_WARN(
logger_,
"Prospective parachains leaf update failed. (relay_parent={}, error={})",
relay_parent,
r.error().message());
}

backing_store_->onActivateLeaf(relay_parent);
createBackingTask(relay_parent);
SL_TRACE(logger_,
Expand Down Expand Up @@ -521,7 +529,7 @@ namespace kagome::parachain {
void ParachainProcessorImpl::broadcastViewExcept(
const libp2p::peer::PeerId &peer_id, const network::View &view) const {
auto msg = std::make_shared<
network::WireMessage<network::ValidatorProtocolMessage>>(
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::ViewUpdate{.view = view});
pm_->getStreamEngine()->broadcast(
router_->getValidationProtocolVStaging(),
Expand Down Expand Up @@ -562,7 +570,8 @@ namespace kagome::parachain {
BOOST_ASSERT(se);

auto message = std::make_shared<
network::WireMessage<network::ValidatorProtocolMessage>>(msg);
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
msg);
SL_TRACE(
logger_,
"Broadcasting view update to group.(relay_parent={}, group_size={})",
Expand All @@ -581,7 +590,7 @@ namespace kagome::parachain {

void ParachainProcessorImpl::broadcastView(const network::View &view) const {
auto msg = std::make_shared<
network::WireMessage<network::ValidatorProtocolMessage>>(
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::ViewUpdate{.view = view});
pm_->getStreamEngine()->broadcast(router_->getCollationProtocolVStaging(),
msg);
Expand Down Expand Up @@ -2255,18 +2264,28 @@ namespace kagome::parachain {
core,
[&](const network::ScheduledCore &scheduled_core)
-> std::optional<std::pair<CandidateHash, Hash>> {
return prospective_parachains_->answerGetBackableCandidate(
relay_parent, scheduled_core.para_id, {});
if (auto i = prospective_parachains_->answerGetBackableCandidates(
relay_parent, scheduled_core.para_id, 1, {});
!i.empty()) {
return i[0];
}
return std::nullopt;
},
[&](const runtime::OccupiedCore &occupied_core)
-> std::optional<std::pair<CandidateHash, Hash>> {
/// TODO(iceseer): do https://github.com/qdrvm/kagome/issues/1888
/// `bitfields_indicate_availability` check
if (occupied_core.next_up_on_available) {
return prospective_parachains_->answerGetBackableCandidate(
relay_parent,
occupied_core.next_up_on_available->para_id,
{occupied_core.candidate_hash});
if (auto i =
prospective_parachains_->answerGetBackableCandidates(
relay_parent,
occupied_core.next_up_on_available->para_id,
1,
{occupied_core.candidate_hash});
!i.empty()) {
return i[0];
}
return std::nullopt;
}
return std::nullopt;
},
Expand Down Expand Up @@ -2810,7 +2829,7 @@ namespace kagome::parachain {
peer_id,
protocol,
std::make_shared<
network::WireMessage<network::ValidatorProtocolMessage>>(
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::ViewUpdate{.view = my_view->get().view}));
}

Expand Down Expand Up @@ -3553,6 +3572,7 @@ namespace kagome::parachain {
*our_current_state_.implicit_view,
our_current_state_.active_leaves,
peer_data.collator_state->para_id)) {
SL_TRACE(logger_, "Out of view. (relay_parent={})", on_relay_parent);
return Error::OUT_OF_VIEW;
}

Expand Down
Loading
Loading