Skip to content

Commit

Permalink
fix pvf process (#2045)
Browse files Browse the repository at this point in the history
Signed-off-by: turuslan <[email protected]>
  • Loading branch information
turuslan authored Apr 25, 2024
1 parent 7e1cf70 commit 541d483
Show file tree
Hide file tree
Showing 16 changed files with 399 additions and 294 deletions.
12 changes: 8 additions & 4 deletions core/crypto/type_hasher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#pragma once

#include <optional>
#include <span>

#include "crypto/hasher/blake2b_stream_hasher.hpp"
#include "scale/kagome_scale.hpp"

Expand All @@ -24,12 +24,16 @@ namespace kagome::crypto {
struct Hashed {
static_assert(N == 8 || N == 16 || N == 32 || N == 64,
"Unexpected hash size");
using Type = std::decay_t<T>;
using Type = T;
using HashType = common::Blob<N>;

public:
template <typename... Args>
Hashed(Args &&...args) : type_{std::forward<Args>(args)...} {}
// NOLINTNEXTLINE(google-explicit-constructor)
Hashed(Type &&type)
requires(std::is_same_v<T, std::decay_t<T>>)
: type_{std::move(type)} {}
// NOLINTNEXTLINE(google-explicit-constructor)
Hashed(const Type &type) : type_{type} {}

Hashed(const Hashed &c) = default;
Hashed(Hashed &&c) = default;
Expand Down
35 changes: 16 additions & 19 deletions core/dispute_coordinator/participation/impl/participation_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,25 +238,22 @@ namespace kagome::dispute {
BOOST_ASSERT(ctx->available_data.has_value());
BOOST_ASSERT(ctx->validation_code.has_value());

auto res = pvf_->pvfValidate(ctx->available_data->validation_data,
ctx->available_data->pov,
ctx->request.candidate_receipt,
ctx->validation_code.value());

// we cast votes (either positive or negative) depending on the outcome of
// the validation and if valid, whether the commitments hash matches

if (res.has_value()) {
cb(ParticipationOutcome::Valid);
return;
}

// SL_WARN(log_,
// "Candidate {} considered invalid: {}",
// ctx->request.candidate_hash,
// res.error());

cb(ParticipationOutcome::Invalid);
pvf_->pvfValidate(
ctx->available_data->validation_data,
ctx->available_data->pov,
ctx->request.candidate_receipt,
ctx->validation_code.value(),
[cb{std::move(cb)}](outcome::result<parachain::Pvf::Result> &&res) {
// we cast votes (either positive or negative)
// depending on the outcome of the validation and if
// valid, whether the commitments hash matches

if (res.has_value()) {
cb(ParticipationOutcome::Valid);
return;
}
cb(ParticipationOutcome::Invalid);
});
}

} // namespace kagome::dispute
195 changes: 97 additions & 98 deletions core/parachain/approval/approval_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1240,117 +1240,116 @@ namespace kagome::parachain {
ValidatorIndex validator_index,
Hash block_hash,
GroupIndex backing_group) {
auto on_recover_complete =
[wself{weak_from_this()},
hashed_candidate{hashed_candidate},
block_hash,
session_index,
validator_index,
relay_block_hash](
std::optional<outcome::result<runtime::AvailableData>>
&&opt_result) mutable {
auto self = wself.lock();
if (!self) {
return;
}

const auto &candidate_receipt = hashed_candidate.get();
if (!opt_result) { // Unavailable
self->logger_->warn(
"No available parachain data.(session index={}, candidate "
"hash={}, relay block hash={})",
session_index,
hashed_candidate.getHash(),
relay_block_hash);
return;
}
auto on_recover_complete = [wself{weak_from_this()},
hashed_candidate{hashed_candidate},
block_hash,
session_index,
validator_index,
relay_block_hash](
std::optional<
outcome::result<runtime::AvailableData>>
&&opt_result) mutable {
auto self = wself.lock();
if (!self) {
return;
}

if (opt_result->has_error()) {
self->logger_->warn(
"Parachain data recovery failed.(error={}, session index={}, "
"candidate hash={}, relay block hash={})",
opt_result->error(),
session_index,
hashed_candidate.getHash(),
relay_block_hash);
self->dispute_coordinator_.get()->issueLocalStatement(
session_index,
hashed_candidate.getHash(),
hashed_candidate.get(),
false);
return;
}
auto &available_data = opt_result->value();
auto result = self->parachain_host_->validation_code_by_hash(
block_hash, candidate_receipt.descriptor.validation_code_hash);
if (result.has_error() || !result.value()) {
self->logger_->warn(
"Approval state is failed. Block hash {}, session index {}, "
"validator index {}, relay parent {}",
block_hash,
session_index,
validator_index,
candidate_receipt.descriptor.relay_parent);
return; /// ApprovalState::failed
}
const auto &candidate_receipt = hashed_candidate.get();
if (!opt_result) { // Unavailable
self->logger_->warn(
"No available parachain data.(session index={}, candidate "
"hash={}, relay block hash={})",
session_index,
hashed_candidate.getHash(),
relay_block_hash);
return;
}

self->logger_->info(
"Make exhaustive validation. Candidate hash {}, validator index "
"{}, block hash {}",
hashed_candidate.getHash(),
validator_index,
block_hash);
if (opt_result->has_error()) {
self->logger_->warn(
"Parachain data recovery failed.(error={}, session index={}, "
"candidate hash={}, relay block hash={})",
opt_result->error(),
session_index,
hashed_candidate.getHash(),
relay_block_hash);
self->dispute_coordinator_.get()->issueLocalStatement(
session_index,
hashed_candidate.getHash(),
hashed_candidate.get(),
false);
return;
}
auto &available_data = opt_result->value();
auto result = self->parachain_host_->validation_code_by_hash(
block_hash, candidate_receipt.descriptor.validation_code_hash);
if (result.has_error() || !result.value()) {
self->logger_->warn(
"Approval state is failed. Block hash {}, session index {}, "
"validator index {}, relay parent {}",
block_hash,
session_index,
validator_index,
candidate_receipt.descriptor.relay_parent);
return; /// ApprovalState::failed
}

runtime::ValidationCode &validation_code = *result.value();
self->logger_->info(
"Make exhaustive validation. Candidate hash {}, validator index "
"{}, block hash {}",
hashed_candidate.getHash(),
validator_index,
block_hash);

auto outcome = self->validate_candidate_exhaustive(
available_data.validation_data,
available_data.pov,
candidate_receipt,
validation_code);
runtime::ValidationCode &validation_code = *result.value();

self->approvals_cache_.exclusiveAccess([&](auto &approvals_cache) {
if (auto it = approvals_cache.find(hashed_candidate.getHash());
it != approvals_cache.end()) {
ApprovalCache &ac = it->second;
ac.approval_result = outcome;
}
});
if (ApprovalOutcome::Approved == outcome) {
self->issue_approval(
hashed_candidate.getHash(), validator_index, relay_block_hash);
} else if (ApprovalOutcome::Failed == outcome) {
self->dispute_coordinator_.get()->issueLocalStatement(
session_index,
hashed_candidate.getHash(),
candidate_receipt,
false);
auto cb = [weak_self{wself},
hashed_candidate,
session_index,
validator_index,
relay_block_hash](outcome::result<Pvf::Result> outcome) {
auto self = weak_self.lock();
if (not self) {
return;
}
const auto &candidate_receipt = hashed_candidate.get();
self->approvals_cache_.exclusiveAccess([&](auto &approvals_cache) {
if (auto it = approvals_cache.find(hashed_candidate.getHash());
it != approvals_cache.end()) {
ApprovalCache &ac = it->second;
ac.approval_result = outcome.has_error()
? ApprovalOutcome::Failed
: ApprovalOutcome::Approved;
}
};
});
if (outcome.has_error()) {
self->logger_->warn(
"Approval validation failed.(parachain id={}, relay parent={})",
candidate_receipt.descriptor.para_id,
candidate_receipt.descriptor.relay_parent);
self->dispute_coordinator_.get()->issueLocalStatement(
session_index,
hashed_candidate.getHash(),
candidate_receipt,
false);
} else {
self->issue_approval(
hashed_candidate.getHash(), validator_index, relay_block_hash);
}
};
self->pvf_->pvfValidate(available_data.validation_data,
available_data.pov,
candidate_receipt,
validation_code,
std::move(cb));
};

recovery_->recover(hashed_candidate,
session_index,
backing_group,
std::move(on_recover_complete));
}

ApprovalDistribution::ApprovalOutcome
ApprovalDistribution::validate_candidate_exhaustive(
const runtime::PersistedValidationData &data,
const network::ParachainBlock &pov,
const network::CandidateReceipt &receipt,
const ParachainRuntime &code) {
if (auto result = pvf_->pvfValidate(data, pov, receipt, code);
result.has_error()) {
logger_->warn(
"Approval validation failed.(parachain id={}, relay parent={})",
receipt.descriptor.para_id,
receipt.descriptor.relay_parent);
return ApprovalOutcome::Failed;
}
return ApprovalOutcome::Approved;
}

#define GET_OPT_VALUE_OR_EXIT(name, err, ...) \
auto __##name = (__VA_ARGS__); \
if (!__##name) { \
Expand Down
6 changes: 0 additions & 6 deletions core/parachain/approval/approval_distribution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,12 +533,6 @@ namespace kagome::parachain {
void imported_block_info(const primitives::BlockHash &block_hash,
const primitives::BlockHeader &block_header);

ApprovalOutcome validate_candidate_exhaustive(
const runtime::PersistedValidationData &data,
const network::ParachainBlock &pov,
const network::CandidateReceipt &receipt,
const ParachainRuntime &code);

AssignmentCheckResult check_and_import_assignment(
const approval::IndirectAssignmentCert &assignment,
CandidateIndex claimed_candidate_index);
Expand Down
1 change: 1 addition & 0 deletions core/parachain/pvf/kagome_pvf_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ namespace kagome::parachain {
OUTCOME_TRY(len, scale::encode<uint32_t>(result.size()));
std::cout.write((const char *)len.data(), len.size());
std::cout.write((const char *)result.data(), result.size());
std::cout.flush();
return outcome::success();
}

Expand Down
20 changes: 19 additions & 1 deletion core/parachain/pvf/kagome_pvf_worker_injector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "runtime/memory_provider.hpp"
#include "runtime/module.hpp"
#include "storage/trie/serialization/trie_serializer_impl.hpp"
#include "storage/trie/trie_storage.hpp"

#if KAGOME_WASM_COMPILER_WAVM == 1
#include "runtime/wavm/compartment_wrapper.hpp"
Expand All @@ -47,6 +48,23 @@ using kagome::injector::bind_by_lambda;
#endif

namespace kagome::parachain {
struct NullTrieStorage : storage::trie::TrieStorage {
outcome::result<std::unique_ptr<storage::trie::TrieBatch>>
getPersistentBatchAt(const storage::trie::RootHash &root,
TrieChangesTrackerOpt changes_tracker) override {
return nullptr;
}
outcome::result<std::unique_ptr<storage::trie::TrieBatch>>
getEphemeralBatchAt(const storage::trie::RootHash &root) const override {
return nullptr;
}
outcome::result<std::unique_ptr<storage::trie::TrieBatch>>
getProofReaderBatchAt(const storage::trie::RootHash &root,
const OnNodeLoaded &on_node_loaded) const override {
return nullptr;
}
};

template <typename T>
auto bind_null() {
return bind_by_lambda<T>([](auto &) { return nullptr; });
Expand All @@ -69,7 +87,7 @@ namespace kagome::parachain {
bind_null<offchain::OffchainPersistentStorage>(),
bind_null<offchain::OffchainWorkerPool>(),
di::bind<host_api::HostApiFactory>.to<host_api::HostApiFactoryImpl>(),
bind_null<storage::trie::TrieStorage>(),
di::bind<storage::trie::TrieStorage>.to<NullTrieStorage>(),
bind_null<storage::trie::TrieSerializer>()

#if KAGOME_WASM_COMPILER_WAVM == 1
Expand Down
19 changes: 10 additions & 9 deletions core/parachain/pvf/pvf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ namespace kagome::parachain {
using CandidateCommitments = network::CandidateCommitments;
using PersistedValidationData = runtime::PersistedValidationData;
using Result = std::pair<CandidateCommitments, PersistedValidationData>;
using Cb = std::function<void(outcome::result<Result>)>;

virtual ~Pvf() = default;

/// Execute pvf synchronously
virtual outcome::result<Result> pvfSync(
const CandidateReceipt &receipt,
const ParachainBlock &pov,
const runtime::PersistedValidationData &pvd) const = 0;
virtual void pvf(const CandidateReceipt &receipt,
const ParachainBlock &pov,
const runtime::PersistedValidationData &pvd,
Cb cb) const = 0;

virtual outcome::result<Result> pvfValidate(
const PersistedValidationData &data,
const ParachainBlock &pov,
const CandidateReceipt &receipt,
const ParachainRuntime &code) const = 0;
virtual void pvfValidate(const PersistedValidationData &data,
const ParachainBlock &pov,
const CandidateReceipt &receipt,
const ParachainRuntime &code,
Cb cb) const = 0;
};
} // namespace kagome::parachain
Loading

0 comments on commit 541d483

Please sign in to comment.