Skip to content

Commit

Permalink
Feature: disabled validators in dusputes (#2012)
Browse files Browse the repository at this point in the history
* draft

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* feature: escalation dispute by disabled validators

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* fix: cases of postponed disputes

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* fix: jira -> github

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* doc: meaningful comment

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* feature: zombienettest "validator disabling"

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* fix: zombinet configs

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

* fix: review issue

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>

---------

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>
  • Loading branch information
xDimon authored Apr 25, 2024
1 parent 541d483 commit 396c5c6
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ SPDX-License-Identifier: Apache-2.0

### Referenced issues

<!-- Id of the task from Jira. Example: Resolves #42 (Note that to link Pull Request with issue use one of the following keywords: close, closes, closed, fix, fixes, fixed, resolve, resolves, resolved). If there is no corresponding issue, then remove this field -->
<!-- Issues id from Github Issues. Example: Resolves #42 (Note that to link Pull Request with issue use one of the following keywords: close, closes, closed, fix, fixes, fixed, resolve, resolves, resolved). If there is no corresponding issue, then remove this field -->

### Description of the Change

Expand Down
33 changes: 29 additions & 4 deletions core/dispute_coordinator/impl/candidate_vote_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
#include <set>

namespace kagome::dispute {
CandidateVoteState CandidateVoteState::create(CandidateVotes votes,
CandidateEnvironment &env,
Timestamp now) {
CandidateVoteState CandidateVoteState::create(
CandidateVotes votes,
CandidateEnvironment &env,
std::vector<ValidatorIndex> &disabled_validators,
Timestamp now) {
CandidateVoteState res{.votes = std::move(votes),
.own_vote = CannotVote{},
.dispute_status = std::nullopt};

// Collect own votes
auto controlled_indices = env.controlled_indices;
if (not controlled_indices.empty()) {
Voted voted;
Expand All @@ -41,11 +44,33 @@ namespace kagome::dispute {
}

// Check if isn't disputed
// (We have a dispute, if we have votes on both sides)

// (We have a dispute if we have votes on both sides.)
if (res.votes.invalid.empty() or res.votes.valid.empty()) {
return res;
}

// Check if escalated by active validators

auto has_vote_of_active = [&](auto &votes) {
auto is_not_disabled = [&](auto &vote) {
return not std::binary_search(
disabled_validators.begin(), disabled_validators.end(), vote.first);
};
return std::find_if(votes.begin(), votes.end(), is_not_disabled)
!= votes.end();
};

auto escalated_by_active = has_vote_of_active(res.votes.invalid)
and has_vote_of_active(res.votes.valid);

if (not escalated_by_active) {
res.dispute_status = Postponed{};
return res;
}

// Check thresholds

auto n_validators = env.session.validators.size();

auto byzantine_threshold = (std::max<size_t>(n_validators, 1) - 1) / 3;
Expand Down
10 changes: 10 additions & 0 deletions core/dispute_coordinator/impl/candidate_vote_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,18 @@ namespace kagome::dispute {
/// concluded, whether we already voted, ...
class CandidateVoteState final {
public:
/**
* Creates CandidateVoteState based on collected votes, environment and
* taking into account disabled validators
* @param votes already collected votes for dispute
* @param env related data
* @param disabled presorted list of disabled validator indices
* @param now current timestamp
* @return CandidateVoteState with correct inner data
*/
static CandidateVoteState create(CandidateVotes votes,
CandidateEnvironment &env,
std::vector<ValidatorIndex> &disabled,
Timestamp now);

/// Votes already existing for the candidate + receipt.
Expand Down
153 changes: 120 additions & 33 deletions core/dispute_coordinator/impl/dispute_coordinator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ namespace kagome::dispute {
},
[](const ConcludedAgainst &at) -> std::optional<Timestamp> {
return (Timestamp)at;
},
[](const Postponed &) -> std::optional<Timestamp> {
return std::nullopt;
});

auto dispute_is_inactive =
Expand Down Expand Up @@ -335,17 +338,33 @@ namespace kagome::dispute {
}
auto &candidate_votes = votes_res.value().value();

auto &relay_parent =
candidate_votes.candidate_receipt.descriptor.relay_parent;

auto disabled_validators_res = api_->disabled_validators(relay_parent);
if (disabled_validators_res.has_error()) {
SL_WARN(log_,
"Cannot import votes, without getting disabled validators: {}",
disabled_validators_res.error());
continue;
}
auto &disabled_validators = disabled_validators_res.value();

auto vote_state = CandidateVoteState::create(
candidate_votes, env, system_clock_.nowUint64());
candidate_votes, env, disabled_validators, system_clock_.nowUint64());

auto is_included = scraper_->is_candidate_included(candidate_hash);
auto is_backed = scraper_->is_candidate_backed(candidate_hash);
auto is_disputed = vote_state.dispute_status.has_value();
auto is_postponed =
is_disputed ? is_type<Postponed>(vote_state.dispute_status.value())
: false;
auto is_confirmed =
is_disputed ? is_type<Confirmed>(vote_state.dispute_status.value())
: false;
auto is_potential_spam =
is_disputed && !is_included && !is_backed && !is_confirmed;
auto is_potential_spam = is_disputed //
and not is_included and not is_backed
and not is_confirmed and not is_postponed;

if (is_potential_spam) {
SL_TRACE(log_,
Expand Down Expand Up @@ -1013,23 +1032,51 @@ namespace kagome::dispute {
// blocks, and hence we do not have a `CandidateReceipt` available.

CandidateVoteState old_state;
std::vector<ValidatorIndex> disabled_validators;

OUTCOME_TRY(old_state_opt,
storage_->load_candidate_votes(session, candidate_hash));
if (old_state_opt.has_value()) {
old_state = CandidateVoteState::create(old_state_opt.value(), env, now);
} else {

primitives::BlockHash relay_parent{};

if (not old_state_opt.has_value()) {
auto provided_opt = if_type<CandidateReceipt>(candidate_receipt);
if (not provided_opt.has_value()) {
SL_ERROR(log_,
"Cannot import votes, without `CandidateReceipt` available!");
return outcome::success(false);
}

relay_parent = provided_opt.value().get().descriptor.relay_parent;

old_state = {
.votes = {.candidate_receipt = provided_opt.value().get()},
.own_vote = CannotVote{},
.dispute_status = std::nullopt,
};

} else {
relay_parent = old_state_opt->candidate_receipt.descriptor.relay_parent;
}

auto disabled_validators_res = api_->disabled_validators(relay_parent);
if (disabled_validators_res.has_error()) {
SL_WARN(log_,
"Cannot import votes, without getting disabled validators: {}",
disabled_validators_res.error());
return outcome::success(false);
}
disabled_validators = std::move(disabled_validators_res.value());

auto is_disabled = [&disabled_validators =
disabled_validators_res.value()](auto index) {
return std::binary_search(
disabled_validators.begin(), disabled_validators.end(), index);
};

if (old_state_opt.has_value()) {
old_state = CandidateVoteState::create(
old_state_opt.value(), env, disabled_validators, now);
}

SL_TRACE(log_, "Loaded votes");
Expand Down Expand Up @@ -1058,6 +1105,8 @@ namespace kagome::dispute {

auto expected_candidate_hash = votes.candidate_receipt.hash(*hasher_);

std::vector<Indexed<SignedDisputeStatement>> postponed_statements;

for (auto &vote : statements) {
auto val_index = vote.ix;
SignedDisputeStatement &statement = vote.payload;
Expand All @@ -1078,13 +1127,29 @@ namespace kagome::dispute {
continue;
}

auto is_disabled_validator = is_disabled(val_index);

// Postpone votes of disabled validators while any votes for candidate are
// not exist
if (is_disabled_validator and votes.valid.empty()
and votes.invalid.empty()) {
postponed_statements.emplace_back(std::move(vote));
continue;
}

visit_in_place(
statement.dispute_statement,
[&](const ValidDisputeStatement &valid) {
auto [it, fresh] = votes.valid.emplace(
val_index,
std::make_tuple(valid, statement.validator_signature));
if (fresh) {
if (imported_valid_votes == 0) {
// Return postponed statements to process
std::move_backward(postponed_statements.begin(),
postponed_statements.end(),
statements.end());
}
++imported_valid_votes;
return true;
}
Expand All @@ -1105,22 +1170,29 @@ namespace kagome::dispute {
val_index,
std::make_tuple(invalid, statement.validator_signature));
if (fresh) {
++imported_invalid_votes;
new_invalid_voters.push_back(val_index);
if (imported_invalid_votes == 0) {
// Return postponed statements to process
std::move_backward(postponed_statements.begin(),
postponed_statements.end(),
statements.end());
}
++imported_invalid_votes;
return true;
}
return false;
});
}

CandidateVoteState _new_state = CandidateVoteState::create(votes, env, now);

ImportResult intermediate_result{old_state,
_new_state,
imported_invalid_votes,
imported_valid_votes,
0,
new_invalid_voters};
ImportResult intermediate_result{
std::move(old_state),
CandidateVoteState::create(
votes, env, disabled_validators, now), // new_state
imported_invalid_votes,
imported_valid_votes,
0,
new_invalid_voters,
};

// Handle approval vote import:
//
Expand Down Expand Up @@ -1272,8 +1344,8 @@ namespace kagome::dispute {
}
}

import_result.new_state =
CandidateVoteState::create(std::move(_votes), env, now);
import_result.new_state = CandidateVoteState::create(
std::move(_votes), env, disabled_validators, now);
}
} else {
SL_TRACE(log_, "Not requested approval signatures");
Expand All @@ -1290,11 +1362,15 @@ namespace kagome::dispute {
is_type<CannotVote>(new_state.own_vote)
or boost::relaxed_get<Voted>(new_state.own_vote).empty();
auto is_disputed = new_state.dispute_status.has_value();
auto is_postponed = is_disputed
? is_type<Postponed>(new_state.dispute_status.value())
: false;
auto is_confirmed = is_disputed
? is_type<Confirmed>(new_state.dispute_status.value())
: false;
auto is_potential_spam =
is_disputed && !is_included && !is_backed && !is_confirmed;
auto is_potential_spam = is_disputed //
and not is_included and not is_backed
and not is_confirmed and not is_postponed;

// We participate only in disputes which are not potential spam.
auto allow_participation = not is_potential_spam;
Expand Down Expand Up @@ -1330,12 +1406,15 @@ namespace kagome::dispute {
// Participate in dispute if we did not cast a vote before and actually
// have keys to cast a local vote. Disputes should fall in one of the
// categories below, otherwise we will refrain from participation:
// - `is_included` lands in prioritised queue
// - `is_confirmed` | `is_backed` lands in best effort queue
// - `is_included` lands in prioritized queue
// - `is_confirmed` | `is_backed` lands in the best effort queue
// We don't participate in disputes escalated by disabled validators only.
// We don't participate in disputes on finalized candidates.
// see: {polkadot}/node/core/dispute-coordinator/src/initialized.rs:907

if (own_vote_missing && is_disputed && allow_participation) {
if (own_vote_missing //
and is_disputed and not is_postponed //
and allow_participation) {
auto priority = static_cast<ParticipationPriority>(is_included);

auto &receipt = new_state.votes.candidate_receipt;
Expand All @@ -1357,7 +1436,7 @@ namespace kagome::dispute {
// https://github.com/paritytech/polkadot/blob/40974fb99c86f5c341105b7db53c7aa0df707d66/node/core/dispute-coordinator/src/initialized.rs#L947
is_freshly_disputed = not import_result.old_state.dispute_status.has_value()
and import_result.new_state.dispute_status.has_value();
if (is_freshly_disputed) {
if (is_freshly_disputed and not is_postponed) {
if (is_type<Voted>(new_state.own_vote)) {
auto &own_votes = boost::relaxed_get<Voted>(new_state.own_vote);

Expand Down Expand Up @@ -1915,6 +1994,9 @@ namespace kagome::dispute {
},
[](const ConcludedAgainst &at) -> std::optional<Timestamp> {
return (Timestamp)at;
},
[](const Postponed &) -> std::optional<Timestamp> {
return std::nullopt;
});

auto dispute_is_inactive =
Expand Down Expand Up @@ -1973,8 +2055,6 @@ namespace kagome::dispute {
std::move(candidate_receipt),
valid);

// TODO ничего не возвращаем, вызывается из апрувала и партисипейшена

SL_TRACE(log_, "DisputeCoordinatorMessage::IssueLocalStatement");
auto res = issue_local_statement(
candidate_hash, candidate_receipt, session, valid);
Expand Down Expand Up @@ -2173,8 +2253,8 @@ namespace kagome::dispute {
BOOST_ASSERT_MSG(not queue.empty(),
"Invariant that queues are never empty is broken.");

auto &[msg, cb] = queue.front();
heads.emplace_back(peer_id, std::move(msg), std::move(cb));
auto &[request, cb] = queue.front();
heads.emplace_back(peer_id, std::move(request), std::move(cb));
queue.pop_front();

if (not queue.empty()) {
Expand Down Expand Up @@ -2320,6 +2400,7 @@ namespace kagome::dispute {
auto cb_opt =
batch->add_votes(valid_vote, invalid_vote, peer, std::move(cb));

// Returned value means duplicate
if (cb_opt.has_value()) {
// We don't expect honest peers to send redundant votes within a
// single batch, as the timeout for retry is much higher. Still we
Expand All @@ -2341,12 +2422,18 @@ namespace kagome::dispute {
// limit.
(*cb_opt)(BatchError::RedundantMessage);

} else { // FIXME testing code. must be removed
auto prepared_import =
batch->tick(steady_clock_.now() + Batch::kBatchCollectingInterval);
if (prepared_import.has_value()) {
start_import(std::move(prepared_import.value()));
}
// } else { // FIXME testing code. must be removed
// auto prepared_import =
// batch->tick(steady_clock_.now() +
// Batch::kBatchCollectingInterval);
// if (prepared_import.has_value()) {
// start_import(std::move(prepared_import.value()));
// }
}

auto ready_prepared_imports = batches_->check_batches();
for (auto &prepared_import : ready_prepared_imports) {
start_import(std::move(prepared_import));
}
}

Expand Down
7 changes: 7 additions & 0 deletions core/dispute_coordinator/impl/dispute_coordinator_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,13 @@ namespace kagome::dispute {
std::deque<std::tuple<network::DisputeMessage, CbOutcome<void>>>>
queues_;

/// Collection of DisputeRequests from disabled validators
std::unordered_map<CandidateHash,
std::tuple<primitives::AuthorityDiscoveryId,
network::DisputeMessage,
CbOutcome<void>>>
requests_from_disabled_validators_;

/// Delay timer for establishing the rate limit.
std::optional<libp2p::basic::Scheduler::Handle> rate_limit_timer_;

Expand Down
Loading

0 comments on commit 396c5c6

Please sign in to comment.