Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: disabled validators in dusputes #2012

Merged
merged 12 commits into from
Apr 25, 2024
2 changes: 1 addition & 1 deletion .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ SPDX-License-Identifier: Apache-2.0

### Referenced issues

<!-- Id of the task from Jira. Example: Resolves #42 (Note that to link Pull Request with issue use one of the following keywords: close, closes, closed, fix, fixes, fixed, resolve, resolves, resolved). If there is no corresponding issue, then remove this field -->
<!-- Issues id from Github Issues. Example: Resolves #42 (Note that to link Pull Request with issue use one of the following keywords: close, closes, closed, fix, fixes, fixed, resolve, resolves, resolved). If there is no corresponding issue, then remove this field -->

### Description of the Change

Expand Down
33 changes: 29 additions & 4 deletions core/dispute_coordinator/impl/candidate_vote_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
#include <set>

namespace kagome::dispute {
CandidateVoteState CandidateVoteState::create(CandidateVotes votes,
CandidateEnvironment &env,
Timestamp now) {
CandidateVoteState CandidateVoteState::create(
CandidateVotes votes,
CandidateEnvironment &env,
std::vector<ValidatorIndex> &disabled_validators,
Timestamp now) {
CandidateVoteState res{.votes = std::move(votes),
.own_vote = CannotVote{},
.dispute_status = std::nullopt};

// Collect own votes
auto controlled_indices = env.controlled_indices;
if (not controlled_indices.empty()) {
Voted voted;
Expand All @@ -41,11 +44,33 @@ namespace kagome::dispute {
}

// Check if isn't disputed
// (We have a dispute, if we have votes on both sides)

// (We have a dispute if we have votes on both sides.)
if (res.votes.invalid.empty() or res.votes.valid.empty()) {
return res;
}

// Check if escalated by active validators

auto has_vote_of_active = [&](auto &votes) {
auto is_not_disabled = [&](auto &vote) {
return not std::binary_search(
disabled_validators.begin(), disabled_validators.end(), vote.first);
};
return std::find_if(votes.begin(), votes.end(), is_not_disabled)
!= votes.end();
};

auto escalated_by_active = has_vote_of_active(res.votes.invalid)
and has_vote_of_active(res.votes.valid);

if (not escalated_by_active) {
res.dispute_status = Postponed{};
return res;
}

// Check thresholds

auto n_validators = env.session.validators.size();

auto byzantine_threshold = (std::max<size_t>(n_validators, 1) - 1) / 3;
Expand Down
10 changes: 10 additions & 0 deletions core/dispute_coordinator/impl/candidate_vote_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,18 @@ namespace kagome::dispute {
/// concluded, whether we already voted, ...
class CandidateVoteState final {
public:
/**
* Creates CandidateVoteState based on collected votes, environment and
* taking into account disabled validators
* @param votes already collected votes for dispute
* @param env related data
* @param disabled presorted list of disabled validator indices
* @param now current timestamp
* @return CandidateVoteState with correct inner data
*/
static CandidateVoteState create(CandidateVotes votes,
CandidateEnvironment &env,
std::vector<ValidatorIndex> &disabled,
Timestamp now);

/// Votes already existing for the candidate + receipt.
Expand Down
153 changes: 120 additions & 33 deletions core/dispute_coordinator/impl/dispute_coordinator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ namespace kagome::dispute {
},
[](const ConcludedAgainst &at) -> std::optional<Timestamp> {
return (Timestamp)at;
},
[](const Postponed &) -> std::optional<Timestamp> {
return std::nullopt;
});

auto dispute_is_inactive =
Expand Down Expand Up @@ -335,17 +338,33 @@ namespace kagome::dispute {
}
auto &candidate_votes = votes_res.value().value();

auto &relay_parent =
candidate_votes.candidate_receipt.descriptor.relay_parent;

auto disabled_validators_res = api_->disabled_validators(relay_parent);
if (disabled_validators_res.has_error()) {
SL_WARN(log_,
"Cannot import votes, without getting disabled validators: {}",
disabled_validators_res.error());
continue;
}
auto &disabled_validators = disabled_validators_res.value();

auto vote_state = CandidateVoteState::create(
candidate_votes, env, system_clock_.nowUint64());
candidate_votes, env, disabled_validators, system_clock_.nowUint64());

auto is_included = scraper_->is_candidate_included(candidate_hash);
auto is_backed = scraper_->is_candidate_backed(candidate_hash);
auto is_disputed = vote_state.dispute_status.has_value();
auto is_postponed =
is_disputed ? is_type<Postponed>(vote_state.dispute_status.value())
: false;
auto is_confirmed =
is_disputed ? is_type<Confirmed>(vote_state.dispute_status.value())
: false;
auto is_potential_spam =
is_disputed && !is_included && !is_backed && !is_confirmed;
auto is_potential_spam = is_disputed //
and not is_included and not is_backed
and not is_confirmed and not is_postponed;

if (is_potential_spam) {
SL_TRACE(log_,
Expand Down Expand Up @@ -1013,23 +1032,51 @@ namespace kagome::dispute {
// blocks, and hence we do not have a `CandidateReceipt` available.

CandidateVoteState old_state;
std::vector<ValidatorIndex> disabled_validators;

OUTCOME_TRY(old_state_opt,
storage_->load_candidate_votes(session, candidate_hash));
if (old_state_opt.has_value()) {
old_state = CandidateVoteState::create(old_state_opt.value(), env, now);
} else {

primitives::BlockHash relay_parent{};

if (not old_state_opt.has_value()) {
auto provided_opt = if_type<CandidateReceipt>(candidate_receipt);
if (not provided_opt.has_value()) {
SL_ERROR(log_,
"Cannot import votes, without `CandidateReceipt` available!");
return outcome::success(false);
}

relay_parent = provided_opt.value().get().descriptor.relay_parent;

old_state = {
.votes = {.candidate_receipt = provided_opt.value().get()},
.own_vote = CannotVote{},
.dispute_status = std::nullopt,
};

} else {
relay_parent = old_state_opt->candidate_receipt.descriptor.relay_parent;
}

auto disabled_validators_res = api_->disabled_validators(relay_parent);
if (disabled_validators_res.has_error()) {
SL_WARN(log_,
"Cannot import votes, without getting disabled validators: {}",
disabled_validators_res.error());
return outcome::success(false);
}
disabled_validators = std::move(disabled_validators_res.value());

auto is_disabled = [&disabled_validators =
disabled_validators_res.value()](auto index) {
return std::binary_search(
xDimon marked this conversation as resolved.
Show resolved Hide resolved
disabled_validators.begin(), disabled_validators.end(), index);
};

if (old_state_opt.has_value()) {
old_state = CandidateVoteState::create(
old_state_opt.value(), env, disabled_validators, now);
}

SL_TRACE(log_, "Loaded votes");
Expand Down Expand Up @@ -1058,6 +1105,8 @@ namespace kagome::dispute {

auto expected_candidate_hash = votes.candidate_receipt.hash(*hasher_);

std::vector<Indexed<SignedDisputeStatement>> postponed_statements;

for (auto &vote : statements) {
auto val_index = vote.ix;
SignedDisputeStatement &statement = vote.payload;
Expand All @@ -1078,13 +1127,29 @@ namespace kagome::dispute {
continue;
}

auto is_disabled_validator = is_disabled(val_index);

// Postpone votes of disabled validators while any votes for candidate are
// not exist
if (is_disabled_validator and votes.valid.empty()
and votes.invalid.empty()) {
postponed_statements.emplace_back(std::move(vote));
continue;
}

visit_in_place(
statement.dispute_statement,
[&](const ValidDisputeStatement &valid) {
auto [it, fresh] = votes.valid.emplace(
val_index,
std::make_tuple(valid, statement.validator_signature));
if (fresh) {
if (imported_valid_votes == 0) {
// Return postponed statements to process
std::move_backward(postponed_statements.begin(),
postponed_statements.end(),
statements.end());
}
++imported_valid_votes;
return true;
}
Expand All @@ -1105,22 +1170,29 @@ namespace kagome::dispute {
val_index,
std::make_tuple(invalid, statement.validator_signature));
if (fresh) {
++imported_invalid_votes;
new_invalid_voters.push_back(val_index);
if (imported_invalid_votes == 0) {
// Return postponed statements to process
std::move_backward(postponed_statements.begin(),
postponed_statements.end(),
statements.end());
}
++imported_invalid_votes;
return true;
}
return false;
});
}

CandidateVoteState _new_state = CandidateVoteState::create(votes, env, now);

ImportResult intermediate_result{old_state,
_new_state,
imported_invalid_votes,
imported_valid_votes,
0,
new_invalid_voters};
ImportResult intermediate_result{
std::move(old_state),
CandidateVoteState::create(
votes, env, disabled_validators, now), // new_state
imported_invalid_votes,
imported_valid_votes,
0,
new_invalid_voters,
};

// Handle approval vote import:
//
Expand Down Expand Up @@ -1272,8 +1344,8 @@ namespace kagome::dispute {
}
}

import_result.new_state =
CandidateVoteState::create(std::move(_votes), env, now);
import_result.new_state = CandidateVoteState::create(
std::move(_votes), env, disabled_validators, now);
}
} else {
SL_TRACE(log_, "Not requested approval signatures");
Expand All @@ -1290,11 +1362,15 @@ namespace kagome::dispute {
is_type<CannotVote>(new_state.own_vote)
or boost::relaxed_get<Voted>(new_state.own_vote).empty();
auto is_disputed = new_state.dispute_status.has_value();
auto is_postponed = is_disputed
? is_type<Postponed>(new_state.dispute_status.value())
: false;
auto is_confirmed = is_disputed
? is_type<Confirmed>(new_state.dispute_status.value())
: false;
auto is_potential_spam =
is_disputed && !is_included && !is_backed && !is_confirmed;
auto is_potential_spam = is_disputed //
and not is_included and not is_backed
and not is_confirmed and not is_postponed;

// We participate only in disputes which are not potential spam.
auto allow_participation = not is_potential_spam;
Expand Down Expand Up @@ -1330,12 +1406,15 @@ namespace kagome::dispute {
// Participate in dispute if we did not cast a vote before and actually
// have keys to cast a local vote. Disputes should fall in one of the
// categories below, otherwise we will refrain from participation:
// - `is_included` lands in prioritised queue
// - `is_confirmed` | `is_backed` lands in best effort queue
// - `is_included` lands in prioritized queue
// - `is_confirmed` | `is_backed` lands in the best effort queue
// We don't participate in disputes escalated by disabled validators only.
// We don't participate in disputes on finalized candidates.
// see: {polkadot}/node/core/dispute-coordinator/src/initialized.rs:907

if (own_vote_missing && is_disputed && allow_participation) {
if (own_vote_missing //
and is_disputed and not is_postponed //
and allow_participation) {
auto priority = static_cast<ParticipationPriority>(is_included);

auto &receipt = new_state.votes.candidate_receipt;
Expand All @@ -1357,7 +1436,7 @@ namespace kagome::dispute {
// https://github.com/paritytech/polkadot/blob/40974fb99c86f5c341105b7db53c7aa0df707d66/node/core/dispute-coordinator/src/initialized.rs#L947
is_freshly_disputed = not import_result.old_state.dispute_status.has_value()
and import_result.new_state.dispute_status.has_value();
if (is_freshly_disputed) {
if (is_freshly_disputed and not is_postponed) {
if (is_type<Voted>(new_state.own_vote)) {
auto &own_votes = boost::relaxed_get<Voted>(new_state.own_vote);

Expand Down Expand Up @@ -1915,6 +1994,9 @@ namespace kagome::dispute {
},
[](const ConcludedAgainst &at) -> std::optional<Timestamp> {
return (Timestamp)at;
},
[](const Postponed &) -> std::optional<Timestamp> {
return std::nullopt;
});

auto dispute_is_inactive =
Expand Down Expand Up @@ -1973,8 +2055,6 @@ namespace kagome::dispute {
std::move(candidate_receipt),
valid);

// TODO ничего не возвращаем, вызывается из апрувала и партисипейшена

SL_TRACE(log_, "DisputeCoordinatorMessage::IssueLocalStatement");
auto res = issue_local_statement(
candidate_hash, candidate_receipt, session, valid);
Expand Down Expand Up @@ -2173,8 +2253,8 @@ namespace kagome::dispute {
BOOST_ASSERT_MSG(not queue.empty(),
"Invariant that queues are never empty is broken.");

auto &[msg, cb] = queue.front();
heads.emplace_back(peer_id, std::move(msg), std::move(cb));
auto &[request, cb] = queue.front();
heads.emplace_back(peer_id, std::move(request), std::move(cb));
queue.pop_front();

if (not queue.empty()) {
Expand Down Expand Up @@ -2320,6 +2400,7 @@ namespace kagome::dispute {
auto cb_opt =
batch->add_votes(valid_vote, invalid_vote, peer, std::move(cb));

// Returned value means duplicate
if (cb_opt.has_value()) {
// We don't expect honest peers to send redundant votes within a
// single batch, as the timeout for retry is much higher. Still we
Expand All @@ -2341,12 +2422,18 @@ namespace kagome::dispute {
// limit.
(*cb_opt)(BatchError::RedundantMessage);

} else { // FIXME testing code. must be removed
auto prepared_import =
batch->tick(steady_clock_.now() + Batch::kBatchCollectingInterval);
if (prepared_import.has_value()) {
start_import(std::move(prepared_import.value()));
}
// } else { // FIXME testing code. must be removed
// auto prepared_import =
// batch->tick(steady_clock_.now() +
// Batch::kBatchCollectingInterval);
// if (prepared_import.has_value()) {
// start_import(std::move(prepared_import.value()));
// }
}

auto ready_prepared_imports = batches_->check_batches();
for (auto &prepared_import : ready_prepared_imports) {
start_import(std::move(prepared_import));
}
}

Expand Down
7 changes: 7 additions & 0 deletions core/dispute_coordinator/impl/dispute_coordinator_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,13 @@ namespace kagome::dispute {
std::deque<std::tuple<network::DisputeMessage, CbOutcome<void>>>>
queues_;

/// Collection of DisputeRequests from disabled validators
std::unordered_map<CandidateHash,
std::tuple<primitives::AuthorityDiscoveryId,
network::DisputeMessage,
CbOutcome<void>>>
requests_from_disabled_validators_;

/// Delay timer for establishing the rate limit.
std::optional<libp2p::basic::Scheduler::Handle> rate_limit_timer_;

Expand Down
Loading
Loading