From 95201a8b7e96a1402a23e2b91936f7dee4469b8e Mon Sep 17 00:00:00 2001 From: Alexander Lednev <57529355+iceseer@users.noreply.github.com> Date: Fri, 2 Aug 2024 13:55:04 +0300 Subject: [PATCH] Validation v3 (#2130) * validation v3 Signed-off-by: iceseer * disputes fixup Signed-off-by: iceseer -------- Signed-off-by: iceseer Co-authored-by: kamilsa --- cmake/Hunter/hunter-gate-url.cmake | 6 +- core/api/transport/impl/ws/ws_session.cpp | 1 + core/blockchain/impl/block_tree_impl.cpp | 26 +- .../impl/dispute_coordinator_impl.cpp | 19 +- core/network/common.hpp | 2 +- core/network/impl/peer_view.cpp | 38 +- core/network/peer_view.hpp | 5 +- core/network/types/collator_messages.hpp | 17 +- .../types/collator_messages_vstaging.hpp | 36 +- core/parachain/approval/approval.hpp | 81 +- .../approval/approval_distribution.cpp | 1406 +++++++++++------ .../approval/approval_distribution.hpp | 235 ++- .../approval/approval_distribution_error.cpp | 14 + .../approval/approval_distribution_error.hpp | 7 + core/parachain/approval/knowledge.hpp | 28 +- core/parachain/approval/state.hpp | 10 + core/parachain/backing/grid.hpp | 76 + core/utils/map.hpp | 29 + test/core/parachain/CMakeLists.txt | 1 + test/core/parachain/assignments.cpp | 12 +- 20 files changed, 1474 insertions(+), 575 deletions(-) diff --git a/cmake/Hunter/hunter-gate-url.cmake b/cmake/Hunter/hunter-gate-url.cmake index 8bc0cb16cf..be63f1e82b 100644 --- a/cmake/Hunter/hunter-gate-url.cmake +++ b/cmake/Hunter/hunter-gate-url.cmake @@ -1,5 +1,5 @@ HunterGate( - URL https://github.com/qdrvm/hunter/archive/refs/tags/v0.25.3-qdrvm18.zip - SHA1 22d842b448f84a39392d7835f4046da34f8dcd70 + URL https://github.com/qdrvm/hunter/archive/refs/tags/v0.25.3-qdrvm19.zip + SHA1 eed8b8333c14f25176d4af4fb26256981fd1b527 LOCAL -) +) \ No newline at end of file diff --git a/core/api/transport/impl/ws/ws_session.cpp b/core/api/transport/impl/ws/ws_session.cpp index 12145fb759..8afd5b0728 100644 --- a/core/api/transport/impl/ws/ws_session.cpp +++ b/core/api/transport/impl/ws/ws_session.cpp @@ -10,6 +10,7 @@ #include #include +#include #include #include diff --git a/core/blockchain/impl/block_tree_impl.cpp b/core/blockchain/impl/block_tree_impl.cpp index b4cadc891d..6e92bec81d 100644 --- a/core/blockchain/impl/block_tree_impl.cpp +++ b/core/blockchain/impl/block_tree_impl.cpp @@ -387,6 +387,8 @@ namespace kagome::blockchain { block_tree_leaves.erase(block); std::vector leaves; + leaves.reserve(block_tree_leaves.size()); + std::transform(block_tree_leaves.begin(), block_tree_leaves.end(), std::back_inserter(leaves), @@ -423,18 +425,18 @@ namespace kagome::blockchain { std::shared_ptr state_pruner, common::MainThreadPool &main_thread_pool) : block_tree_data_{BlockTreeData{ - .header_repo_ = std::move(header_repo), - .storage_ = std::move(storage), - .state_pruner_ = std::move(state_pruner), - .tree_ = std::make_unique(finalized), - .extrinsic_observer_ = std::move(extrinsic_observer), - .hasher_ = std::move(hasher), - .extrinsic_event_key_repo_ = std::move(extrinsic_event_key_repo), - .justification_storage_policy_ = - std::move(justification_storage_policy), - .genesis_block_hash_ = {}, - .blocks_pruning_ = {app_config.blocksPruning(), finalized.number}, - }}, + .header_repo_ = std::move(header_repo), + .storage_ = std::move(storage), + .state_pruner_ = std::move(state_pruner), + .tree_ = std::make_unique(finalized), + .extrinsic_observer_ = std::move(extrinsic_observer), + .hasher_ = std::move(hasher), + .extrinsic_event_key_repo_ = std::move(extrinsic_event_key_repo), + .justification_storage_policy_ = + std::move(justification_storage_policy), + .genesis_block_hash_ = {}, + .blocks_pruning_ = {app_config.blocksPruning(), finalized.number}, + }}, main_pool_handler_{main_thread_pool.handlerStarted()} { block_tree_data_.sharedAccess([&](const BlockTreeData &p) { BOOST_ASSERT(p.header_repo_ != nullptr); diff --git a/core/dispute_coordinator/impl/dispute_coordinator_impl.cpp b/core/dispute_coordinator/impl/dispute_coordinator_impl.cpp index 9dfd7314d7..b9819f07c7 100644 --- a/core/dispute_coordinator/impl/dispute_coordinator_impl.cpp +++ b/core/dispute_coordinator/impl/dispute_coordinator_impl.cpp @@ -1205,12 +1205,12 @@ namespace kagome::dispute { auto is_old_concluded_for = intermediate_result.old_state.dispute_status.has_value() ? is_type( - intermediate_result.old_state.dispute_status.value()) + intermediate_result.old_state.dispute_status.value()) : false; auto is_new_concluded_for = intermediate_result.new_state.dispute_status.has_value() ? is_type( - intermediate_result.new_state.dispute_status.value()) + intermediate_result.new_state.dispute_status.value()) : false; auto is_freshly_concluded_for = not is_old_concluded_for and is_new_concluded_for; @@ -1218,12 +1218,12 @@ namespace kagome::dispute { auto is_old_concluded_against = intermediate_result.old_state.dispute_status.has_value() ? is_type( - intermediate_result.old_state.dispute_status.value()) + intermediate_result.old_state.dispute_status.value()) : false; auto is_new_concluded_against = intermediate_result.new_state.dispute_status.has_value() ? is_type( - intermediate_result.new_state.dispute_status.value()) + intermediate_result.new_state.dispute_status.value()) : false; auto is_freshly_concluded_against = not is_old_concluded_against and is_new_concluded_against; @@ -1234,12 +1234,12 @@ namespace kagome::dispute { auto is_old_confirmed_concluded = intermediate_result.old_state.dispute_status.has_value() ? not is_type( - intermediate_result.old_state.dispute_status.value()) + intermediate_result.old_state.dispute_status.value()) : false; auto is_new_confirmed_concluded = intermediate_result.new_state.dispute_status.has_value() ? not is_type( - intermediate_result.new_state.dispute_status.value()) + intermediate_result.new_state.dispute_status.value()) : false; auto is_freshly_confirmed = not is_old_confirmed_concluded and is_new_confirmed_concluded; @@ -1262,13 +1262,13 @@ namespace kagome::dispute { // {polkadot}/node/core/dispute-coordinator/src/initialized.rs:809 auto promise_res = std::promise< - std::unordered_map>(); + parachain::ApprovalDistribution::SignaturesForCandidate>(); auto res_future = promise_res.get_future(); approval_distribution_->getApprovalSignaturesForCandidate( candidate_hash, [promise_res = std::ref(promise_res)]( - std::unordered_map res) { + parachain::ApprovalDistribution::SignaturesForCandidate res) { promise_res.get().set_value(std::move(res)); }); @@ -1289,7 +1289,8 @@ namespace kagome::dispute { auto _votes = std::move(import_result.new_state.votes); - for (auto &[index, signature] : approval_votes) { + for (auto &[index, signatures_data] : approval_votes) { + auto &[hash, candidates, signature] = signatures_data; // clang-format off BOOST_ASSERT_MSG( [&] { diff --git a/core/network/common.hpp b/core/network/common.hpp index a2c9df8621..fa1e4d8d4c 100644 --- a/core/network/common.hpp +++ b/core/network/common.hpp @@ -32,7 +32,7 @@ namespace kagome::network { const libp2p::peer::ProtocolName kCollationProtocolVStaging{ "/{}/collation/2"}; const libp2p::peer::ProtocolName kValidationProtocolVStaging{ - "/{}/validation/2"}; + "/{}/validation/3"}; const libp2p::peer::ProtocolName kReqCollationProtocol{"/{}/req_collation/1"}; const libp2p::peer::ProtocolName kReqCollationVStagingProtocol{ "/{}/req_collation/2"}; diff --git a/core/network/impl/peer_view.cpp b/core/network/impl/peer_view.cpp index 7db4b3f8bd..6e67aa21b4 100644 --- a/core/network/impl/peer_view.cpp +++ b/core/network/impl/peer_view.cpp @@ -77,22 +77,42 @@ namespace kagome::network { } } + size_t PeerView::peersCount() const { + return remote_view_.sharedAccess([](const auto &rv) { return rv.size(); }); + } + void PeerView::removePeer(const PeerId &peer_id) { - if (auto it = remote_view_.find(peer_id); it != remote_view_.end()) { - network::View old_view{std::move(it->second)}; - remote_view_.erase(peer_id); + auto ref = remote_view_.exclusiveAccess( + [&](auto &rv) -> std::optional { + if (auto it = rv.find(peer_id); it != rv.end()) { + network::View old_view{std::move(it->second)}; + rv.erase(peer_id); + return old_view; + } + return std::nullopt; + }); + + if (ref) { remote_view_update_observable_->notify( - EventType::kPeerRemoved, peer_id, std::move(old_view)); + EventType::kPeerRemoved, peer_id, std::move(*ref)); } } void PeerView::updateRemoteView(const PeerId &peer_id, network::View &&view) { - auto it = remote_view_.find(peer_id); - if (it == remote_view_.end() || it->second != view) { - auto &ref = remote_view_[peer_id]; - ref = std::move(view); + auto ref = remote_view_.exclusiveAccess( + [&](auto &rv) -> std::optional { + auto it = rv.find(peer_id); + if (it == rv.end() || it->second != view) { + auto &ref = rv[peer_id]; + ref = std::move(view); + return ref; + } + return std::nullopt; + }); + + if (ref) { remote_view_update_observable_->notify( - EventType::kViewUpdated, peer_id, ref); + EventType::kViewUpdated, peer_id, *ref); } } diff --git a/core/network/peer_view.hpp b/core/network/peer_view.hpp index b263d38d97..2cbc91aae4 100644 --- a/core/network/peer_view.hpp +++ b/core/network/peer_view.hpp @@ -20,6 +20,7 @@ #include "subscription/subscriber.hpp" #include "subscription/subscription_engine.hpp" #include "utils/non_copyable.hpp" +#include "utils/safe_object.hpp" namespace kagome::blockchain { class BlockTree; @@ -75,6 +76,8 @@ namespace kagome::network { bool prepare(); void stop(); + size_t peersCount() const; + MyViewSubscriptionEnginePtr getMyViewObservable(); PeerViewSubscriptionEnginePtr getRemoteViewObservable(); @@ -91,7 +94,7 @@ namespace kagome::network { PeerViewSubscriptionEnginePtr remote_view_update_observable_; std::optional my_view_; - std::unordered_map remote_view_; + SafeObject> remote_view_; LazySPtr block_tree_; }; diff --git a/core/network/types/collator_messages.hpp b/core/network/types/collator_messages.hpp index 7435fb69cf..ade3f9fc96 100644 --- a/core/network/types/collator_messages.hpp +++ b/core/network/types/collator_messages.hpp @@ -358,21 +358,8 @@ namespace kagome::network { */ using BitfieldDistributionMessage = boost::variant; - /// A signed approval vote which references the candidate indirectly via the - /// block. - /// - /// In practice, we have a look-up from block hash and candidate index to - /// candidate hash, so this can be transformed into a `SignedApprovalVote`. - struct ApprovalVote { - SCALE_TIE(2); - - primitives::BlockHash - block_hash; /// A block hash where the candidate appears. - CandidateIndex - candidate_index; /// The index of the candidate in the list of - /// candidates fully included as-of the block. - }; - using IndirectSignedApprovalVote = parachain::IndexedAndSigned; + using IndirectSignedApprovalVote = + parachain::approval::IndirectSignedApprovalVote; struct Assignment { SCALE_TIE(2); diff --git a/core/network/types/collator_messages_vstaging.hpp b/core/network/types/collator_messages_vstaging.hpp index 9ffccc03ac..f7b24d6adc 100644 --- a/core/network/types/collator_messages_vstaging.hpp +++ b/core/network/types/collator_messages_vstaging.hpp @@ -35,9 +35,43 @@ namespace kagome::network::vstaging { using CollatorProtocolMessageCollationSeconded = network::Seconded; using BitfieldDistributionMessage = network::BitfieldDistributionMessage; using BitfieldDistribution = network::BitfieldDistribution; - using ApprovalDistributionMessage = network::ApprovalDistributionMessage; using ViewUpdate = network::ViewUpdate; + using IndirectSignedApprovalVoteV2 = + parachain::approval::IndirectSignedApprovalVoteV2; + + struct Assignment { + SCALE_TIE(2); + + kagome::parachain::approval::IndirectAssignmentCertV2 + indirect_assignment_cert; + scale::BitVec candidate_bitfield; + }; + + struct Assignments { + SCALE_TIE(1); + + std::vector assignments; /// Assignments for candidates in + /// recent, unfinalized blocks. + }; + + struct Approvals { + SCALE_TIE(1); + + std::vector + approvals; /// Approvals for candidates in some recent, unfinalized + /// block. + }; + + /// Network messages used by the approval distribution subsystem. + using ApprovalDistributionMessage = boost::variant< + /// Assignments for candidates in recent, unfinalized blocks. + /// + /// Actually checking the assignment may yield a different result. + Assignments, + /// Approvals for candidates in some recent, unfinalized block. + Approvals>; + struct CollatorProtocolMessageAdvertiseCollation { SCALE_TIE(3); /// Hash of the relay parent advertised collation is based on. diff --git a/core/parachain/approval/approval.hpp b/core/parachain/approval/approval.hpp index bc7acddd26..3fc0df0392 100644 --- a/core/parachain/approval/approval.hpp +++ b/core/parachain/approval/approval.hpp @@ -80,6 +80,25 @@ namespace kagome::parachain::approval { /// The VRF showing the criterion is met. crypto::VRFOutput vrf; + + static AssignmentCertV2 from(const AssignmentCert &src) { + return AssignmentCertV2{ + .kind = visit_in_place( + src.kind, + [](const auto &v) -> AssignmentCertKindV2 { return v; }), + .vrf = src.vrf, + }; + } + }; + + /// An assignment criterion which refers to the candidate under which the + /// assignment is relevant by block hash. + struct IndirectAssignmentCert { + SCALE_TIE(3); + + Hash block_hash; /// A block hash where the candidate appears. + ValidatorIndex validator; /// The validator index. + AssignmentCert cert; /// The cert itself. }; /// An assignment criterion which refers to the candidate under which the @@ -95,17 +114,30 @@ namespace kagome::parachain::approval { /// The cert itself. AssignmentCertV2 cert; + + static IndirectAssignmentCertV2 from(const IndirectAssignmentCert &src) { + return IndirectAssignmentCertV2{ + .block_hash = src.block_hash, + .validator = src.validator, + .cert = AssignmentCertV2::from(src.cert), + }; + } }; - /// An assignment criterion which refers to the candidate under which the - /// assignment is relevant by block hash. - struct IndirectAssignmentCert { - SCALE_TIE(3); + /// A signed approval vote which references the candidate indirectly via the + /// block. + /// + /// In practice, we have a look-up from block hash and candidate index to + /// candidate hash, so this can be transformed into a `SignedApprovalVote`. + struct IndirectApprovalVote { + SCALE_TIE(2); - Hash block_hash; /// A block hash where the candidate appears. - ValidatorIndex validator; /// The validator index. - AssignmentCert cert; /// The cert itself. + Hash block_hash; /// A block hash where the candidate appears. + CandidateIndex + candidate_index; /// The index of the candidate in the list of + /// candidates fully included as-of the block. }; + using IndirectSignedApprovalVote = IndexedAndSigned; /// A signed approval vote which references the candidate indirectly via the /// block. @@ -121,23 +153,30 @@ namespace kagome::parachain::approval { /// The index of the candidate in the list of candidates fully included /// as-of the block. scale::BitVec candidate_indices; + + static IndirectApprovalVoteV2 from(const IndirectApprovalVote &value) { + scale::BitVec v; + v.bits.resize(value.candidate_index + 1); + v.bits[value.candidate_index] = true; + return IndirectApprovalVoteV2{ + .block_hash = value.block_hash, + .candidate_indices = std::move(v), + }; + } }; using IndirectSignedApprovalVoteV2 = IndexedAndSigned; - /// A signed approval vote which references the candidate indirectly via the - /// block. - /// - /// In practice, we have a look-up from block hash and candidate index to - /// candidate hash, so this can be transformed into a `SignedApprovalVote`. - struct IndirectApprovalVote { - SCALE_TIE(2); - - Hash block_hash; /// A block hash where the candidate appears. - CandidateIndex - candidate_index; /// The index of the candidate in the list of - /// candidates fully included as-of the block. - }; - using IndirectSignedApprovalVote = IndexedAndSigned; + inline IndirectSignedApprovalVoteV2 from( + const IndirectSignedApprovalVote &value) { + return IndirectSignedApprovalVoteV2{ + .payload = + { + .payload = IndirectApprovalVoteV2::from(value.payload.payload), + .ix = value.payload.ix, + }, + .signature = value.signature, + }; + } struct RemoteApproval { ValidatorIndex validator_ix; diff --git a/core/parachain/approval/approval_distribution.cpp b/core/parachain/approval/approval_distribution.cpp index 9ed7b7f0b2..63adc2225b 100644 --- a/core/parachain/approval/approval_distribution.cpp +++ b/core/parachain/approval/approval_distribution.cpp @@ -36,6 +36,7 @@ static constexpr size_t kMaxAssignmentBatchSize = 200ull; static constexpr size_t kMaxApprovalBatchSize = 300ull; +static constexpr size_t kMaxBitfieldSize = 500ull; static constexpr uint64_t kTickDurationMs = 500ull; static constexpr kagome::network::Tick kApprovalDelay = 2ull; @@ -83,7 +84,77 @@ namespace { return false; } - void computeVrfModuloAssignments( + void computeVrfModuloAssignments_v2( + std::span + assignments_key, + const kagome::runtime::SessionInfo &config, + const RelayVRFStory &relay_vrf_story, + const std::vector &leaving_cores, + kagome::parachain::ValidatorIndex validator_index, + std::unordered_map + &assignments) { + using namespace kagome::parachain; + using namespace kagome; + + VRFCOutput cert_output; + VRFCProof cert_proof; + uint32_t *cores; + uint64_t cores_out_sz; + + if (sr25519_relay_vrf_modulo_assignments_cert_v2( + assignments_key.data(), + config.relay_vrf_modulo_samples, + config.n_cores, + &relay_vrf_story, + leaving_cores.data(), + leaving_cores.size(), + &cert_output, + &cert_proof, + &cores, + &cores_out_sz)) { + ::scale::BitVec assignment_bitfield; + for (size_t ix = 0; ix < cores_out_sz; ++ix) { + const auto ci = cores[ix]; + if (ci >= assignment_bitfield.bits.size()) { + assignment_bitfield.bits.resize(ci + 1); + } + assignment_bitfield.bits[ci] = true; + } + + crypto::VRFPreOutput o; + std::copy_n(std::make_move_iterator(cert_output.data), + crypto::constants::sr25519::vrf::OUTPUT_SIZE, + o.begin()); + + crypto::VRFProof p; + std::copy_n(std::make_move_iterator(cert_proof.data), + crypto::constants::sr25519::vrf::PROOF_SIZE, + p.begin()); + + ApprovalDistribution::OurAssignment assignment{ + .cert = + approval::AssignmentCertV2{ + .kind = + approval::RelayVRFModuloCompact{ + .core_bitfield = assignment_bitfield, + }, + .vrf = crypto::VRFOutput{.output = o, .proof = p}, + }, + .tranche = 0ul, + .validator_index = validator_index, + .triggered = false}; + + for (size_t ix = 0; ix < cores_out_sz; ++ix) { + const auto core_index = cores[ix]; + assignments.emplace(core_index, assignment); + } + + sr25519_clear_assigned_cores_v2(cores, cores_out_sz); + } + } + + void computeVrfModuloAssignments_v1( std::span keypair_buf, const kagome::runtime::SessionInfo &config, @@ -128,9 +199,9 @@ namespace { core, ApprovalDistribution::OurAssignment{ .cert = - approval::AssignmentCert{ + approval::AssignmentCertV2::from(approval::AssignmentCert{ .kind = approval::RelayVRFModulo{.sample = rvm_sample}, - .vrf = crypto::VRFOutput{.output = o, .proof = p}}, + .vrf = crypto::VRFOutput{.output = o, .proof = p}}), .tranche = 0ul, .validator_index = validator_ix, .triggered = false}); @@ -181,7 +252,7 @@ namespace { core, ApprovalDistribution::OurAssignment{ .cert = - approval::AssignmentCert{ + approval::AssignmentCertV2{ .kind = approval::RelayVRFDelay{.core_index = core}, .vrf = crypto::VRFOutput{.output = std::move(o), .proof = std::move(p)}}, @@ -211,10 +282,10 @@ namespace { auto it = [&](uint32_t tranche) -> std::optional { auto s = *state; - auto const clock_drift = s.depth * no_show_duration; - auto const drifted_tick_now = + const auto clock_drift = s.depth * no_show_duration; + const auto drifted_tick_now = kagome::math::sat_sub_unsigned(tick_now, clock_drift); - auto const drifted_tranche_now = + const auto drifted_tranche_now = kagome::math::sat_sub_unsigned(drifted_tick_now, block_tick); if (tranche > drifted_tranche_now) { @@ -230,9 +301,9 @@ namespace { tranches.begin(), tranches.end(), tranche, - [](auto const &te, auto const t) { return te.tranche < t; }); + [](const auto &te, const auto t) { return te.tranche < t; }); i != tranches.end() && i->tranche == tranche) { - for (auto const &[v_index, t] : i->assignments) { + for (const auto &[v_index, t] : i->assignments) { if (v_index < n_validators) { ++n_assignments; } @@ -240,12 +311,12 @@ namespace { std::max(t, last_assignment_tick ? *last_assignment_tick : kagome::parachain::Tick{0ull}); - auto const no_show_at = kagome::math::sat_sub_unsigned( + const auto no_show_at = kagome::math::sat_sub_unsigned( std::max(t, block_tick), clock_drift) + no_show_duration; if (v_index < approvals.bits.size()) { - auto const has_approved = approvals.bits.at(v_index); - auto const is_no_show = + const auto has_approved = approvals.bits.at(v_index); + const auto is_no_show = !has_approved && no_show_at <= drifted_tick_now; if (!is_no_show && !has_approved) { next_no_show = kagome::parachain::approval::min_or_some( @@ -260,7 +331,7 @@ namespace { s = s.advance( n_assignments, no_shows, next_no_show, last_assignment_tick); - auto const output = + const auto output = s.output(tranche, needed_approvals, n_validators, no_show_duration); state = kagome::visit_in_place( @@ -374,12 +445,12 @@ namespace { } outcome::result checkAssignmentCert( - kagome::network::CoreIndex claimed_core_index, + const scale::BitVec &claimed_core_indices, kagome::network::ValidatorIndex validator_index, const kagome::runtime::SessionInfo &config, const RelayVRFStory &relay_vrf_story, - const kagome::parachain::approval::AssignmentCert &assignment, - kagome::network::GroupIndex backing_group) { + const kagome::parachain::approval::AssignmentCertV2 &assignment, + const std::vector &backing_groups) { using namespace kagome; using parachain::ApprovalDistributionError; @@ -390,34 +461,76 @@ namespace { const auto &validator_public = config.assignment_keys[validator_index]; // OUTCOME_TRY(pk, network::ValidatorId::fromSpan(validator_public)); - if (claimed_core_index >= config.n_cores) { + if (kagome::parachain::approval::count_ones(claimed_core_indices) == 0 + || kagome::parachain::approval::count_ones(claimed_core_indices) + != backing_groups.size()) { return ApprovalDistributionError::CORE_INDEX_OUT_OF_BOUNDS; } - const auto is_in_backing = isInBackingGroup( - config.validator_groups, validator_index, backing_group); - if (is_in_backing) { - return ApprovalDistributionError::IS_IN_BACKING_GROUP; + // Check that the validator was not part of the backing group + // and not already assigned. + for (size_t claimed_core = 0, b_i = 0; + claimed_core < claimed_core_indices.bits.size(); + ++claimed_core) { + if (!claimed_core_indices.bits[claimed_core]) { + continue; + } + + const auto backing_group = backing_groups[b_i++]; + if (claimed_core >= config.n_cores) { + return ApprovalDistributionError::CORE_INDEX_OUT_OF_BOUNDS; + } + + const auto is_in_backing = isInBackingGroup( + config.validator_groups, validator_index, backing_group); + if (is_in_backing) { + return ApprovalDistributionError::IS_IN_BACKING_GROUP; + } } const auto &vrf_output = assignment.vrf.output; const auto &vrf_proof = assignment.vrf.proof; + const auto first_claimed_core_index = [&]() { + for (uint32_t i = 0; i < claimed_core_indices.bits.size(); ++i) { + if (claimed_core_indices.bits[i]) { + return i; + } + } + throw std::runtime_error( + "Unexpected bitslice content. No `true` found, but expect."); + }(); return visit_in_place( assignment.kind, + [&](const parachain::approval::RelayVRFModuloCompact &obj) + -> outcome::result { + const auto core_bitfield = obj.core_bitfield; + if (claimed_core_indices != core_bitfield) { + return ApprovalDistributionError::VRF_MODULO_CORE_INDEX_MISMATCH; + } + + /// TODO(iceseer): `vrf_verify_extra` check + /// TODO(iceseer): `relay_vrf_modulo_core` + return network::DelayTranche(0ull); + }, [&](const parachain::approval::RelayVRFModulo &obj) -> outcome::result { - auto const sample = obj.sample; + const auto sample = obj.sample; if (sample >= config.relay_vrf_modulo_samples) { return ApprovalDistributionError::SAMPLE_OUT_OF_BOUNDS; } - /// TODO(iceseer): vrf_verify_extra check + /// TODO(iceseer): `vrf_verify_extra` check + /// TODO(iceseer): `relay_vrf_modulo_core` return network::DelayTranche(0ull); }, [&](const parachain::approval::RelayVRFDelay &obj) -> outcome::result { - auto const core_index = obj.core_index; - if (core_index != claimed_core_index) { + const auto core_index = obj.core_index; + if (parachain::approval::count_ones(claimed_core_indices) != 1) { + return ApprovalDistributionError::INVALID_ARGUMENTS; + } + + if (core_index != first_claimed_core_index) { return ApprovalDistributionError::VRF_DELAY_CORE_INDEX_MISMATCH; } @@ -463,7 +576,7 @@ namespace kagome::parachain { common::MainThreadPool &main_thread_pool, LazySPtr dispute_coordinator) : approval_thread_handler_{poolHandlerReadyMake( - this, app_state_manager, approval_thread_pool, logger_)}, + this, app_state_manager, approval_thread_pool, logger_)}, worker_pool_handler_{worker_thread_pool.handler(*app_state_manager)}, parachain_host_(std::move(parachain_host)), slots_util_(std::move(slots_util)), @@ -540,6 +653,15 @@ namespace kagome::parachain { return true; } + bool ApprovalDistribution::BlockEntry::operator==(const BlockEntry &l) const { + return block_hash == l.block_hash && parent_hash == l.parent_hash + && block_number == l.block_number && session == l.session + && slot == l.slot && candidates == l.candidates + && approved_bitfield == l.approved_bitfield + && distributed_assignments == l.distributed_assignments + && children == l.children; + } + void ApprovalDistribution::store_remote_view( const libp2p::peer::PeerId &peer_id, const network::View &view) { REINVOKE(*approval_thread_handler_, store_remote_view, peer_id, view); @@ -561,7 +683,7 @@ namespace kagome::parachain { } } - unify_with_peer(storedDistribBlockEntries(), peer_id, view); + unify_with_peer(storedDistribBlockEntries(), peer_id, view, false); peer_views_[peer_id] = std::move(view); } @@ -620,9 +742,17 @@ namespace kagome::parachain { const std::shared_ptr &keystore, const runtime::SessionInfo &config, const RelayVRFStory &relay_vrf_story, - const CandidateIncludedList &leaving_cores) { + const CandidateIncludedList &leaving_cores, + bool enable_v2_assignments, + log::Logger &logger) { if (config.n_cores == 0 || config.assignment_keys.empty() || config.validator_groups.empty()) { + SL_TRACE(logger, + "Not producing assignments because config is degenerate. " + "(n_cores={}, assignments_keys={}, validators_groups={})", + config.n_cores, + config.assignment_keys.size(), + config.validator_groups.size()); return {}; } @@ -632,14 +762,21 @@ namespace kagome::parachain { return {}; } - const auto &[validator_ix, assignments_key] = *found_key; - std::vector lc; - for (const auto &[hashed_candidate_receipt, core_ix, group_ix] : - leaving_cores) { - if (isInBackingGroup(config.validator_groups, validator_ix, group_ix)) { - continue; + const auto &[index, assignments_key] = *found_key; + std::vector lc; + for (const auto &[c_hash, core, g] : leaving_cores) { + if (!isInBackingGroup(config.validator_groups, index, g)) { + lc.emplace_back(core); } - lc.push_back(core_ix); + } + + SL_TRACE(logger, + "Assigning to candidates from different backing groups. " + "(assignable_cores={})", + lc.size()); + + if (lc.empty()) { + return {}; } common::Blob keypair_buf{}; @@ -651,14 +788,237 @@ namespace kagome::parachain { std::unordered_map assignments; - computeVrfModuloAssignments( - keypair_buf, config, relay_vrf_story, lc, validator_ix, assignments); + if (enable_v2_assignments) { + computeVrfModuloAssignments_v2( + keypair_buf, config, relay_vrf_story, lc, index, assignments); + } else { + computeVrfModuloAssignments_v1( + keypair_buf, config, relay_vrf_story, lc, index, assignments); + } computeVrfDelayAssignments( - keypair_buf, config, relay_vrf_story, lc, validator_ix, assignments); + keypair_buf, config, relay_vrf_story, lc, index, assignments); return assignments; } + ApprovalDistribution::DistribApprovalEntry & + ApprovalDistribution::DistribBlockEntry::insert_approval_entry( + ApprovalDistribution::DistribApprovalEntry &&entry) { + std::ignore = approval::iter_ones( + entry.assignment_claimed_candidates, + [&](const auto claimed_candidate_index) -> outcome::result { + if (claimed_candidate_index >= candidates.size()) { + throw std::runtime_error( + fmt::format("Missing candidate entry on " + "`import_and_circulate_assignment`. (hash={}, " + "claimed_candidate_index={})", + entry.assignment.block_hash, + claimed_candidate_index)); + } + + auto &candidate_entry = candidates[claimed_candidate_index]; + std::ignore = candidate_entry.assignments.emplace( + entry.validator_index, entry.assignment_claimed_candidates); + return outcome::success(); + }); + + auto [it, _] = approval_entries.emplace( + std::make_pair(entry.validator_index, + entry.assignment_claimed_candidates), + entry); + return it->second; + } + + bool ApprovalDistribution::DistribApprovalEntry::includes_approval_candidates( + const approval::IndirectSignedApprovalVoteV2 &approval) const { + return approval::iter_ones( + getPayload(approval).candidate_indices, + [&](const auto candidate_index) -> outcome::result { + if (candidate_index < assignment_claimed_candidates.bits.size() + && assignment_claimed_candidates.bits[candidate_index]) { + return ApprovalDistributionError::BIT_FOUND; + } + return outcome::success(); + }) + .has_error(); + } + + outcome::result + ApprovalDistribution::DistribApprovalEntry::note_approval( + const approval::IndirectSignedApprovalVoteV2 &approval_val) { + const auto &approval = getPayload(approval_val); + if (validator_index != approval_val.payload.ix) { + return ApprovalDistributionError::VALIDATOR_INDEX_OUT_OF_BOUNDS; + } + + if (!includes_approval_candidates(approval_val)) { + return ApprovalDistributionError::CANDIDATE_INDEX_OUT_OF_BOUNDS; + } + + if (approvals.contains(approval.candidate_indices)) { + return ApprovalDistributionError::DUPLICATE_APPROVAL; + } + + approvals.insert_or_assign(approval.candidate_indices, approval_val); + return outcome::success(); + } + + std::vector + ApprovalDistribution::DistribBlockEntry::approval_votes( + CandidateIndex candidate_index) const { + std::unordered_map + result; + if (auto candidate_entry = utils::get(candidates, candidate_index)) { + for (const auto &[validator, assignment_bitfield] : + (*candidate_entry)->assignments) { + if (auto approval_entry = + utils::get(approval_entries, + std::make_pair(validator, assignment_bitfield))) { + for (const auto &[approved_candidates, vote] : + (*approval_entry)->second.approvals) { + if (candidate_index < approved_candidates.bits.size() + && approved_candidates.bits[candidate_index]) { + result[std::make_pair((*approval_entry)->second.validator_index, + approved_candidates)] = vote; + } + } + } + } + } + + std::vector out; + out.reserve(result.size()); + + std::transform(result.begin(), + result.end(), + std::back_inserter(out), + [](const auto it) { return it.second; }); + return out; + } + + outcome::result>> + ApprovalDistribution::DistribBlockEntry::note_approval( + const approval::IndirectSignedApprovalVoteV2 &approval_value) { + const approval::IndirectApprovalVoteV2 &approval = + getPayload(approval_value); + + std::optional required_routing; + std::unordered_set peers_randomly_routed_to; + + if (candidates.size() < approval.candidate_indices.bits.size()) { + return ApprovalDistributionError::CANDIDATE_INDEX_OUT_OF_BOUNDS; + } + + std::unordered_set covered_assignments_bitfields; + std::ignore = approval::iter_ones( + approval.candidate_indices, + [&](const auto candidate_index) -> outcome::result { + if (candidate_index < candidates.size()) { + const auto &candidate_entry = candidates[candidate_index]; + if (auto it = utils::get(candidate_entry.assignments, + approval_value.payload.ix)) { + covered_assignments_bitfields.insert((*it)->second); + } + } + return outcome::success(); + }); + + for (const auto &assignment_bitfield : covered_assignments_bitfields) { + if (auto it = utils::get( + approval_entries, + std::make_pair(approval_value.payload.ix, assignment_bitfield))) { + auto &approval_entry = (*it)->second; + OUTCOME_TRY(approval_entry.note_approval(approval_value)); + + peers_randomly_routed_to.insert( + approval_entry.routing_info.peers_randomly_routed.begin(), + approval_entry.routing_info.peers_randomly_routed.end()); + if (required_routing) { + if (*required_routing + != approval_entry.routing_info.required_routing) { + return ApprovalDistributionError:: + ASSIGNMENTS_FOLLOWED_DIFFERENT_PATH; + } + } else { + required_routing = approval_entry.routing_info.required_routing; + } + } + } + + if (required_routing) { + return std::make_pair(*required_routing, + std::move(peers_randomly_routed_to)); + } + return ApprovalDistributionError::UNKNOWN_ASSIGNMENT; + } + + std::optional + ApprovalDistribution::get_assignment_core_indices( + const approval::AssignmentCertKindV2 &assignment, + const CandidateHash &candidate_hash, + const BlockEntry &block_entry) { + return visit_in_place( + assignment, + [&](const kagome::parachain::approval::RelayVRFModuloCompact &value) + -> std::optional { return value.core_bitfield; }, + [&](const kagome::parachain::approval::RelayVRFModulo &value) + -> std::optional { + for (const auto &[core_index, h] : block_entry.candidates) { + if (candidate_hash == h) { + scale::BitVec v; + v.bits.resize(core_index + 1); + v.bits[core_index] = true; + return v; + } + } + return std::nullopt; + }, + [&](const kagome::parachain::approval::RelayVRFDelay &value) + -> std::optional { + scale::BitVec v; + v.bits.resize(value.core_index + 1); + v.bits[value.core_index] = true; + return v; + }); + } + + std::optional ApprovalDistribution::cores_to_candidate_indices( + const scale::BitVec &core_indices, const BlockEntry &block_entry) { + std::vector candidate_indices; + std::ignore = approval::iter_ones( + core_indices, + [&](const auto claimed_core_index) -> outcome::result { + for (uint32_t candidate_index = 0; + candidate_index < block_entry.candidates.size(); + ++candidate_index) { + const auto &[core_index, _] = + block_entry.candidates[candidate_index]; + if (core_index == claimed_core_index) { + candidate_indices.emplace_back(candidate_index); + return outcome::success(); + } + } + return outcome::success(); + }); + + scale::BitVec v; + for (const auto candidate_index : candidate_indices) { + if (candidate_index >= v.bits.size()) { + v.bits.resize(candidate_index + 1); + } + v.bits[candidate_index] = true; + } + + if (v.bits.empty()) { + return std::nullopt; + } + + return v; + } + void ApprovalDistribution::imported_block_info( const primitives::BlockHash &block_hash, const primitives::BlockHeader &block_header) { @@ -694,20 +1054,22 @@ namespace kagome::parachain { block_hash, std::move(context)); - for_ACU(block_hash, [this, context{std::move(context)}](auto &acu) { - auto &&[included, session, babe_config] = std::move(context); - auto &&[session_index, session_info] = std::move(session); - auto &&[epoch_number, babe_block_header, authorities, randomness] = - std::move(babe_config); - - acu.second.included_candidates = std::move(included); - acu.second.babe_epoch = epoch_number; - acu.second.babe_block_header = std::move(babe_block_header); - acu.second.authorities = std::move(authorities); - acu.second.randomness = std::move(randomness); - - this->try_process_approving_context(acu, session_index, session_info); - }); + for_ACU( + block_hash, [this, block_hash, context{std::move(context)}](auto &acu) { + auto &&[included, session, babe_config] = std::move(context); + auto &&[session_index, session_info] = std::move(session); + auto &&[epoch_number, babe_block_header, authorities, randomness] = + std::move(babe_config); + + acu.second.included_candidates = std::move(included); + acu.second.babe_epoch = epoch_number; + acu.second.babe_block_header = std::move(babe_block_header); + acu.second.authorities = std::move(authorities); + acu.second.randomness = std::move(randomness); + + this->try_process_approving_context( + acu, block_hash, session_index, session_info); + }); } template @@ -722,6 +1084,7 @@ namespace kagome::parachain { void ApprovalDistribution::try_process_approving_context( ApprovalDistribution::ApprovingContextUnit &acu, + const primitives::BlockHash &block_hash, SessionIndex session_index, const runtime::SessionInfo &session_info) { ApprovingContext &ac = acu.second; @@ -739,6 +1102,18 @@ namespace kagome::parachain { return; } + bool enable_v2_assignments = false; + if (auto r = parachain_host_->node_features(block_hash, session_index); + r.has_value()) { + if (r.value() + && r.value()->bits.size() > runtime::ParachainHost::NodeFeatureIndex:: + EnableAssignmentsV2) { + enable_v2_assignments = + r.value()->bits + [runtime::ParachainHost::NodeFeatureIndex::EnableAssignmentsV2]; + } + } + approval::UnsafeVRFOutput unsafe_vrf{ .vrf_output = ac.babe_block_header->vrf_output, .slot = ac.babe_block_header->slot_number, @@ -752,8 +1127,12 @@ namespace kagome::parachain { return; } - auto assignments = compute_assignments( - keystore_, session_info, relay_vrf, *ac.included_candidates); + auto assignments = compute_assignments(keystore_, + session_info, + relay_vrf, + *ac.included_candidates, + enable_v2_assignments, + logger_); /// TODO(iceseer): force approve impl @@ -947,6 +1326,7 @@ namespace kagome::parachain { .relay_vrf_story = std::move(block_info.relay_vrf_story), .candidates = std::move(candidates), .approved_bitfield = std::move(approved_bitfield), + .distributed_assignments = {}, .children = {}}); return entries; @@ -1078,6 +1458,7 @@ namespace kagome::parachain { .known_by = {}, .number = meta.number, .parent_hash = meta.parent_hash, + .approval_entries = {}, }); blocks_by_number_[meta.number].insert(meta.hash); } @@ -1097,7 +1478,8 @@ namespace kagome::parachain { network::View{ .heads_ = {h}, .finalized_number_ = finalized_block_number, - }); + }, + false); } } } @@ -1113,13 +1495,14 @@ namespace kagome::parachain { for (auto i = it->second.begin(); i != it->second.end(); ++i) { visit_in_place( i->second, - [&](const network::Assignment &assignment) { + [&](const network::vstaging::Assignment &assignment) { self->import_and_circulate_assignment( i->first, assignment.indirect_assignment_cert, - assignment.candidate_ix); + assignment.candidate_bitfield); }, - [&](const network::IndirectSignedApprovalVote &approval) { + [&](const network::vstaging::IndirectSignedApprovalVoteV2 + &approval) { self->import_and_circulate_approval(i->first, approval); }); } @@ -1347,8 +1730,8 @@ namespace kagome::parachain { ApprovalDistribution::AssignmentCheckResult ApprovalDistribution::check_and_import_assignment( - const approval::IndirectAssignmentCert &assignment, - CandidateIndex candidate_index) { + const approval::IndirectAssignmentCertV2 &assignment, + const scale::BitVec &candidate_indices) { BOOST_ASSERT(approval_thread_handler_->isInCurrentThread()); const auto tick_now = ::tickNow(); @@ -1362,12 +1745,12 @@ namespace kagome::parachain { session_info_res.has_value()) { opt_session_info = std::move(session_info_res.value()); } else { - logger_->warn( - "Assignment. Session info runtime request failed. (parent_hash={}, " - "session_index={}, error={})", - block_entry.parent_hash, - block_entry.session, - session_info_res.error()); + SL_WARN(logger_, + "Assignment. Session info runtime request failed. " + "(parent_hash={}, session_index={}, error={})", + block_entry.parent_hash, + block_entry.session, + session_info_res.error()); return AssignmentCheckResult::Bad; } @@ -1380,32 +1763,70 @@ namespace kagome::parachain { } runtime::SessionInfo &session_info = *opt_session_info; - if (candidate_index >= block_entry.candidates.size()) { - logger_->warn( - "Candidate index more than candidates array.(candidate index={})", - candidate_index); + const auto n_cores = size_t(session_info.n_cores); + + // Early check the candidate bitfield and core bitfields lengths < + // `n_cores`. Core bitfield length is checked later in + // `check_assignment_cert`. + if (candidate_indices.bits.size() > n_cores) { + SL_TRACE(logger_, + "Oversized bitfield. (validator={}, n_cores={}, " + "candidate_bitfield_len={})", + assignment.validator, + n_cores, + candidate_indices.bits.size()); return AssignmentCheckResult::Bad; } - auto &[claimed_core_index, assigned_candidate_hash] = - block_entry.candidates[candidate_index]; + std::vector backing_groups; + std::vector claimed_core_indices; + std::vector assigned_candidate_hashes; - GET_OPT_VALUE_OR_EXIT( - candidate_entry, - AssignmentCheckResult::Bad, - storedCandidateEntries().get(assigned_candidate_hash)); - GET_OPT_VALUE_OR_EXIT( - approval_entry, - AssignmentCheckResult::Bad, - candidate_entry.approval_entry(assignment.block_hash)); + for (size_t candidate_index = 0; + candidate_index < candidate_indices.bits.size(); + ++candidate_index) { + if (!candidate_indices.bits[candidate_index]) { + continue; + } + + auto &[claimed_core_index, assigned_candidate_hash] = + block_entry.candidates[candidate_index]; + + GET_OPT_VALUE_OR_EXIT( + candidate_entry, + AssignmentCheckResult::Bad, + storedCandidateEntries().get(assigned_candidate_hash)); + + GET_OPT_VALUE_OR_EXIT( + approval_entry, + AssignmentCheckResult::Bad, + candidate_entry.approval_entry(assignment.block_hash)); + + backing_groups.emplace_back(approval_entry.backing_group); + claimed_core_indices.emplace_back(claimed_core_index); + assigned_candidate_hashes.emplace_back(assigned_candidate_hash); + } + + // Error on null assignments. + if (claimed_core_indices.empty()) { + return AssignmentCheckResult::Bad; + } + + scale::BitVec v; + for (const auto ci : claimed_core_indices) { + if (ci >= v.bits.size()) { + v.bits.resize(ci + 1); + } + v.bits[ci] = true; + } DelayTranche tranche; - if (auto res = checkAssignmentCert(claimed_core_index, + if (auto res = checkAssignmentCert(v, assignment.validator, session_info, block_entry.relay_vrf_story, assignment.cert, - approval_entry.backing_group); + backing_groups); res.has_value()) { const auto current_tranche = ::trancheNow(config_.slot_duration_millis, block_entry.slot); @@ -1416,50 +1837,89 @@ namespace kagome::parachain { } tranche = res.value(); } else { - logger_->warn( - "Check assignment certificate failed.(error={}, candidate index={})", - res.error(), - candidate_index); + logger_->warn("Check assignment certificate failed.(error={})", + res.error()); return AssignmentCheckResult::Bad; } - const auto is_duplicate = approval_entry.is_assigned(assignment.validator); - approval_entry.import_assignment(tranche, assignment.validator, tick_now); + bool is_duplicate = true; + for (size_t candidate_index = 0, h_i = 0; + candidate_index < candidate_indices.bits.size(); + ++candidate_index) { + if (!candidate_indices.bits[candidate_index]) { + continue; + } + const auto assigned_candidate_hash = assigned_candidate_hashes[h_i++]; + GET_OPT_VALUE_OR_EXIT( + candidate_entry, + AssignmentCheckResult::Bad, + storedCandidateEntries().get(assigned_candidate_hash)); + + GET_OPT_VALUE_OR_EXIT( + approval_entry, + AssignmentCheckResult::Bad, + candidate_entry.approval_entry(assignment.block_hash)); + + is_duplicate = + is_duplicate && approval_entry.is_assigned(assignment.validator); + approval_entry.import_assignment(tranche, assignment.validator, tick_now); + + if (auto result = approval_status(block_entry, candidate_entry); result) { + schedule_wakeup_action(result->first.get(), + block_entry.block_hash, + block_entry.block_number, + assigned_candidate_hash, + result->second.block_tick, + tick_now, + result->second.required_tranches); + } + } - AssignmentCheckResult res; if (is_duplicate) { - res = AssignmentCheckResult::AcceptedDuplicate; + return AssignmentCheckResult::AcceptedDuplicate; + } else if (approval::count_ones(candidate_indices) > 1) { + SL_TRACE(logger_, + "Imported assignment for multiple cores. (validator={})", + assignment.validator); + return AssignmentCheckResult::Accepted; } else { - logger_->trace( - "Imported assignment. (validator={}, candidate hash={}, para id={})", - assignment.validator, - assigned_candidate_hash, - candidate_entry.candidate.get().descriptor.para_id); - res = AssignmentCheckResult::Accepted; - } - - if (auto result = approval_status(block_entry, candidate_entry); result) { - schedule_wakeup_action(result->first.get(), - block_entry.block_hash, - block_entry.block_number, - assigned_candidate_hash, - result->second.block_tick, - tick_now, - result->second.required_tranches); + SL_TRACE(logger_, + "Imported assignment for a single core. (validator={})", + assignment.validator); + return AssignmentCheckResult::Accepted; } - - /// TODO(iceseer): new candidate must be in db already - return res; } ApprovalDistribution::ApprovalCheckResult ApprovalDistribution::check_and_import_approval( - const network::IndirectSignedApprovalVote &approval) { + const approval::IndirectSignedApprovalVoteV2 &approval) { GET_OPT_VALUE_OR_EXIT( block_entry, ApprovalCheckResult::Bad, storedBlockEntries().get(approval.payload.payload.block_hash)); - std::optional opt_session_info{}; + + std::vector> approved_candidates_info; + auto r = approval::iter_ones( + approval.payload.payload.candidate_indices, + [&](const auto candidate_index) -> outcome::result { + if (candidate_index >= block_entry.candidates.size()) { + SL_WARN(logger_, + "Candidate index more than candidates array.(candidate " + "index={})", + candidate_index); + return ApprovalDistributionError::CANDIDATE_INDEX_OUT_OF_BOUNDS; + } + const auto &candidate = block_entry.candidates[candidate_index]; + approved_candidates_info.emplace_back(candidate_index, + candidate.second); + return outcome::success(); + }); + + if (r.has_error()) { + return ApprovalCheckResult::Bad; + } + + std::optional opt_session_info; if (auto session_info_res = parachain_host_->session_info( approval.payload.payload.block_hash, block_entry.session); session_info_res.has_value()) { @@ -1483,61 +1943,47 @@ namespace kagome::parachain { } runtime::SessionInfo &session_info = *opt_session_info; - if (approval.payload.payload.candidate_index - >= block_entry.candidates.size()) { - logger_->warn( - "Candidate index more than candidates array.(candidate index={})", - approval.payload.payload.candidate_index); - return ApprovalCheckResult::Bad; - } + const auto &pubkey = session_info.validators[approval.payload.ix]; - const auto &approved_candidate_hash = - block_entry.candidates[approval.payload.payload.candidate_index].second; - if (approval.payload.ix >= session_info.validators.size()) { - logger_->warn( - "Validator index more than validators array.(validator index={})", - approval.payload.ix); - return ApprovalCheckResult::Bad; - } + for (const auto &[approval_candidate_index, approved_candidate_hash] : + approved_candidates_info) { + GET_OPT_VALUE_OR_EXIT( + candidate_entry, + ApprovalCheckResult::Bad, + storedCandidateEntries().get(approved_candidate_hash)); - const auto &pubkey = session_info.validators[approval.payload.ix]; - GET_OPT_VALUE_OR_EXIT( - candidate_entry, - ApprovalCheckResult::Bad, - storedCandidateEntries().get(approved_candidate_hash)); - - if (auto ae = candidate_entry.approval_entry( - approval.payload.payload.block_hash)) { - if (!ae->get().is_assigned(approval.payload.ix)) { - logger_->warn( - "No assignment from validator.(block hash={}, candidate hash={}, " - "validator={})", - approval.payload.payload.block_hash, - approved_candidate_hash, - approval.payload.ix); + if (auto ae = candidate_entry.approval_entry( + approval.payload.payload.block_hash)) { + if (!ae->get().is_assigned(approval.payload.ix)) { + logger_->warn( + "No assignment from validator.(block hash={}, candidate hash={}, " + "validator={})", + approval.payload.payload.block_hash, + approved_candidate_hash, + approval.payload.ix); + return ApprovalCheckResult::Bad; + } + } else { + logger_->error("No approval entry.(block hash={}, candidate hash={})", + approval.payload.payload.block_hash, + approved_candidate_hash); return ApprovalCheckResult::Bad; } - } else { - logger_->error("No approval entry.(block hash={}, candidate hash={})", - approval.payload.payload.block_hash, - approved_candidate_hash); - return ApprovalCheckResult::Bad; - } - - SL_DEBUG(logger_, - "Importing approval vote.(validator index={}, validator id={}, " - "candidate hash={}, para id={})", - approval.payload.ix, - pubkey, - approved_candidate_hash, - candidate_entry.candidate.get().descriptor.para_id); - advance_approval_state(block_entry, - approved_candidate_hash, - candidate_entry, - approval::RemoteApproval{ - .validator_ix = approval.payload.ix, - }); + SL_DEBUG(logger_, + "Importing approval vote.(validator index={}, validator id={}, " + "candidate hash={}, para id={})", + approval.payload.ix, + pubkey, + approved_candidate_hash, + candidate_entry.candidate.get().descriptor.para_id); + advance_approval_state(block_entry, + approved_candidate_hash, + candidate_entry, + approval::RemoteApproval{ + .validator_ix = approval.payload.ix, + }); + } return ApprovalCheckResult::Accepted; } @@ -1545,11 +1991,13 @@ namespace kagome::parachain { void ApprovalDistribution::import_and_circulate_assignment( const MessageSource &source, - const approval::IndirectAssignmentCert &assignment, - CandidateIndex claimed_candidate_index) { + const approval::IndirectAssignmentCertV2 &assignment, + const scale::BitVec &claimed_candidate_indices) { BOOST_ASSERT(approval_thread_handler_->isInCurrentThread()); + const auto &block_hash = assignment.block_hash; const auto validator_index = assignment.validator; + auto opt_entry = storedDistribBlockEntries().get(block_hash); if (!opt_entry) { logger_->warn( @@ -1560,6 +2008,7 @@ namespace kagome::parachain { validator_index); return; } + auto &entry = opt_entry->get(); SL_DEBUG( logger_, @@ -1568,31 +2017,9 @@ namespace kagome::parachain { block_hash, validator_index); - auto &entry = opt_entry->get(); - if (claimed_candidate_index >= entry.candidates.size()) { - logger_->warn( - "Unexpected candidate entry. (candidate index={}, block hash={})", - claimed_candidate_index, - block_hash); - return; - } - - auto &candidate_entry = entry.candidates[claimed_candidate_index]; - if (auto it = candidate_entry.messages.find(validator_index); - it != candidate_entry.messages.end()) { - if (is_type(it->second.approval_state)) { - logger_->trace( - "Already have approved state. (candidate index={}, " - "block hash={}, validator index={})", - claimed_candidate_index, - block_hash, - validator_index); - return; - } - } - - auto message_subject{ - std::make_tuple(block_hash, claimed_candidate_index, validator_index)}; + // Compute metadata on the assignment. + auto message_subject{std::make_tuple( + block_hash, claimed_candidate_indices, validator_index)}; auto message_kind{approval::MessageKind::Assignment}; if (source) { @@ -1604,10 +2031,9 @@ namespace kagome::parachain { if (!peer_knowledge.received.insert(message_subject, message_kind)) { SL_TRACE(logger_, "Duplicate assignment. (peer id={}, block_hash={}, " - "candidate index={}, validator index={})", + "validator index={})", peer_id, std::get<0>(message_subject), - std::get<1>(message_subject), std::get<2>(message_subject)); } return; @@ -1615,10 +2041,9 @@ namespace kagome::parachain { } else { SL_WARN(logger_, "Assignment from a peer is out of view. (peer id={}, " - "block_hash={}, candidate index={}, validator index={})", + "block_hash={}, validator index={})", peer_id, std::get<0>(message_subject), - std::get<1>(message_subject), std::get<2>(message_subject)); } @@ -1633,7 +2058,7 @@ namespace kagome::parachain { } switch ( - check_and_import_assignment(assignment, claimed_candidate_index)) { + check_and_import_assignment(assignment, claimed_candidate_indices)) { case AssignmentCheckResult::Accepted: { SL_TRACE(logger_, "Assignment accepted. (peer id={}, block hash={})", @@ -1679,41 +2104,47 @@ namespace kagome::parachain { if (!entry.knowledge.insert(message_subject, message_kind)) { SL_WARN(logger_, "Importing locally an already known assignment. " - "(block_hash={}, candidate index={}, validator index={})", + "(block_hash={}, validator index={})", std::get<0>(message_subject), - std::get<1>(message_subject), std::get<2>(message_subject)); return; } SL_TRACE(logger_, - "Importing locally a new assignment. (block_hash={}, candidate " - "index={}, validator index={})", + "Importing locally a new assignment. (block_hash={}, validator " + "index={})", std::get<0>(message_subject), - std::get<1>(message_subject), std::get<2>(message_subject)); } - const auto local = !source; - [[maybe_unused]] auto &message_state = - candidate_entry.messages - .emplace(validator_index, - MessageState{ - .approval_state = assignment.cert, - .local = local, - }) - .first->second; + const auto local = !source.has_value(); + auto &approval_entry = entry.insert_approval_entry(DistribApprovalEntry{ + .assignment = assignment, + .assignment_claimed_candidates = claimed_candidate_indices, + .approvals = {}, + .validator_index = assignment.validator, + .routing_info = + ApprovalRouting{ + .required_routing = + grid::RequiredRouting{ + .value = grid::RequiredRouting:: + GridXY}, /// TODO(iceseer): calculate based on grid + .local = local, + .random_routing = {}, + .peers_randomly_routed = {}, + }, + }); + const auto n_peers_total = peer_view_->peersCount(); auto peer_filter = [&](const auto &peer, const auto &peer_kn) { - if (source && peer == source->get()) { - return false; + if (!source || peer != source->get()) { + const auto route_random = + approval_entry.routing_info.random_routing.sample(n_peers_total); + if (route_random) { + approval_entry.routing_info.mark_randomly_sent(peer); + return true; + } } - - const bool already_sent = - peer_kn.sent.contains(message_subject, - approval::MessageKind::Assignment) - || peer_kn.sent.contains(message_subject, - approval::MessageKind::Approval); - return !already_sent; + return false; }; std::unordered_set peers{}; @@ -1726,17 +2157,18 @@ namespace kagome::parachain { if (!peers.empty()) { runDistributeAssignment( - assignment, claimed_candidate_index, std::move(peers)); + assignment, claimed_candidate_indices, std::move(peers)); } } void ApprovalDistribution::import_and_circulate_approval( const MessageSource &source, - const network::IndirectSignedApprovalVote &vote) { + const approval::IndirectSignedApprovalVoteV2 &vote) { BOOST_ASSERT(approval_thread_handler_->isInCurrentThread()); const auto &block_hash = vote.payload.payload.block_hash; const auto validator_index = vote.payload.ix; - const auto candidate_index = vote.payload.payload.candidate_index; + const auto &candidate_indices = vote.payload.payload.candidate_indices; + auto opt_entry = storedDistribBlockEntries().get(block_hash); if (!opt_entry) { logger_->info( @@ -1755,33 +2187,8 @@ namespace kagome::parachain { validator_index); auto &entry = opt_entry->get(); - if (candidate_index >= entry.candidates.size()) { - logger_->warn( - "Unexpected candidate entry in import approval. (candidate index={}, " - "block hash={}, validator index={})", - candidate_index, - block_hash, - validator_index); - return; - } - - auto &candidate_entry = entry.candidates[candidate_index]; - if (auto it = candidate_entry.messages.find(validator_index); - it != candidate_entry.messages.end()) { - if (kagome::is_type( - it->second.approval_state)) { - logger_->trace( - "Duplicate message. (candidate index={}, " - "block hash={}, validator index={})", - candidate_index, - block_hash, - validator_index); - return; - } - } - auto message_subject{ - std::make_tuple(block_hash, candidate_index, validator_index)}; + std::make_tuple(block_hash, candidate_indices, validator_index)}; auto message_kind{approval::MessageKind::Approval}; if (source) { @@ -1790,10 +2197,9 @@ namespace kagome::parachain { approval::MessageKind::Assignment)) { SL_TRACE(logger_, "Unknown approval assignment. (peer id={}, block hash={}, " - "candidate={}, validator={})", + "validator={})", peer_id, std::get<0>(message_subject), - std::get<1>(message_subject), std::get<2>(message_subject)); return; } @@ -1805,10 +2211,9 @@ namespace kagome::parachain { if (!peer_knowledge.received.insert(message_subject, message_kind)) { SL_TRACE(logger_, "Duplicate approval. (peer id={}, block_hash={}, " - "candidate index={}, validator index={})", + "validator index={})", peer_id, std::get<0>(message_subject), - std::get<1>(message_subject), std::get<2>(message_subject)); } return; @@ -1816,21 +2221,18 @@ namespace kagome::parachain { } else { SL_TRACE(logger_, "Approval from a peer is out of view. (peer id={}, " - "block_hash={}, candidate index={}, validator index={})", + "block_hash={}, validator index={})", peer_id, std::get<0>(message_subject), - std::get<1>(message_subject), std::get<2>(message_subject)); } /// if the approval is known to be valid, reward the peer if (entry.knowledge.contains(message_subject, message_kind)) { SL_TRACE(logger_, - "Known approval. (peer id={}, block hash={}, " - "candidate={}, validator={})", + "Known approval. (peer id={}, block hash={}, validator={})", peer_id, std::get<0>(message_subject), - std::get<1>(message_subject), std::get<2>(message_subject)); if (auto it = entry.known_by.find(peer_id); @@ -1862,43 +2264,40 @@ namespace kagome::parachain { // again SL_WARN(logger_, "Importing locally an already known approval. " - "(block_hash={}, candidate index={}, validator index={})", + "(block_hash={}, validator index={})", std::get<0>(message_subject), - std::get<1>(message_subject), std::get<2>(message_subject)); return; } SL_TRACE(logger_, - "Importing locally a new approval. (block_hash={}, candidate " - "index={}, validator index={})", + "Importing locally a new approval. (block_hash={}, validator " + "index={})", std::get<0>(message_subject), - std::get<1>(message_subject), std::get<2>(message_subject)); } - if (auto it = candidate_entry.messages.find(validator_index); - it != candidate_entry.messages.end()) { - auto cert{ - boost::get(&it->second.approval_state)}; - BOOST_ASSERT(cert); - it->second.approval_state = - DistribApprovalStateApproved{*cert, vote.signature}; - } else { - logger_->warn( - "Importing an approval we don't have an assignment for. (candidate " - "index={}, block hash={}, validator index={})", - candidate_index, - block_hash, - validator_index); + std::pair> + nar; + if (auto res = entry.note_approval(vote); res.has_error()) { + SL_WARN(logger_, + "Possible bug: Vote import failed. (hash={}, validator_index={}, " + "error={})", + block_hash, + validator_index, + res.error()); return; + } else { + nar = std::move(res.value()); } auto peer_filter = [&](const auto &peer, const auto &peer_kn) { + const auto &[_, pr] = nar; if (source && peer == source->get()) { return false; } - return peer_kn.sent.contains(message_subject, - approval::MessageKind::Assignment); + + /// TODO(iceseer): topology + return pr.contains(peer); }; std::unordered_set peers{}; @@ -1914,6 +2313,64 @@ namespace kagome::parachain { } } + network::vstaging::Approvals ApprovalDistribution::sanitize_v1_approvals( + const network::Approvals &approvals) { + network::vstaging::Approvals sanitized_approvals; + for (const auto &approval : approvals.approvals) { + if (size_t(approval.payload.payload.candidate_index) > kMaxBitfieldSize) { + SL_DEBUG(logger_, + "Bad approval v1, invalid candidate index. (block_hash={}, " + "candidate_index={})", + approval.payload.payload.block_hash, + approval.payload.payload.candidate_index); + } else { + sanitized_approvals.approvals.emplace_back( + ::kagome::parachain::approval::from(approval)); + } + } + return sanitized_approvals; + } + + network::vstaging::Assignments ApprovalDistribution::sanitize_v1_assignments( + const network::Assignments &assignments) { + network::vstaging::Assignments sanitized_assignments; + for (const auto &assignment : assignments.assignments) { + const auto &cert = assignment.indirect_assignment_cert; + const auto &candidate_index = assignment.candidate_ix; + + const auto cert_bitfield_bits = visit_in_place( + cert.cert.kind, + [](const approval::RelayVRFDelay &v) { + return size_t(v.core_index) + 1; + }, + [&](const approval::RelayVRFModulo &v) { + return size_t(candidate_index) + 1; + }); + + const auto candidate_bitfield_bits = size_t(candidate_index) + 1; + if (cert_bitfield_bits > kMaxBitfieldSize + || candidate_bitfield_bits > kMaxBitfieldSize) { + SL_DEBUG(logger_, + "Bad assignment v1, invalid candidate index. (block_hash={}, " + "candidate_index={}, validator_index={})", + cert.block_hash, + candidate_index, + cert.validator); + } else { + scale::BitVec v; + v.bits.resize(candidate_index + 1); + v.bits[candidate_index] = true; + sanitized_assignments.assignments.emplace_back( + network::vstaging::Assignment{ + .indirect_assignment_cert = + approval::IndirectAssignmentCertV2::from(cert), + .candidate_bitfield = std::move(v), + }); + } + } + return sanitized_assignments; + } + void ApprovalDistribution::getApprovalSignaturesForCandidate( const CandidateHash &candidate_hash, SignaturesForCandidateCallback &&callback) { @@ -1940,49 +2397,45 @@ namespace kagome::parachain { SignaturesForCandidate all_sigs; for (const auto &[hash, _] : entry.block_assignments) { - if (auto block_entry = storedBlockEntries().get(hash)) { - for (size_t candidate_index = 0ull; - candidate_index < block_entry->get().candidates.size(); - ++candidate_index) { - const auto &[_core_index, c_hash] = - block_entry->get().candidates[candidate_index]; - if (c_hash == candidate_hash) { - const auto index = candidate_index; - if (auto distrib_block_entry = - storedDistribBlockEntries().get(hash)) { - if (index < distrib_block_entry->get().candidates.size()) { - const auto &candidate_entry = - distrib_block_entry->get().candidates[index]; - for (const auto &[validator_index, message_state] : - candidate_entry.messages) { - if (auto approval_state = - if_type( - message_state.approval_state)) { - const auto &[__, sig] = approval_state->get(); - all_sigs[validator_index] = sig; - } - } - } else { - SL_DEBUG(logger_, - "`getApprovalSignaturesForCandidate`: could not find " - "candidate entry for given hash and index!. (hash={}, " - "index={})", - hash, - index); - } - } else { - SL_DEBUG(logger_, - "`getApprovalSignaturesForCandidate`: could not find " - "block entry for given hash!. (hash={})", - hash); - } - } - } - } else { + auto block_entry = storedBlockEntries().get(hash); + if (!block_entry) { SL_DEBUG(logger_, "Block entry for assignment missing. (candidate={}, hash={})", candidate_hash, hash); + continue; + } + + for (size_t candidate_index = 0ull; + candidate_index < block_entry->get().candidates.size(); + ++candidate_index) { + const auto &[_core_index, c_hash] = + block_entry->get().candidates[candidate_index]; + if (c_hash == candidate_hash) { + const auto index = candidate_index; + auto distrib_block_entry = storedDistribBlockEntries().get(hash); + if (!distrib_block_entry) { + SL_DEBUG(logger_, + "`getApprovalSignaturesForCandidate`: could not find " + "block entry for given hash!. (hash={})", + hash); + continue; + } + + for (auto approval : + distrib_block_entry->get().approval_votes(index)) { + std::vector ixs; + std::ignore = approval::iter_ones( + getPayload(approval).candidate_indices, + [&](const auto val) -> outcome::result { + ixs.emplace_back(val); + return outcome::success(); + }); + + all_sigs[approval.payload.ix] = + std::make_tuple(hash, std::move(ixs), approval.signature); + } + } } } callback(std::move(all_sigs)); @@ -1998,11 +2451,23 @@ namespace kagome::parachain { return; } - std::optional< - std::reference_wrapper> - m = visit_in_place(message, [](const auto &val) { - return if_type(val); - }); + auto m = [&]() -> std::optional> { + auto m = + if_type(message); + if (!m) { + SL_TRACE(logger_, "Received V1 message.(peer_id={})", peer_id); + return std::nullopt; + } + + auto a = if_type( + m->get()); + if (!a) { + return std::nullopt; + } + + return a; + }(); if (!m) { return; @@ -2010,22 +2475,21 @@ namespace kagome::parachain { visit_in_place( m->get(), - [&](const network::Assignments &assignments) { + [&](const network::vstaging::Assignments &assignments) { SL_TRACE(logger_, "Received assignments.(peer_id={}, count={})", peer_id, assignments.assignments.size()); - for (auto const &assignment : assignments.assignments) { + for (const auto &assignment : assignments.assignments) { if (auto it = pending_known_.find( assignment.indirect_assignment_cert.block_hash); it != pending_known_.end()) { - SL_TRACE(logger_, - "Pending assignment.(block hash={}, claimed index={}, " - "validator={}, peer={})", - assignment.indirect_assignment_cert.block_hash, - assignment.candidate_ix, - assignment.indirect_assignment_cert.validator, - peer_id); + SL_TRACE( + logger_, + "Pending assignment.(block hash={}, validator={}, peer={})", + assignment.indirect_assignment_cert.block_hash, + assignment.indirect_assignment_cert.validator, + peer_id); it->second.emplace_back( std::make_pair(peer_id, PendingMessage{assignment})); continue; @@ -2033,25 +2497,24 @@ namespace kagome::parachain { import_and_circulate_assignment(peer_id, assignment.indirect_assignment_cert, - assignment.candidate_ix); + assignment.candidate_bitfield); } }, - [&](const network::Approvals &approvals) { + [&](const network::vstaging::Approvals &approvals) { SL_TRACE(logger_, "Received approvals.(peer_id={}, count={})", peer_id, approvals.approvals.size()); - for (auto const &approval_vote : approvals.approvals) { + for (const auto &approval_vote : approvals.approvals) { if (auto it = pending_known_.find( approval_vote.payload.payload.block_hash); it != pending_known_.end()) { - SL_TRACE(logger_, - "Pending approval.(block hash={}, candidate index={}, " - "validator={}, peer={})", - approval_vote.payload.payload.block_hash, - approval_vote.payload.payload.candidate_index, - approval_vote.payload.ix, - peer_id); + SL_TRACE( + logger_, + "Pending approval.(block hash={}, validator={}, peer={})", + approval_vote.payload.payload.block_hash, + approval_vote.payload.ix, + peer_id); it->second.emplace_back( std::make_pair(peer_id, PendingMessage{approval_vote})); continue; @@ -2064,20 +2527,18 @@ namespace kagome::parachain { } void ApprovalDistribution::runDistributeAssignment( - const approval::IndirectAssignmentCert &indirect_cert, - CandidateIndex candidate_index, + const approval::IndirectAssignmentCertV2 &indirect_cert, + const scale::BitVec &candidate_indices, std::unordered_set &&peers) { REINVOKE(*main_pool_handler_, runDistributeAssignment, indirect_cert, - candidate_index, + candidate_indices, std::move(peers)); SL_DEBUG(logger_, - "Distributing assignment on candidate (block hash={}, candidate " - "index={})", - indirect_cert.block_hash, - candidate_index); + "Distributing assignment on candidate (block hash={})", + indirect_cert.block_hash); auto se = pm_->getStreamEngine(); BOOST_ASSERT(se); @@ -2085,17 +2546,18 @@ namespace kagome::parachain { se->broadcast( router_->getValidationProtocolVStaging(), std::make_shared< - network::WireMessage>( - network::ApprovalDistributionMessage{network::Assignments{ - .assignments = {network::Assignment{ - .indirect_assignment_cert = indirect_cert, - .candidate_ix = candidate_index, - }}}}), + network::WireMessage>( + network::vstaging::ApprovalDistributionMessage{ + network::vstaging::Assignments{ + .assignments = {network::vstaging::Assignment{ + .indirect_assignment_cert = indirect_cert, + .candidate_bitfield = candidate_indices, + }}}}), [&](const libp2p::peer::PeerId &p) { return peers.count(p) != 0ull; }); } void ApprovalDistribution::send_assignments_batched( - std::deque &&assignments, + std::deque &&assignments, const libp2p::peer::PeerId &peer_id) { REINVOKE(*main_pool_handler_, send_assignments_batched, @@ -2126,10 +2588,12 @@ namespace kagome::parachain { : assignments.end(); auto msg = std::make_shared< - network::WireMessage>( - network::ApprovalDistributionMessage{network::Assignments{ - .assignments = std::vector(begin, end), - }}); + network::WireMessage>( + network::vstaging::ApprovalDistributionMessage{ + network::vstaging::Assignments{ + .assignments = + std::vector(begin, end), + }}); se->send(peer_id, router_->getValidationProtocolVStaging(), msg); assignments.erase(begin, end); @@ -2137,7 +2601,7 @@ namespace kagome::parachain { } void ApprovalDistribution::send_approvals_batched( - std::deque &&approvals, + std::deque &&approvals, const libp2p::peer::PeerId &peer_id) { REINVOKE(*main_pool_handler_, send_approvals_batched, @@ -2173,11 +2637,13 @@ namespace kagome::parachain { : approvals.end(); auto msg = std::make_shared< - network::WireMessage>( - network::ApprovalDistributionMessage{network::Approvals{ - .approvals = - std::vector(begin, end), - }}); + network::WireMessage>( + network::vstaging::ApprovalDistributionMessage{ + network::vstaging::Approvals{ + .approvals = + std::vector(begin, + end), + }}); se->send(peer_id, router_->getValidationProtocolVStaging(), msg); approvals.erase(begin, end); @@ -2185,16 +2651,15 @@ namespace kagome::parachain { } void ApprovalDistribution::runDistributeApproval( - const network::IndirectSignedApprovalVote &vote, + const approval::IndirectSignedApprovalVoteV2 &vote, std::unordered_set &&peers) { REINVOKE( *main_pool_handler_, runDistributeApproval, vote, std::move(peers)); - logger_->info( - "Sending an approval to peers. (block={}, index={}, num peers={})", - vote.payload.payload.block_hash, - vote.payload.payload.candidate_index, - peers.size()); + SL_INFO(logger_, + "Sending an approval to peers. (block={}, num peers={})", + vote.payload.payload.block_hash, + peers.size()); auto se = pm_->getStreamEngine(); BOOST_ASSERT(se); @@ -2202,10 +2667,11 @@ namespace kagome::parachain { se->broadcast( router_->getValidationProtocolVStaging(), std::make_shared< - network::WireMessage>( - network::ApprovalDistributionMessage{network::Approvals{ - .approvals = {vote}, - }}), + network::WireMessage>( + network::vstaging::ApprovalDistributionMessage{ + network::vstaging::Approvals{ + .approvals = {vote}, + }}), [&](const libp2p::peer::PeerId &p) { return peers.count(p) != 0ull; }); } @@ -2302,15 +2768,19 @@ namespace kagome::parachain { .validator_sig = *sig, }); + scale::BitVec v; + v.bits.resize(*candidate_index + 1); + v.bits[*candidate_index] = true; + import_and_circulate_approval( std::nullopt, - network::IndirectSignedApprovalVote{ + approval::IndirectSignedApprovalVoteV2{ .payload = { .payload = - network::ApprovalVote{ + approval::IndirectApprovalVoteV2{ .block_hash = block_hash, - .candidate_index = *candidate_index, + .candidate_indices = std::move(v), }, .ix = validator_index, }, @@ -2344,19 +2814,22 @@ namespace kagome::parachain { } void ApprovalDistribution::runLaunchApproval( - const approval::IndirectAssignmentCert &indirect_cert, + const approval::IndirectAssignmentCertV2 &indirect_cert, DelayTranche assignment_tranche, const RelayHash &relay_block_hash, - CandidateIndex candidate_index, + const scale::BitVec &claimed_candidate_indices, SessionIndex session, const HashedCandidateReceipt &hashed_candidate, - GroupIndex backing_group) { + GroupIndex backing_group, + bool distribute_assignment) { /// TODO(iceseer): don't launch approval work if the node is syncing. const auto &block_hash = indirect_cert.block_hash; const auto validator_index = indirect_cert.validator; - import_and_circulate_assignment( - std::nullopt, indirect_cert, candidate_index); + if (distribute_assignment) { + import_and_circulate_assignment( + std::nullopt, indirect_cert, claimed_candidate_indices); + } std::optional approval_state = approvals_cache_.exclusiveAccess( @@ -2403,19 +2876,20 @@ namespace kagome::parachain { return std::nullopt; }, [&tick_now](const approval::ExactRequiredTranche &e) { - auto filter = [](Tick const &t, Tick const &ref) { + auto filter = [](const Tick &t, const Tick &ref) { return ((t > ref) ? std::optional{t} : std::optional{}); }; return approval::min_or_some( e.next_no_show, - (e.last_assignment_tick ? filter( - *e.last_assignment_tick + kApprovalDelay, tick_now) - : std::optional{})); + (e.last_assignment_tick + ? filter(*e.last_assignment_tick + kApprovalDelay, + tick_now) + : std::optional{})); }, [&](const approval::PendingRequiredTranche &e) { std::optional next_announced{}; - for (auto const &t : approval_entry.tranches) { + for (const auto &t : approval_entry.tranches) { if (t.tranche > e.considered) { next_announced = t.tranche; break; @@ -2640,6 +3114,7 @@ namespace kagome::parachain { auto &block_entry = opt_block_entry->get(); auto &candidate_entry = opt_candidate_entry->get(); + std::optional opt_session_info{}; if (auto session_info_res = parachain_host_->session_info( block_entry.parent_hash, block_entry.session); @@ -2707,27 +3182,53 @@ namespace kagome::parachain { if (maybe_cert) { const auto &[cert, val_index, tranche] = *maybe_cert; - approval::IndirectAssignmentCert indirect_cert{ + approval::IndirectAssignmentCertV2 indirect_cert{ .block_hash = block_hash, .validator = val_index, .cert = cert.get(), }; - if (auto i = block_entry.candidateIxByHash(candidate_hash)) { - SL_TRACE(logger_, - "Launching approval work. (candidate_hash={}, para_id={}, " - "block_hash={})", - candidate_hash, - candidate_receipt.descriptor.para_id, - block_hash); - - runLaunchApproval(indirect_cert, - tranche, - block_hash, - CandidateIndex(*i), - block_entry.session, - candidate_entry.candidate, - backing_group); + SL_TRACE(logger_, + "Launching approval work. (candidate_hash={}, para_id={}, " + "block_hash={})", + candidate_hash, + candidate_receipt.descriptor.para_id, + block_hash); + + if (auto claimed_core_indices = get_assignment_core_indices( + indirect_cert.cert.kind, candidate_hash, block_entry)) { + if (auto claimed_candidate_indices = cores_to_candidate_indices( + *claimed_core_indices, block_entry)) { + bool distribute_assignment; + if (approval::count_ones(*claimed_candidate_indices) > 1) { + distribute_assignment = !block_entry.mark_assignment_distributed( + *claimed_candidate_indices); + } else { + distribute_assignment = true; + } + + BOOST_ASSERT(storedBlockEntries().get(block_hash)->get() + == block_entry); + runLaunchApproval(indirect_cert, + tranche, + block_hash, + *claimed_candidate_indices, + block_entry.session, + candidate_entry.candidate, + backing_group, + distribute_assignment); + + } else { + SL_WARN(logger_, + "Failed to create assignment bitfield. (block_hash={})", + block_hash); + } + } else { + SL_WARN(logger_, + "Cannot get assignment claimed core indices. " + "(candidate_hash={}, block_hash={})", + candidate_hash, + block_hash); } } @@ -2740,9 +3241,10 @@ namespace kagome::parachain { void ApprovalDistribution::unify_with_peer( StoreUnit> &entries, const libp2p::peer::PeerId &peer_id, - const network::View &view) { - std::deque assignments_to_send; - std::deque approvals_to_send; + const network::View &view, + bool retry_known_blocks) { + std::deque assignments_to_send; + std::deque approvals_to_send; const auto view_finalized_number = view.finalized_number_; for (const auto &head : view.heads_) { @@ -2754,72 +3256,48 @@ namespace kagome::parachain { } auto &entry = opt_entry->get(); - if (entry.known_by.count(peer_id) != 0ull) { + if (entry.known_by.count(peer_id) != 0ull && !retry_known_blocks) { break; } auto &peer_knowledge = entry.known_by[peer_id]; - for (uint32_t candidate_index = 0; - candidate_index < entry.candidates.size(); - ++candidate_index) { - auto &c = entry.candidates[candidate_index]; - for (auto &[validator, message_state] : c.messages) { - auto message_subject{ - std::make_tuple(block, candidate_index, validator)}; - const auto &[ref_cert, opt_ref_approval_sig] = visit_in_place( - message_state.approval_state, - [](const DistribApprovalStateAssigned &cert) - -> std::pair< - std::reference_wrapper, - std::optional< - std::reference_wrapper>> { - return std::make_pair(std::cref(cert), std::nullopt); - }, - [](const DistribApprovalStateApproved &val) - -> std::pair< - std::reference_wrapper, - std::optional< - std::reference_wrapper>> { - const auto &[cert, sig] = val; - return std::make_pair(std::cref(cert), std::cref(sig)); - }); + for (auto &[_, approval_entry] : entry.approval_entries) { + [[maybe_unused]] const auto &required_routing = + approval_entry.routing_info.required_routing; + [[maybe_unused]] auto &routing_info = approval_entry.routing_info; + + auto peer_filter = [&](const libp2p::peer::PeerId &) { + /// TODO(iceseer): check topology + return true; + }; - if (!peer_knowledge.contains(message_subject, - approval::MessageKind::Assignment)) { - peer_knowledge.sent.insert(message_subject, - approval::MessageKind::Assignment); - - assignments_to_send.emplace_back(network::Assignment{ - .indirect_assignment_cert = - approval::IndirectAssignmentCert{ - .block_hash = block, - .validator = validator, - .cert = ref_cert.get(), - }, - .candidate_ix = candidate_index, - }); - } + if (!peer_filter(peer_id)) { + continue; + } - if (opt_ref_approval_sig) { - if (!peer_knowledge.contains(message_subject, - approval::MessageKind::Approval)) { - peer_knowledge.sent.insert(message_subject, - approval::MessageKind::Approval); - - approvals_to_send.emplace_back( - network::IndirectSignedApprovalVote{ - .payload = - { - .payload = - network::ApprovalVote{ - .block_hash = block, - .candidate_index = candidate_index, - }, - .ix = validator, - }, - .signature = opt_ref_approval_sig->get(), - }); - } + const auto assignment_message = approval_entry.get_assignment(); + const auto approval_messages = approval_entry.get_approvals(); + const auto [assignment_knowledge, message_kind] = + approval_entry.create_assignment_knowledge(block); + + if (!peer_knowledge.contains(assignment_knowledge, message_kind)) { + peer_knowledge.sent.insert(assignment_knowledge, message_kind); + assignments_to_send.emplace_back(network::vstaging::Assignment{ + .indirect_assignment_cert = assignment_message.first, + .candidate_bitfield = assignment_message.second, + }); + } + + for (const auto &approval_message : approval_messages) { + auto approval_knowledge = + approval::PeerKnowledge::generate_approval_key( + approval_message); + + if (!peer_knowledge.contains(approval_knowledge.first, + approval_knowledge.second)) { + approvals_to_send.emplace_back(approval_message); + peer_knowledge.sent.insert(approval_knowledge.first, + approval_knowledge.second); } } } diff --git a/core/parachain/approval/approval_distribution.hpp b/core/parachain/approval/approval_distribution.hpp index 6e4346c815..0b128a02de 100644 --- a/core/parachain/approval/approval_distribution.hpp +++ b/core/parachain/approval/approval_distribution.hpp @@ -32,6 +32,7 @@ #include "parachain/approval/knowledge.hpp" #include "parachain/approval/store.hpp" #include "parachain/availability/recovery/recovery.hpp" +#include "parachain/backing/grid.hpp" #include "parachain/validator/parachain_processor.hpp" #include "runtime/runtime_api/parachain_host.hpp" #include "runtime/runtime_api/parachain_host_types.hpp" @@ -76,7 +77,7 @@ namespace kagome::parachain { public IApprovedAncestor { struct OurAssignment { SCALE_TIE(4); - approval::AssignmentCert cert; + approval::AssignmentCertV2 cert; uint32_t tranche; ValidatorIndex validator_index; bool triggered; /// Whether the assignment has been triggered already. @@ -96,10 +97,22 @@ namespace kagome::parachain { std::vector> assignments; }; + using DistribApprovalEntryKey = std::pair; + + struct ApprovalEntryHash { + size_t operator()(const DistribApprovalEntryKey &obj) const { + size_t value{0ull}; + boost::hash_combine(value, obj.first); + boost::hash_range( + value, obj.second.bits.begin(), obj.second.bits.end()); + return value; + } + }; + struct ApprovalEntry { SCALE_TIE(6); using MaybeCert = std::optional< - std::tuple, + std::tuple, ValidatorIndex, DelayTranche>>; @@ -161,7 +174,7 @@ namespace kagome::parachain { import_assignment( our_assignment->tranche, our_assignment->validator_index, tick_now); return std::make_tuple( - std::reference_wrapper( + std::reference_wrapper( our_assignment->cert), our_assignment->validator_index, our_assignment->tranche); @@ -176,10 +189,10 @@ namespace kagome::parachain { tranches.begin(), tranches.end(), tranche, - [](auto const &l, auto const &r) { return l.tranche < r; }); + [](const auto &l, const auto &r) { return l.tranche < r; }); if (it != tranches.end()) { - auto const pos = (size_t)std::distance(tranches.begin(), it); + const auto pos = (size_t)std::distance(tranches.begin(), it); if (it->tranche > tranche) { tranches.insert(it, TrancheEntry{ @@ -220,8 +233,9 @@ namespace kagome::parachain { CandidateEntry(const network::CandidateReceipt &receipt, SessionIndex session_index, size_t approvals_size) - : CandidateEntry( - HashedCandidateReceipt{receipt}, session_index, approvals_size) {} + : CandidateEntry(HashedCandidateReceipt{receipt}, + session_index, + approvals_size) {} std::optional> approval_entry( const network::RelayHash &relay_hash) { @@ -253,7 +267,7 @@ namespace kagome::parachain { if (block_assignments.size() != c.block_assignments.size()) { return false; } - for (auto const &[h, ae] : block_assignments) { + for (const auto &[h, ae] : block_assignments) { auto it = c.block_assignments.find(h); if (it == c.block_assignments.end() || it->second != ae) { return false; @@ -301,14 +315,17 @@ namespace kagome::parachain { const std::shared_ptr &keystore, const runtime::SessionInfo &config, const RelayVRFStory &relay_vrf_story, - const CandidateIncludedList &leaving_cores); + const CandidateIncludedList &leaving_cores, + bool enable_v2_assignments, + log::Logger &logger); void onValidationProtocolMsg( const libp2p::peer::PeerId &peer_id, const network::VersionedValidatorProtocolMessage &message); - using SignaturesForCandidate = - std::unordered_map; + using SignaturesForCandidate = std::unordered_map< + ValidatorIndex, + std::tuple, ValidatorSignature>>; using SignaturesForCandidateCallback = std::function; @@ -320,7 +337,6 @@ namespace kagome::parachain { const primitives::BlockInfo &min, const primitives::BlockInfo &max) const override; - private: struct ImportedBlockInfo { CandidateIncludedList included_candidates; SessionIndex session_index; @@ -348,9 +364,9 @@ namespace kagome::parachain { } }; - using DistribApprovalStateAssigned = approval::AssignmentCert; + using DistribApprovalStateAssigned = approval::AssignmentCertV2; using DistribApprovalStateApproved = - std::pair; + std::pair; using DistribApprovalState = boost::variant; @@ -368,7 +384,81 @@ namespace kagome::parachain { /// for the same candidate, if it is included by multiple blocks - this is /// likely the case when there are forks. struct DistribCandidateEntry { - std::unordered_map messages{}; + /// The value represents part of the lookup key in `approval_entries` to + /// fetch the assignment + /// and existing votes. + std::unordered_map assignments; + }; + + /// Contains topology routing information for assignments and approvals. + struct ApprovalRouting { + grid::RequiredRouting required_routing; + bool local; + grid::RandomRouting random_routing; + std::vector peers_randomly_routed; + + void mark_randomly_sent(const libp2p::peer::PeerId &peer) { + random_routing.inc_sent(); + peers_randomly_routed.emplace_back(peer); + } + }; + + // This struct is responsible for tracking the full state of an assignment + // and grid routing information. + struct DistribApprovalEntry { + // The assignment certificate. + approval::IndirectAssignmentCertV2 assignment; + + // The candidates claimed by the certificate. A mapping between bit index + // and candidate index. + scale::BitVec assignment_claimed_candidates; + + // The approval signatures for each `CandidateIndex` claimed by the + // assignment certificate. + std::unordered_map + approvals; + + // The validator index of the assignment signer. + ValidatorIndex validator_index; + + // Information required for gossiping to other peers using the grid + // topology. + ApprovalRouting routing_info; + + std::pair + get_assignment() const { + return {assignment, assignment_claimed_candidates}; + } + + /// Records a new approval. Returns error if the claimed candidate is not + /// found or we already have received the approval. + outcome::result note_approval( + const approval::IndirectSignedApprovalVoteV2 &approval); + + /// Tells if this entry assignment covers at least one candidate in the + /// approval + bool includes_approval_candidates( + const approval::IndirectSignedApprovalVoteV2 &approval_val) const; + + // Get all approvals for all candidates claimed by the assignment. + std::vector get_approvals() + const { + std::vector out; + out.reserve(approvals.size()); + std::transform(approvals.begin(), + approvals.end(), + std::back_inserter(out), + [](const auto it) { return it.second; }); + return out; + } + + // Create a `MessageSubject` to reference the assignment. + std::pair + create_assignment_knowledge(const Hash &block_hash) const { + return {std::make_tuple( + block_hash, assignment_claimed_candidates, validator_index), + approval::MessageKind::Assignment}; + } }; /// Information about blocks in our current view as well as whether peers @@ -376,16 +466,43 @@ namespace kagome::parachain { struct DistribBlockEntry { /// A votes entry for each candidate indexed by [`CandidateIndex`]. std::vector candidates{}; + /// Our knowledge of messages. approval::Knowledge knowledge{}; + /// Peers who we know are aware of this block and thus, the candidates /// within it. This maps to their knowledge of messages. std::unordered_map known_by{}; + /// The number of the block. primitives::BlockNumber number; + /// The parent hash of the block. RelayHash parent_hash; + + /// Approval entries for whole block. These also contain all approvals in + /// the case of multiple candidates being claimed by assignments. + std::unordered_map + approval_entries; + + public: + DistribApprovalEntry &insert_approval_entry(DistribApprovalEntry &&entry); + + // Saves the given approval in all ApprovalEntries that contain an + // assignment for any of the candidates in the approval. + // + // Returns the required routing needed for this approval and the lit of + // random peers the covering assignments were sent. + outcome::result>> + note_approval(const approval::IndirectSignedApprovalVoteV2 &approval); + + /// Returns the list of approval votes covering this candidate + std::vector approval_votes( + CandidateIndex candidate_index) const; }; /// Metadata regarding approval of a particular block, by way of approval of @@ -397,16 +514,26 @@ namespace kagome::parachain { SessionIndex session; consensus::SlotNumber slot; RelayVRFStory relay_vrf_story; + // The candidates included as-of this block and the index of the core they // are leaving. Sorted ascending by core index. std::vector> candidates; + // A bitfield where the i'th bit corresponds to the i'th candidate in // `candidates`. The i'th bit is `true` iff the candidate has been // approved in the context of this block. The block can be considered // approved if the bitfield has all bits set to `true`. scale::BitVec approved_bitfield; + + // A list of assignments for which we already distributed the assignment. + // We use this to ensure we don't distribute multiple core assignments + // twice as we track individual wakeups for each core. + scale::BitVec distributed_assignments; + std::vector children; + bool operator==(const BlockEntry &l) const; + std::optional candidateIxByHash( const CandidateHash &candidate_hash) { for (size_t ix = 0ul; ix < candidates.size(); ++ix) { @@ -439,6 +566,26 @@ namespace kagome::parachain { } return false; } + + /// Mark distributed assignment for many candidate indices. + /// Returns `true` if an assignment was already distributed for the + /// `candidates`. + bool mark_assignment_distributed(const scale::BitVec &bitfield) { + const auto total_one_bits = + approval::count_ones(distributed_assignments); + const auto new_len = + std::max(distributed_assignments.bits.size(), bitfield.bits.size()); + + distributed_assignments.bits.resize(new_len); + for (size_t ix = 0; ix < bitfield.bits.size(); ++ix) { + distributed_assignments.bits[ix] = + distributed_assignments.bits[ix] || bitfield.bits[ix]; + } + + const auto distributed = + (total_one_bits == approval::count_ones(distributed_assignments)); + return distributed; + } }; /// Information about a block and imported candidates. @@ -452,8 +599,8 @@ namespace kagome::parachain { }; using AssignmentOrApproval = - boost::variant; + boost::variant; using PendingMessage = AssignmentOrApproval; using MessageSource = @@ -504,17 +651,35 @@ namespace kagome::parachain { const primitives::BlockHeader &block_header); AssignmentCheckResult check_and_import_assignment( - const approval::IndirectAssignmentCert &assignment, - CandidateIndex claimed_candidate_index); + const approval::IndirectAssignmentCertV2 &assignment, + const scale::BitVec &candidate_indices); ApprovalCheckResult check_and_import_approval( - const network::IndirectSignedApprovalVote &vote); + const approval::IndirectSignedApprovalVoteV2 &vote); void import_and_circulate_assignment( const MessageSource &source, - const approval::IndirectAssignmentCert &assignment, - CandidateIndex claimed_candidate_index); + const approval::IndirectAssignmentCertV2 &assignment, + const scale::BitVec &claimed_candidate_indices); void import_and_circulate_approval( const MessageSource &source, - const network::IndirectSignedApprovalVote &vote); + const approval::IndirectSignedApprovalVoteV2 &vote); + + // Returns the claimed core bitfield from the assignment cert, the candidate + // hash and a + // `BlockEntry`. Can fail only for VRF Delay assignments for which we cannot + // find the candidate hash in the block entry which indicates a bug or + // corrupted storage. + std::optional get_assignment_core_indices( + const approval::AssignmentCertKindV2 &assignment, + const CandidateHash &candidate_hash, + const BlockEntry &block_entry); + + std::optional cores_to_candidate_indices( + const scale::BitVec &core_indices, const BlockEntry &block_entry); + + network::vstaging::Assignments sanitize_v1_assignments( + const network::Assignments &assignments); + network::vstaging::Approvals sanitize_v1_approvals( + const network::Approvals &approval); template void handle_new_head(const primitives::BlockHash &head, @@ -543,6 +708,7 @@ namespace kagome::parachain { void try_process_approving_context( ApprovingContextUnit &acu, + const primitives::BlockHash &block_hash, SessionIndex session_index, const runtime::SessionInfo &session_info); @@ -552,7 +718,8 @@ namespace kagome::parachain { void unify_with_peer(StoreUnit> &entries, const libp2p::peer::PeerId &peer_id, - const network::View &view); + const network::View &view, + bool retry_known_blocks); outcome::result processImportedBlock( primitives::BlockNumber block_number, @@ -586,13 +753,14 @@ namespace kagome::parachain { const RelayHash &block_hash); void runLaunchApproval( - const approval::IndirectAssignmentCert &indirect_cert, + const approval::IndirectAssignmentCertV2 &indirect_cert, DelayTranche assignment_tranche, const RelayHash &relay_block_hash, - CandidateIndex candidate_index, + const scale::BitVec &claimed_candidate_indices, SessionIndex session, const HashedCandidateReceipt &hashed_candidate, - GroupIndex backing_group); + GroupIndex backing_group, + bool distribute_assignment); void runNewBlocks(approval::BlockApprovalMeta &&approval_meta, primitives::BlockNumber finalized_block_number); @@ -625,19 +793,20 @@ namespace kagome::parachain { BlockImportedCandidates &&candidate); void runDistributeAssignment( - const approval::IndirectAssignmentCert &indirect_cert, - CandidateIndex candidate_index, + const approval::IndirectAssignmentCertV2 &indirect_cert, + const scale::BitVec &candidate_indices, std::unordered_set &&peers); - void send_assignments_batched(std::deque &&assignments, - const libp2p::peer::PeerId &peer_id); + void send_assignments_batched( + std::deque &&assignments, + const libp2p::peer::PeerId &peer_id); void send_approvals_batched( - std::deque &&approvals, + std::deque &&approvals, const libp2p::peer::PeerId &peer_id); void runDistributeApproval( - const network::IndirectSignedApprovalVote &vote, + const approval::IndirectSignedApprovalVoteV2 &vote, std::unordered_set &&peers); void runScheduleWakeup(const primitives::BlockHash &block_hash, diff --git a/core/parachain/approval/approval_distribution_error.cpp b/core/parachain/approval/approval_distribution_error.cpp index e620893e67..656ab9bc7c 100644 --- a/core/parachain/approval/approval_distribution_error.cpp +++ b/core/parachain/approval/approval_distribution_error.cpp @@ -25,14 +25,28 @@ OUTCOME_CPP_DEFINE_CATEGORY(kagome::parachain, ApprovalDistributionError, e) { return "Validator index out of bounds"; case E::CORE_INDEX_OUT_OF_BOUNDS: return "Core index out of bounds"; + case E::CANDIDATE_INDEX_OUT_OF_BOUNDS: + return "Candidate index out of bounds"; case E::IS_IN_BACKING_GROUP: return "Is in backing group"; case E::SAMPLE_OUT_OF_BOUNDS: return "Sample is out of bounds"; case E::VRF_DELAY_CORE_INDEX_MISMATCH: return "VRF delay core index mismatch"; + case E::VRF_MODULO_CORE_INDEX_MISMATCH: + return "VRF modulo core index mismatch"; + case E::INVALID_ARGUMENTS: + return "Invalid arguments"; case E::VRF_VERIFY_AND_GET_TRANCHE: return "VRF verify and get tranche failed"; + case E::BIT_FOUND: + return "bit found"; + case E::DUPLICATE_APPROVAL: + return "Duplicate approval"; + case E::ASSIGNMENTS_FOLLOWED_DIFFERENT_PATH: + return "Assignments followed different path"; + case E::UNKNOWN_ASSIGNMENT: + return "Unknown assignment"; } return "Unknown approval-distribution error"; } diff --git a/core/parachain/approval/approval_distribution_error.hpp b/core/parachain/approval/approval_distribution_error.hpp index 326690b825..5e5eab24d7 100644 --- a/core/parachain/approval/approval_distribution_error.hpp +++ b/core/parachain/approval/approval_distribution_error.hpp @@ -22,6 +22,13 @@ namespace kagome::parachain { SAMPLE_OUT_OF_BOUNDS = 10, VRF_DELAY_CORE_INDEX_MISMATCH = 11, VRF_VERIFY_AND_GET_TRANCHE = 12, + VRF_MODULO_CORE_INDEX_MISMATCH = 13, + INVALID_ARGUMENTS = 14, + CANDIDATE_INDEX_OUT_OF_BOUNDS = 15, + BIT_FOUND = 16, + DUPLICATE_APPROVAL = 17, + ASSIGNMENTS_FOLLOWED_DIFFERENT_PATH = 18, + UNKNOWN_ASSIGNMENT = 19, }; } diff --git a/core/parachain/approval/knowledge.hpp b/core/parachain/approval/knowledge.hpp index 2adacaa16d..9d6ff08890 100644 --- a/core/parachain/approval/knowledge.hpp +++ b/core/parachain/approval/knowledge.hpp @@ -14,16 +14,27 @@ #include "parachain/approval/state.hpp" #include "parachain/types.hpp" +template <> +struct std::hash { + auto operator()(const scale::BitVec &v) const { + auto s = ::scale::encode(v).value(); + return boost::hash_range(s.begin(), s.end()); + } +}; + namespace kagome::parachain::approval { enum struct MessageKind { Assignment, Approval }; - using MessageSubject = std::tuple; + using MessageSubject = std::tuple; struct MessageSubjectHash { auto operator()(const MessageSubject &obj) const { size_t value{0ull}; - std::apply( - [&](const auto &...v) { (..., boost::hash_combine(value, v)); }, obj); + boost::hash_range( + value, std::get<0>(obj).begin(), std::get<0>(obj).end()); + boost::hash_range( + value, std::get<1>(obj).bits.begin(), std::get<1>(obj).bits.end()); + boost::hash_combine(value, std::get<2>(obj)); return value; } }; @@ -72,6 +83,17 @@ namespace kagome::parachain::approval { const MessageKind &kind) const { return sent.contains(message, kind) || received.contains(message, kind); } + + // Generate the knowledge keys for querying if an approval is known by peer. + static std::pair generate_approval_key( + const approval::IndirectSignedApprovalVoteV2 &approval) { + return { + std::make_tuple(approval.payload.payload.block_hash, + approval.payload.payload.candidate_indices, + approval.payload.ix), + MessageKind::Approval, + }; + } }; } // namespace kagome::parachain::approval diff --git a/core/parachain/approval/state.hpp b/core/parachain/approval/state.hpp index d84b643c9d..d94e2b31c6 100644 --- a/core/parachain/approval/state.hpp +++ b/core/parachain/approval/state.hpp @@ -97,6 +97,16 @@ namespace kagome::parachain::approval { return count_ones; } + template + inline outcome::result iter_ones(const scale::BitVec &src, F &&f) { + for (size_t ix = 0; ix < src.bits.size(); ++ix) { + if (src.bits[ix]) { + OUTCOME_TRY(std::forward(f)(ix)); + } + } + return outcome::success(); + } + inline auto min_or_some(const std::optional &l, const std::optional &r) { return (l && r) ? std::min(*l, *r) diff --git a/core/parachain/backing/grid.hpp b/core/parachain/backing/grid.hpp index f3ab9b77fa..ade6476a10 100644 --- a/core/parachain/backing/grid.hpp +++ b/core/parachain/backing/grid.hpp @@ -10,6 +10,8 @@ #include #include #include +#include +#include #include #include #include @@ -18,6 +20,15 @@ #include "crypto/hasher/hasher_impl.hpp" namespace kagome::parachain::grid { + /// The sample rate for randomly propagating messages. This + /// reduces the left tail of the binomial distribution but also + /// introduces a bias towards peers who we sample before others + /// (i.e. those who get a block before others). + constexpr size_t DEFAULT_RANDOM_SAMPLE_RATE = 25; + + /// The number of peers to randomly propagate messages to. + constexpr size_t DEFAULT_RANDOM_CIRCULATION = 4; + /** * Numbers arranged into rectangular grid. * https://github.com/paritytech/polkadot-sdk/blob/2aaa9af3746b0cf671de9dc98fe2465c7ef59be2/polkadot/node/network/protocol/src/grid_topology.rs#L69-L149 @@ -112,6 +123,71 @@ namespace kagome::parachain::grid { } }; + /// Routing mode + struct RequiredRouting { + enum { + /// We don't know yet, because we're waiting for topology info + /// (race condition between learning about the first blocks in a new + /// session + /// and getting the topology for that session) + PendingTopology, + /// Propagate to all peers of any kind. + All, + /// Propagate to all peers sharing either the X or Y dimension of the + /// grid. + GridXY, + /// Propagate to all peers sharing the X dimension of the grid. + GridX, + /// Propagate to all peers sharing the Y dimension of the grid. + GridY, + /// No required propagation. + None + } value; + + /// Whether the required routing set is definitely empty. + bool is_empty() const { + return value == PendingTopology || value == None; + } + + bool operator==(const RequiredRouting &l) const { + return value == l.value; + } + }; + + /// A representation of routing based on sample + struct RandomRouting { + /// The number of peers to target. + size_t target; + /// The number of peers this has been sent to. + size_t sent; + /// Sampling rate + size_t sample_rate; + + RandomRouting() + : target(DEFAULT_RANDOM_CIRCULATION), + sent(0), + sample_rate(DEFAULT_RANDOM_SAMPLE_RATE) {} + + /// Perform random sampling for a specific peer + /// Returns `true` for a lucky peer + bool sample(size_t n_peers_total) const { + if (n_peers_total == 0 || sent >= target) { + return false; + } else if (sample_rate > n_peers_total) { + return true; + } else { + std::srand(std::time(nullptr)); + size_t random_number = (std::rand() % n_peers_total) + size_t(1); + return random_number <= sample_rate; + } + } + + /// Increase number of messages being sent + void inc_sent() { + sent += 1; + } + }; + /// View for each group using Views = std::vector; diff --git a/core/utils/map.hpp b/core/utils/map.hpp index c4c02b7f17..b77c3896c8 100644 --- a/core/utils/map.hpp +++ b/core/utils/map.hpp @@ -23,6 +23,35 @@ namespace kagome::utils { return std::nullopt; } + template + inline std::optional get( + const C &container, const typename C::key_type &key) { + if (auto it = container.find(key); it != container.end()) { + return it; + } + return std::nullopt; + } + + template + inline std::optional get( + C &container, const typename C::key_type &key) { + if (auto it = container.find(key); it != container.end()) { + return it; + } + return std::nullopt; + } + + template + inline auto get(const std::vector &container, const size_t &index) { + using ItT = std::vector::const_iterator; + if (index >= container.size()) { + return std::optional{}; + } + auto it = container.begin(); + std::advance(it, index); + return std::make_optional(it); + } + template inline auto fromRefToOwn( const std::optional> &opt_ref) { diff --git a/test/core/parachain/CMakeLists.txt b/test/core/parachain/CMakeLists.txt index aeb0308620..6e7ec55887 100644 --- a/test/core/parachain/CMakeLists.txt +++ b/test/core/parachain/CMakeLists.txt @@ -17,6 +17,7 @@ target_link_libraries(parachain_test log_configurator base_fs_test key_store + logger ) if (CMAKE_SYSTEM_NAME STREQUAL Linux) diff --git a/test/core/parachain/assignments.cpp b/test/core/parachain/assignments.cpp index 7574b5e3a7..e8a1d273e2 100644 --- a/test/core/parachain/assignments.cpp +++ b/test/core/parachain/assignments.cpp @@ -22,6 +22,7 @@ #include "crypto/random_generator/boost_generator.hpp" #include "crypto/sr25519/sr25519_provider_impl.hpp" #include "crypto/sr25519_types.hpp" +#include "log/logger.hpp" #include "mock/core/application/app_state_manager_mock.hpp" #include "parachain/approval/approval_distribution.hpp" #include "testutil/prepare_loggers.hpp" @@ -39,6 +40,11 @@ struct AssignmentsTest : public test::BaseFS_Test { testutil::prepareLoggers(); } + static kagome::log::Logger &log() { + static auto logger = kagome::log::createLogger("test"); + return logger; + } + AssignmentsTest() : BaseFS_Test(assignments_directory) {} void SetUp() override {} @@ -137,7 +143,7 @@ TEST_F(AssignmentsTest, succeeds_empty_for_0_cores) { auto assignments = kagome::parachain::ApprovalDistribution::compute_assignments( - cs, si, vrf_story, leaving_cores); + cs, si, vrf_story, leaving_cores, false, log()); ASSERT_EQ(assignments.size(), 0ull); } @@ -180,7 +186,7 @@ TEST_F(AssignmentsTest, assign_to_nonzero_core) { static_cast(1))}; auto assignments = kagome::parachain::ApprovalDistribution::compute_assignments( - cs, si, vrf_story, leaving_cores); + cs, si, vrf_story, leaving_cores, false, log()); ASSERT_EQ(assignments.size(), 1ull); @@ -251,7 +257,7 @@ TEST_F(AssignmentsTest, assignments_produced_for_non_backing) { (kagome::parachain::GroupIndex)0)}; auto assignments = kagome::parachain::ApprovalDistribution::compute_assignments( - cs, si, vrf_story, leaving_cores); + cs, si, vrf_story, leaving_cores, false, log()); ASSERT_EQ(assignments.size(), 1ull);