From 541d4835d12fb9d0a0385340d8a0560b3bfa9065 Mon Sep 17 00:00:00 2001 From: Ruslan Tushov Date: Thu, 25 Apr 2024 18:33:34 +0500 Subject: [PATCH] fix pvf process (#2045) Signed-off-by: turuslan --- core/crypto/type_hasher.hpp | 12 +- .../participation/impl/participation_impl.cpp | 35 ++-- .../approval/approval_distribution.cpp | 195 +++++++++--------- .../approval/approval_distribution.hpp | 6 - core/parachain/pvf/kagome_pvf_worker.cpp | 1 + .../pvf/kagome_pvf_worker_injector.hpp | 20 +- core/parachain/pvf/pvf.hpp | 19 +- core/parachain/pvf/pvf_impl.cpp | 147 ++++++++----- core/parachain/pvf/pvf_impl.hpp | 37 ++-- core/parachain/pvf/pvf_thread_pool.hpp | 4 +- .../validator/impl/parachain_processor.cpp | 155 ++++++++------ .../validator/parachain_processor.hpp | 4 - .../validator/prospective_parachains.hpp | 6 +- core/utils/argv0.hpp | 30 ++- node/main.cpp | 2 - test/core/parachain/pvf_test.cpp | 20 +- 16 files changed, 399 insertions(+), 294 deletions(-) diff --git a/core/crypto/type_hasher.hpp b/core/crypto/type_hasher.hpp index 15adba3e1f..a3430e6055 100644 --- a/core/crypto/type_hasher.hpp +++ b/core/crypto/type_hasher.hpp @@ -7,7 +7,7 @@ #pragma once #include -#include + #include "crypto/hasher/blake2b_stream_hasher.hpp" #include "scale/kagome_scale.hpp" @@ -24,12 +24,16 @@ namespace kagome::crypto { struct Hashed { static_assert(N == 8 || N == 16 || N == 32 || N == 64, "Unexpected hash size"); - using Type = std::decay_t; + using Type = T; using HashType = common::Blob; public: - template - Hashed(Args &&...args) : type_{std::forward(args)...} {} + // NOLINTNEXTLINE(google-explicit-constructor) + Hashed(Type &&type) + requires(std::is_same_v>) + : type_{std::move(type)} {} + // NOLINTNEXTLINE(google-explicit-constructor) + Hashed(const Type &type) : type_{type} {} Hashed(const Hashed &c) = default; Hashed(Hashed &&c) = default; diff --git a/core/dispute_coordinator/participation/impl/participation_impl.cpp b/core/dispute_coordinator/participation/impl/participation_impl.cpp index 7e215f7323..d84c1103dd 100644 --- a/core/dispute_coordinator/participation/impl/participation_impl.cpp +++ b/core/dispute_coordinator/participation/impl/participation_impl.cpp @@ -238,25 +238,22 @@ namespace kagome::dispute { BOOST_ASSERT(ctx->available_data.has_value()); BOOST_ASSERT(ctx->validation_code.has_value()); - auto res = pvf_->pvfValidate(ctx->available_data->validation_data, - ctx->available_data->pov, - ctx->request.candidate_receipt, - ctx->validation_code.value()); - - // we cast votes (either positive or negative) depending on the outcome of - // the validation and if valid, whether the commitments hash matches - - if (res.has_value()) { - cb(ParticipationOutcome::Valid); - return; - } - - // SL_WARN(log_, - // "Candidate {} considered invalid: {}", - // ctx->request.candidate_hash, - // res.error()); - - cb(ParticipationOutcome::Invalid); + pvf_->pvfValidate( + ctx->available_data->validation_data, + ctx->available_data->pov, + ctx->request.candidate_receipt, + ctx->validation_code.value(), + [cb{std::move(cb)}](outcome::result &&res) { + // we cast votes (either positive or negative) + // depending on the outcome of the validation and if + // valid, whether the commitments hash matches + + if (res.has_value()) { + cb(ParticipationOutcome::Valid); + return; + } + cb(ParticipationOutcome::Invalid); + }); } } // namespace kagome::dispute diff --git a/core/parachain/approval/approval_distribution.cpp b/core/parachain/approval/approval_distribution.cpp index daf9914f26..df092e243f 100644 --- a/core/parachain/approval/approval_distribution.cpp +++ b/core/parachain/approval/approval_distribution.cpp @@ -1240,93 +1240,109 @@ namespace kagome::parachain { ValidatorIndex validator_index, Hash block_hash, GroupIndex backing_group) { - auto on_recover_complete = - [wself{weak_from_this()}, - hashed_candidate{hashed_candidate}, - block_hash, - session_index, - validator_index, - relay_block_hash]( - std::optional> - &&opt_result) mutable { - auto self = wself.lock(); - if (!self) { - return; - } - - const auto &candidate_receipt = hashed_candidate.get(); - if (!opt_result) { // Unavailable - self->logger_->warn( - "No available parachain data.(session index={}, candidate " - "hash={}, relay block hash={})", - session_index, - hashed_candidate.getHash(), - relay_block_hash); - return; - } + auto on_recover_complete = [wself{weak_from_this()}, + hashed_candidate{hashed_candidate}, + block_hash, + session_index, + validator_index, + relay_block_hash]( + std::optional< + outcome::result> + &&opt_result) mutable { + auto self = wself.lock(); + if (!self) { + return; + } - if (opt_result->has_error()) { - self->logger_->warn( - "Parachain data recovery failed.(error={}, session index={}, " - "candidate hash={}, relay block hash={})", - opt_result->error(), - session_index, - hashed_candidate.getHash(), - relay_block_hash); - self->dispute_coordinator_.get()->issueLocalStatement( - session_index, - hashed_candidate.getHash(), - hashed_candidate.get(), - false); - return; - } - auto &available_data = opt_result->value(); - auto result = self->parachain_host_->validation_code_by_hash( - block_hash, candidate_receipt.descriptor.validation_code_hash); - if (result.has_error() || !result.value()) { - self->logger_->warn( - "Approval state is failed. Block hash {}, session index {}, " - "validator index {}, relay parent {}", - block_hash, - session_index, - validator_index, - candidate_receipt.descriptor.relay_parent); - return; /// ApprovalState::failed - } + const auto &candidate_receipt = hashed_candidate.get(); + if (!opt_result) { // Unavailable + self->logger_->warn( + "No available parachain data.(session index={}, candidate " + "hash={}, relay block hash={})", + session_index, + hashed_candidate.getHash(), + relay_block_hash); + return; + } - self->logger_->info( - "Make exhaustive validation. Candidate hash {}, validator index " - "{}, block hash {}", - hashed_candidate.getHash(), - validator_index, - block_hash); + if (opt_result->has_error()) { + self->logger_->warn( + "Parachain data recovery failed.(error={}, session index={}, " + "candidate hash={}, relay block hash={})", + opt_result->error(), + session_index, + hashed_candidate.getHash(), + relay_block_hash); + self->dispute_coordinator_.get()->issueLocalStatement( + session_index, + hashed_candidate.getHash(), + hashed_candidate.get(), + false); + return; + } + auto &available_data = opt_result->value(); + auto result = self->parachain_host_->validation_code_by_hash( + block_hash, candidate_receipt.descriptor.validation_code_hash); + if (result.has_error() || !result.value()) { + self->logger_->warn( + "Approval state is failed. Block hash {}, session index {}, " + "validator index {}, relay parent {}", + block_hash, + session_index, + validator_index, + candidate_receipt.descriptor.relay_parent); + return; /// ApprovalState::failed + } - runtime::ValidationCode &validation_code = *result.value(); + self->logger_->info( + "Make exhaustive validation. Candidate hash {}, validator index " + "{}, block hash {}", + hashed_candidate.getHash(), + validator_index, + block_hash); - auto outcome = self->validate_candidate_exhaustive( - available_data.validation_data, - available_data.pov, - candidate_receipt, - validation_code); + runtime::ValidationCode &validation_code = *result.value(); - self->approvals_cache_.exclusiveAccess([&](auto &approvals_cache) { - if (auto it = approvals_cache.find(hashed_candidate.getHash()); - it != approvals_cache.end()) { - ApprovalCache &ac = it->second; - ac.approval_result = outcome; - } - }); - if (ApprovalOutcome::Approved == outcome) { - self->issue_approval( - hashed_candidate.getHash(), validator_index, relay_block_hash); - } else if (ApprovalOutcome::Failed == outcome) { - self->dispute_coordinator_.get()->issueLocalStatement( - session_index, - hashed_candidate.getHash(), - candidate_receipt, - false); + auto cb = [weak_self{wself}, + hashed_candidate, + session_index, + validator_index, + relay_block_hash](outcome::result outcome) { + auto self = weak_self.lock(); + if (not self) { + return; + } + const auto &candidate_receipt = hashed_candidate.get(); + self->approvals_cache_.exclusiveAccess([&](auto &approvals_cache) { + if (auto it = approvals_cache.find(hashed_candidate.getHash()); + it != approvals_cache.end()) { + ApprovalCache &ac = it->second; + ac.approval_result = outcome.has_error() + ? ApprovalOutcome::Failed + : ApprovalOutcome::Approved; } - }; + }); + if (outcome.has_error()) { + self->logger_->warn( + "Approval validation failed.(parachain id={}, relay parent={})", + candidate_receipt.descriptor.para_id, + candidate_receipt.descriptor.relay_parent); + self->dispute_coordinator_.get()->issueLocalStatement( + session_index, + hashed_candidate.getHash(), + candidate_receipt, + false); + } else { + self->issue_approval( + hashed_candidate.getHash(), validator_index, relay_block_hash); + } + }; + self->pvf_->pvfValidate(available_data.validation_data, + available_data.pov, + candidate_receipt, + validation_code, + std::move(cb)); + }; recovery_->recover(hashed_candidate, session_index, @@ -1334,23 +1350,6 @@ namespace kagome::parachain { std::move(on_recover_complete)); } - ApprovalDistribution::ApprovalOutcome - ApprovalDistribution::validate_candidate_exhaustive( - const runtime::PersistedValidationData &data, - const network::ParachainBlock &pov, - const network::CandidateReceipt &receipt, - const ParachainRuntime &code) { - if (auto result = pvf_->pvfValidate(data, pov, receipt, code); - result.has_error()) { - logger_->warn( - "Approval validation failed.(parachain id={}, relay parent={})", - receipt.descriptor.para_id, - receipt.descriptor.relay_parent); - return ApprovalOutcome::Failed; - } - return ApprovalOutcome::Approved; - } - #define GET_OPT_VALUE_OR_EXIT(name, err, ...) \ auto __##name = (__VA_ARGS__); \ if (!__##name) { \ diff --git a/core/parachain/approval/approval_distribution.hpp b/core/parachain/approval/approval_distribution.hpp index 68fb271f23..d33b0bd85e 100644 --- a/core/parachain/approval/approval_distribution.hpp +++ b/core/parachain/approval/approval_distribution.hpp @@ -533,12 +533,6 @@ namespace kagome::parachain { void imported_block_info(const primitives::BlockHash &block_hash, const primitives::BlockHeader &block_header); - ApprovalOutcome validate_candidate_exhaustive( - const runtime::PersistedValidationData &data, - const network::ParachainBlock &pov, - const network::CandidateReceipt &receipt, - const ParachainRuntime &code); - AssignmentCheckResult check_and_import_assignment( const approval::IndirectAssignmentCert &assignment, CandidateIndex claimed_candidate_index); diff --git a/core/parachain/pvf/kagome_pvf_worker.cpp b/core/parachain/pvf/kagome_pvf_worker.cpp index f1a889a656..19d70b6e46 100644 --- a/core/parachain/pvf/kagome_pvf_worker.cpp +++ b/core/parachain/pvf/kagome_pvf_worker.cpp @@ -101,6 +101,7 @@ namespace kagome::parachain { OUTCOME_TRY(len, scale::encode(result.size())); std::cout.write((const char *)len.data(), len.size()); std::cout.write((const char *)result.data(), result.size()); + std::cout.flush(); return outcome::success(); } diff --git a/core/parachain/pvf/kagome_pvf_worker_injector.hpp b/core/parachain/pvf/kagome_pvf_worker_injector.hpp index 9b538193d2..6dde2ae707 100644 --- a/core/parachain/pvf/kagome_pvf_worker_injector.hpp +++ b/core/parachain/pvf/kagome_pvf_worker_injector.hpp @@ -25,6 +25,7 @@ #include "runtime/memory_provider.hpp" #include "runtime/module.hpp" #include "storage/trie/serialization/trie_serializer_impl.hpp" +#include "storage/trie/trie_storage.hpp" #if KAGOME_WASM_COMPILER_WAVM == 1 #include "runtime/wavm/compartment_wrapper.hpp" @@ -47,6 +48,23 @@ using kagome::injector::bind_by_lambda; #endif namespace kagome::parachain { + struct NullTrieStorage : storage::trie::TrieStorage { + outcome::result> + getPersistentBatchAt(const storage::trie::RootHash &root, + TrieChangesTrackerOpt changes_tracker) override { + return nullptr; + } + outcome::result> + getEphemeralBatchAt(const storage::trie::RootHash &root) const override { + return nullptr; + } + outcome::result> + getProofReaderBatchAt(const storage::trie::RootHash &root, + const OnNodeLoaded &on_node_loaded) const override { + return nullptr; + } + }; + template auto bind_null() { return bind_by_lambda([](auto &) { return nullptr; }); @@ -69,7 +87,7 @@ namespace kagome::parachain { bind_null(), bind_null(), di::bind.to(), - bind_null(), + di::bind.to(), bind_null() #if KAGOME_WASM_COMPILER_WAVM == 1 diff --git a/core/parachain/pvf/pvf.hpp b/core/parachain/pvf/pvf.hpp index 9128122708..ff9b739242 100644 --- a/core/parachain/pvf/pvf.hpp +++ b/core/parachain/pvf/pvf.hpp @@ -18,19 +18,20 @@ namespace kagome::parachain { using CandidateCommitments = network::CandidateCommitments; using PersistedValidationData = runtime::PersistedValidationData; using Result = std::pair; + using Cb = std::function)>; virtual ~Pvf() = default; /// Execute pvf synchronously - virtual outcome::result pvfSync( - const CandidateReceipt &receipt, - const ParachainBlock &pov, - const runtime::PersistedValidationData &pvd) const = 0; + virtual void pvf(const CandidateReceipt &receipt, + const ParachainBlock &pov, + const runtime::PersistedValidationData &pvd, + Cb cb) const = 0; - virtual outcome::result pvfValidate( - const PersistedValidationData &data, - const ParachainBlock &pov, - const CandidateReceipt &receipt, - const ParachainRuntime &code) const = 0; + virtual void pvfValidate(const PersistedValidationData &data, + const ParachainBlock &pov, + const CandidateReceipt &receipt, + const ParachainRuntime &code, + Cb cb) const = 0; }; } // namespace kagome::parachain diff --git a/core/parachain/pvf/pvf_impl.cpp b/core/parachain/pvf/pvf_impl.cpp index 64b9004a34..ee90fe5fc0 100644 --- a/core/parachain/pvf/pvf_impl.cpp +++ b/core/parachain/pvf/pvf_impl.cpp @@ -6,7 +6,7 @@ #include "parachain/pvf/pvf_impl.hpp" -#include +#include #include "application/app_configuration.hpp" #include "application/app_state_manager.hpp" @@ -14,6 +14,7 @@ #include "common/visitor.hpp" #include "metrics/histogram_timer.hpp" #include "parachain/pvf/module_precompiler.hpp" +#include "parachain/pvf/pvf_thread_pool.hpp" #include "parachain/pvf/pvf_worker_types.hpp" #include "parachain/pvf/run_worker.hpp" #include "parachain/pvf/session_params.hpp" @@ -29,6 +30,17 @@ #include "scale/std_variant.hpp" #include "utils/argv0.hpp" +#define _CB_TRY_VOID(tmp, expr) \ + auto tmp = (expr); \ + if (tmp.has_error()) { \ + return cb(tmp.error()); \ + } +#define _CB_TRY_OUT(tmp, out, expr) \ + _CB_TRY_VOID(tmp, expr); \ + out = std::move(tmp.value()); +#define CB_TRYV(expr) _CB_TRY_VOID(BOOST_OUTCOME_TRY_UNIQUE_NAME, expr) +#define CB_TRY(out, expr) _CB_TRY_OUT(BOOST_OUTCOME_TRY_UNIQUE_NAME, out, expr) + OUTCOME_CPP_DEFINE_CATEGORY(kagome::parachain, PvfError, e) { using kagome::parachain::PvfError; switch (e) { @@ -144,7 +156,8 @@ namespace kagome::parachain { std::shared_ptr parachain_api, std::shared_ptr executor, std::shared_ptr ctx_factory, - std::shared_ptr state_manager, + PvfThreadPool &pvf_thread_pool, + std::shared_ptr app_state_manager, std::shared_ptr app_configuration) : config_{config}, io_context_{std::move(io_context)}, @@ -165,8 +178,9 @@ namespace kagome::parachain { parachain_api_, runtime_cache_, hasher_)}, + pvf_thread_handler_{pvf_thread_pool.handler(*app_state_manager)}, app_configuration_{std::move(app_configuration)} { - state_manager->takeControl(*this); + app_state_manager->takeControl(*this); constexpr std::array engines{ "kBinaryen", "kWAVM", @@ -202,64 +216,86 @@ namespace kagome::parachain { return true; } - outcome::result PvfImpl::pvfValidate( - const PersistedValidationData &data, - const ParachainBlock &pov, - const CandidateReceipt &receipt, - const ParachainRuntime &code_zstd) const { - OUTCOME_TRY(pov_encoded, scale::encode(pov)); + void PvfImpl::pvfValidate(const PersistedValidationData &data, + const ParachainBlock &pov, + const CandidateReceipt &receipt, + const ParachainRuntime &code_zstd, + Cb cb) const { + REINVOKE(*pvf_thread_handler_, + pvfValidate, + data, + pov, + receipt, + code_zstd, + std::move(cb)); + CB_TRY(auto pov_encoded, scale::encode(pov)); if (pov_encoded.size() > data.max_pov_size) { - return PvfError::POV_SIZE; + return cb(PvfError::POV_SIZE); } auto pov_hash = hasher_->blake2b_256(pov_encoded); if (pov_hash != receipt.descriptor.pov_hash) { - return PvfError::POV_HASH; + return cb(PvfError::POV_HASH); } auto code_hash = hasher_->blake2b_256(code_zstd); if (code_hash != receipt.descriptor.validation_code_hash) { - return PvfError::CODE_HASH; + return cb(PvfError::CODE_HASH); } - OUTCOME_TRY(signature_valid, - sr25519_provider_->verify(receipt.descriptor.signature, - receipt.descriptor.signable(), - receipt.descriptor.collator_id)); + CB_TRY(auto signature_valid, + sr25519_provider_->verify(receipt.descriptor.signature, + receipt.descriptor.signable(), + receipt.descriptor.collator_id)); if (!signature_valid) { - return PvfError::SIGNATURE; + return cb(PvfError::SIGNATURE); } auto timer = metric_pvf_execution_time.timer(); ParachainRuntime code; - OUTCOME_TRY(runtime::uncompressCodeIfNeeded(code_zstd, code)); + CB_TRYV(runtime::uncompressCodeIfNeeded(code_zstd, code)); metric_code_size.observe(code.size()); ValidationParams params; params.parent_head = data.parent_head; - OUTCOME_TRY(runtime::uncompressCodeIfNeeded(pov.payload, - params.block_data.payload)); + CB_TRYV(runtime::uncompressCodeIfNeeded(pov.payload, + params.block_data.payload)); params.relay_parent_number = data.relay_parent_number; params.relay_parent_storage_root = data.relay_parent_storage_root; - OUTCOME_TRY(result, callWasm(receipt, code_hash, code, params)); - timer.reset(); - - OUTCOME_TRY(commitments, fromOutputs(receipt, std::move(result))); - return std::make_pair(std::move(commitments), data); + callWasm(receipt, + code_hash, + code, + params, + libp2p::SharedFn{[weak_self{weak_from_this()}, + data, + receipt, + cb{std::move(cb)}, + timer{std::move(timer)}]( + outcome::result r) { + auto self = weak_self.lock(); + if (not self) { + return; + } + CB_TRY(auto result, std::move(r)); + CB_TRY(auto commitments, + self->fromOutputs(receipt, std::move(result))); + cb(std::make_pair(std::move(commitments), data)); + }}); } - outcome::result PvfImpl::pvfSync( - const CandidateReceipt &receipt, - const ParachainBlock &pov, - const runtime::PersistedValidationData &pvd) const { + void PvfImpl::pvf(const CandidateReceipt &receipt, + const ParachainBlock &pov, + const runtime::PersistedValidationData &pvd, + Cb cb) const { + REINVOKE(*pvf_thread_handler_, pvf, receipt, pov, pvd, std::move(cb)); SL_DEBUG(log_, - "pvfSync relay_parent={} para_id={}", + "pvf relay_parent={} para_id={}", receipt.descriptor.relay_parent, receipt.descriptor.para_id); auto data_hash = hasher_->blake2b_256(::scale::encode(pvd).value()); if (receipt.descriptor.persisted_data_hash != data_hash) { - return PvfError::PERSISTED_DATA_HASH; + return cb(PvfError::PERSISTED_DATA_HASH); } - OUTCOME_TRY(code, getCode(receipt.descriptor)); - return pvfValidate(pvd, pov, receipt, code); + CB_TRY(auto code, getCode(receipt.descriptor)); + pvfValidate(pvd, pov, receipt, code, std::move(cb)); } outcome::result PvfImpl::getCode( @@ -288,25 +324,23 @@ namespace kagome::parachain { return PvfError::NO_CODE; } - outcome::result PvfImpl::callWasm( - const CandidateReceipt &receipt, - const common::Hash256 &code_hash, - const ParachainRuntime &code_zstd, - const ValidationParams ¶ms) const { - OUTCOME_TRY( - executor_params, - sessionParams(*parachain_api_, receipt.descriptor.relay_parent)); + void PvfImpl::callWasm(const CandidateReceipt &receipt, + const common::Hash256 &code_hash, + const ParachainRuntime &code_zstd, + const ValidationParams ¶ms, + WasmCb cb) const { + CB_TRY(auto executor_params, + sessionParams(*parachain_api_, receipt.descriptor.relay_parent)); constexpr auto name = "validate_block"; if (not app_configuration_->usePvfSubprocess()) { - OUTCOME_TRY(instance, - runtime_cache_->instantiateFromCode( - code_hash, code_zstd, executor_params)); - OUTCOME_TRY( - ctx, - ctx_factory_->ephemeral( - instance, storage::trie::kEmptyRootHash, executor_params)); - return executor_->call(ctx, name, params); + CB_TRY(auto instance, + runtime_cache_->instantiateFromCode( + code_hash, code_zstd, executor_params)); + CB_TRY(auto ctx, + ctx_factory_->ephemeral( + instance, storage::trie::kEmptyRootHash, executor_params)); + return cb(executor_->call(ctx, name, params)); } PvfWorkerInput input{ @@ -320,18 +354,17 @@ namespace kagome::parachain { : std::nullopt, app_configuration_->log(), }; - std::promise> promise; - auto cb = [&](outcome::result r) { - promise.set_value(std::move(r)); - }; runWorker(*io_context_, scheduler_, app_configuration_->pvfSubprocessDeadline(), - argv0().value(), + exePath(), common::Buffer{scale::encode(input).value()}, - cb); - OUTCOME_TRY(result, promise.get_future().get()); - return scale::decode(result); + [cb{std::move(cb)}](outcome::result r) { + if (r.has_error()) { + return cb(r.error()); + } + cb(scale::decode(r.value())); + }); } outcome::result PvfImpl::fromOutputs( diff --git a/core/parachain/pvf/pvf_impl.hpp b/core/parachain/pvf/pvf_impl.hpp index eec1f10aba..47139c4e42 100644 --- a/core/parachain/pvf/pvf_impl.hpp +++ b/core/parachain/pvf/pvf_impl.hpp @@ -22,6 +22,10 @@ namespace libp2p::basic { class Scheduler; } // namespace libp2p::basic +namespace kagome { + class PoolHandler; +} // namespace kagome + namespace kagome::application { class AppConfiguration; class AppStateManager; @@ -59,6 +63,8 @@ namespace kagome::parachain { OUTCOME_HPP_DECLARE_ERROR(kagome::parachain, PvfError) namespace kagome::parachain { + class PvfThreadPool; + class ModulePrecompiler; struct ValidationParams; @@ -93,6 +99,7 @@ namespace kagome::parachain { std::shared_ptr parachain_api, std::shared_ptr executor, std::shared_ptr ctx_factory, + PvfThreadPool &pvf_thread_pool, std::shared_ptr app_state_manager, std::shared_ptr app_configuration); @@ -100,27 +107,28 @@ namespace kagome::parachain { bool prepare(); - outcome::result pvfSync( - const CandidateReceipt &receipt, - const ParachainBlock &pov, - const runtime::PersistedValidationData &pvd) const override; - outcome::result pvfValidate( - const PersistedValidationData &data, - const ParachainBlock &pov, - const CandidateReceipt &receipt, - const ParachainRuntime &code) const override; + void pvf(const CandidateReceipt &receipt, + const ParachainBlock &pov, + const runtime::PersistedValidationData &pvd, + Cb cb) const override; + void pvfValidate(const PersistedValidationData &data, + const ParachainBlock &pov, + const CandidateReceipt &receipt, + const ParachainRuntime &code, + Cb cb) const override; private: using CandidateDescriptor = network::CandidateDescriptor; using ParachainRuntime = network::ParachainRuntime; + using WasmCb = std::function)>; outcome::result getCode( const CandidateDescriptor &descriptor) const; - outcome::result callWasm( - const CandidateReceipt &receipt, - const common::Hash256 &code_hash, - const ParachainRuntime &code_zstd, - const ValidationParams ¶ms) const; + void callWasm(const CandidateReceipt &receipt, + const common::Hash256 &code_hash, + const ParachainRuntime &code_zstd, + const ValidationParams ¶ms, + WasmCb cb) const; outcome::result fromOutputs( const CandidateReceipt &receipt, ValidationResult &&result) const; @@ -138,6 +146,7 @@ namespace kagome::parachain { std::shared_ptr runtime_cache_; std::shared_ptr precompiler_; + std::shared_ptr pvf_thread_handler_; std::shared_ptr app_configuration_; std::unique_ptr precompiler_thread_; diff --git a/core/parachain/pvf/pvf_thread_pool.hpp b/core/parachain/pvf/pvf_thread_pool.hpp index 5c5b341ad0..620132acc9 100644 --- a/core/parachain/pvf/pvf_thread_pool.hpp +++ b/core/parachain/pvf/pvf_thread_pool.hpp @@ -12,7 +12,9 @@ namespace kagome::parachain { class PvfThreadPool final : public ThreadPool { public: - PvfThreadPool(std::shared_ptr watchdog) + PvfThreadPool(std::shared_ptr watchdog, Inject, ...) : ThreadPool(std::move(watchdog), "pvf", 1, std::nullopt) {} + + PvfThreadPool(TestThreadPool test) : ThreadPool{std::move(test)} {} }; } // namespace kagome::parachain diff --git a/core/parachain/validator/impl/parachain_processor.cpp b/core/parachain/validator/impl/parachain_processor.cpp index ab2d0ddf92..e9792cfc81 100644 --- a/core/parachain/validator/impl/parachain_processor.cpp +++ b/core/parachain/validator/impl/parachain_processor.cpp @@ -3271,14 +3271,6 @@ namespace kagome::parachain { } } - outcome::result - ParachainProcessorImpl::validateCandidate( - const network::CandidateReceipt &candidate, - const network::ParachainBlock &pov, - runtime::PersistedValidationData &&pvd) { - return pvf_->pvfSync(candidate, pov, pvd); - } - outcome::result> ParachainProcessorImpl::validateErasureCoding( const runtime::AvailableData &validating_data, size_t n_validators) { @@ -3356,7 +3348,7 @@ namespace kagome::parachain { relay_parent, peer_id); - TicToc _measure{"Parachain validation", logger_}; + auto _measure = std::make_shared("Parachain validation", logger_); const auto candidate_hash{candidate.hash(*hasher_)}; /// TODO(iceseer): do https://github.com/qdrvm/kagome/issues/1888 @@ -3373,71 +3365,98 @@ namespace kagome::parachain { candidate_hash); return; } + auto cb = [weak_self{weak_from_this()}, + candidate, + pov, + pvd, + peer_id, + relay_parent, + n_validators, + _measure, + candidate_hash]( + outcome::result validation_result) mutable { + auto self = weak_self.lock(); + if (not self) { + return; + } + if (!validation_result) { + SL_WARN( + self->logger_, + "Candidate {} on relay_parent {}, para_id {} validation failed with " + "error: {}", + candidate_hash, + candidate.descriptor.relay_parent, + candidate.descriptor.para_id, + validation_result.error().message()); + return; + } - auto pvd_copy{pvd}; - auto validation_result = validateCandidate(candidate, pov, std::move(pvd)); - if (!validation_result) { - logger_->warn( - "Candidate {} on relay_parent {}, para_id {} validation failed with " - "error: {}", - candidate_hash, - candidate.descriptor.relay_parent, - candidate.descriptor.para_id, - validation_result.error().message()); - return; - } - - /// TODO(iceseer): do https://github.com/qdrvm/kagome/issues/1888 - /// checks if we still need to execute parachain task - need_to_process = - our_current_state_.active_leaves.count(relay_parent) != 0ull; + /// TODO(iceseer): do https://github.com/qdrvm/kagome/issues/1888 + /// checks if we still need to execute parachain task + auto need_to_process = + self->our_current_state_.active_leaves.count(relay_parent) != 0ull; - if (!need_to_process) { - SL_TRACE(logger_, - "Candidate validation skipped before erasure-coding because of " - "extruded relay parent. " - "(relay_parent={}, parachain_id={}, candidate_hash={})", - relay_parent, - candidate.descriptor.para_id, - candidate_hash); - return; - } + if (!need_to_process) { + SL_TRACE( + self->logger_, + "Candidate validation skipped before erasure-coding because of " + "extruded relay parent. " + "(relay_parent={}, parachain_id={}, candidate_hash={})", + relay_parent, + candidate.descriptor.para_id, + candidate_hash); + return; + } - auto &[comms, data] = validation_result.value(); - runtime::AvailableData available_data{ - .pov = std::move(pov), - .validation_data = std::move(data), - }; + auto &[comms, data] = validation_result.value(); + runtime::AvailableData available_data{ + .pov = std::move(pov), + .validation_data = std::move(data), + }; - std::vector chunks; - if (auto res = validateErasureCoding(available_data, n_validators); - res.has_error()) { - SL_WARN(logger_, - "Erasure coding validation failed. (error={})", - res.error().message()); - return; - } else { - chunks = std::move(res.value()); - } + auto chunks_res = + self->validateErasureCoding(available_data, n_validators); + if (chunks_res.has_error()) { + SL_WARN(self->logger_, + "Erasure coding validation failed. (error={})", + chunks_res.error()); + return; + } + auto &chunks = chunks_res.value(); - notifyAvailableData(std::move(chunks), - relay_parent, - candidate_hash, - available_data.pov, - available_data.validation_data); + self->notifyAvailableData(std::move(chunks), + relay_parent, + candidate_hash, + available_data.pov, + available_data.validation_data); - makeAvailable( - peer_id, - candidate_hash, - ValidateAndSecondResult{ - .result = outcome::success(), - .relay_parent = relay_parent, - .commitments = std::make_shared( - std::move(comms)), - .candidate = std::move(candidate), - .pov = std::move(available_data.pov), - .pvd = std::move(pvd_copy), - }); + self->makeAvailable( + peer_id, + candidate_hash, + ValidateAndSecondResult{ + .result = outcome::success(), + .relay_parent = relay_parent, + .commitments = std::make_shared( + std::move(comms)), + .candidate = candidate, + .pov = std::move(available_data.pov), + .pvd = std::move(pvd), + }); + }; + pvf_->pvf(candidate, + pov, + pvd, + [weak_self{weak_from_this()}, + cb{std::move(cb)}](outcome::result r) mutable { + auto self = weak_self.lock(); + if (not self) { + return; + } + post(*self->main_pool_handler_, + [cb{std::move(cb)}, r{std::move(r)}]() mutable { + cb(std::move(r)); + }); + }); } void ParachainProcessorImpl::onAttestComplete( diff --git a/core/parachain/validator/parachain_processor.hpp b/core/parachain/validator/parachain_processor.hpp index f706d33853..7758fba5c3 100644 --- a/core/parachain/validator/parachain_processor.hpp +++ b/core/parachain/validator/parachain_processor.hpp @@ -288,10 +288,6 @@ namespace kagome::parachain { outcome::result advCanBeProcessed( const primitives::BlockHash &relay_parent, const libp2p::peer::PeerId &peer_id); - outcome::result validateCandidate( - const network::CandidateReceipt &candidate, - const network::ParachainBlock &pov, - runtime::PersistedValidationData &&pvd); outcome::result> validateErasureCoding( const runtime::AvailableData &validating_data, size_t n_validators); diff --git a/core/parachain/validator/prospective_parachains.hpp b/core/parachain/validator/prospective_parachains.hpp index 3ce1ced3c4..96aba90597 100644 --- a/core/parachain/validator/prospective_parachains.hpp +++ b/core/parachain/validator/prospective_parachains.hpp @@ -377,14 +377,16 @@ namespace kagome::parachain { "Add block. " "(relay_hash={}, hash={}, number={})", relay_hash, - hash, info->number); + hash, + info->number); block_info.emplace_back(*info); } else { SL_TRACE(logger, "Skipped block. " "(relay_hash={}, hash={}, number={})", relay_hash, - hash, info->number); + hash, + info->number); break; } } diff --git a/core/utils/argv0.hpp b/core/utils/argv0.hpp index e680e6a313..880e2094d5 100644 --- a/core/utils/argv0.hpp +++ b/core/utils/argv0.hpp @@ -6,12 +6,30 @@ #pragma once -#include -#include +#include namespace kagome { - inline auto &argv0() { - static std::optional executable; - return executable; - } + inline const std::filesystem::path &exePath(); } // namespace kagome + +#ifdef __APPLE__ +#include + +const std::filesystem::path &kagome::exePath() { + static const std::filesystem::path path{[] { + std::string path; + uint32_t size = 0; + _NSGetExecutablePath(nullptr, &size); + path.resize(size); + _NSGetExecutablePath(path.data(), &size); + return path; + }()}; + return path; +} +#else +const std::filesystem::path &kagome::exePath() { + static const std::filesystem::path path{ + std::filesystem::read_symlink("/proc/self/exe")}; + return path; +} +#endif diff --git a/node/main.cpp b/node/main.cpp index 4b6da270ab..3985f6cea3 100644 --- a/node/main.cpp +++ b/node/main.cpp @@ -23,7 +23,6 @@ #include "log/configurator.hpp" #include "log/logger.hpp" #include "parachain/pvf/kagome_pvf_worker.hpp" -#include "utils/argv0.hpp" using kagome::application::AppConfiguration; using kagome::application::AppConfigurationImpl; @@ -97,7 +96,6 @@ int main(int argc, const char **argv) { // Needed for zombienet setvbuf(stdout, nullptr, _IOLBF, 0); setvbuf(stderr, nullptr, _IOLBF, 0); - kagome::argv0() = argv[0]; if (argc > 1) { std::string_view name{argv[1]}; diff --git a/test/core/parachain/pvf_test.cpp b/test/core/parachain/pvf_test.cpp index b90a9a0435..a4e5746df3 100644 --- a/test/core/parachain/pvf_test.cpp +++ b/test/core/parachain/pvf_test.cpp @@ -21,13 +21,16 @@ #include "mock/core/runtime/runtime_context_factory_mock.hpp" #include "mock/core/runtime/runtime_properties_cache_mock.hpp" #include "mock/span.hpp" +#include "parachain/pvf/pvf_thread_pool.hpp" #include "parachain/types.hpp" #include "runtime/executor.hpp" #include "testutil/literals.hpp" #include "testutil/outcome.hpp" #include "testutil/prepare_loggers.hpp" +using kagome::TestThreadPool; using kagome::application::AppConfigurationMock; +using kagome::application::StartApp; using kagome::common::Buffer; using kagome::common::BufferView; using kagome::common::Hash256; @@ -36,6 +39,7 @@ using kagome::parachain::ParachainId; using kagome::parachain::ParachainRuntime; using kagome::parachain::Pvf; using kagome::parachain::PvfImpl; +using kagome::parachain::PvfThreadPool; using kagome::parachain::ValidationResult; using kagome::runtime::DontInstrumentWasm; using kagome::runtime::MemoryLimits; @@ -82,9 +86,9 @@ class PvfTest : public testing::Test { EXPECT_CALL(*parachain_api, session_executor_params(_, _)) .WillRepeatedly(Return(outcome::success(std::nullopt))); - auto app_state_manager = - std::make_shared(); + auto app_state_manager = std::make_shared(); + PvfThreadPool pvf_thread{TestThreadPool{io_}}; pvf_ = std::make_shared( PvfImpl::Config{ .precompile_modules = false, @@ -101,8 +105,10 @@ class PvfTest : public testing::Test { parachain_api, executor, ctx_factory, + pvf_thread, app_state_manager, app_config_); + app_state_manager->start(); } auto mockModule(uint8_t code_i) { @@ -137,7 +143,13 @@ class PvfTest : public testing::Test { receipt.descriptor.para_head_hash = hasher_->blake2b_256(pvd.parent_head); receipt.commitments_hash = hasher_->blake2b_256( scale::encode(Pvf::CandidateCommitments{}).value()); - ASSERT_OUTCOME_SUCCESS_TRY(pvf_->pvfValidate(pvd, pov, receipt, code)); + testing::MockFunction)> cb; + EXPECT_CALL(cb, Call(_)).WillOnce([](outcome::result r) { + EXPECT_TRUE(r); + }); + pvf_->pvfValidate(pvd, pov, receipt, code, cb.AsStdFunction()); + io_->restart(); + io_->run(); }; } @@ -149,6 +161,8 @@ class PvfTest : public testing::Test { std::shared_ptr module_factory_ = std::make_shared(); std::shared_ptr ctx_factory; + std::shared_ptr io_ = + std::make_shared(); }; TEST_F(PvfTest, InstancesCached) {