Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into feature/pvf-clone
Browse files Browse the repository at this point in the history
Signed-off-by: turuslan <[email protected]>

# Conflicts:
#	core/parachain/pvf/workers.cpp
  • Loading branch information
turuslan committed Dec 16, 2024
2 parents 308a8d7 + d056f23 commit 9d69a8e
Show file tree
Hide file tree
Showing 21 changed files with 553 additions and 281 deletions.
2 changes: 1 addition & 1 deletion core/authority_discovery/query/authority_peer_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace kagome::authority_discovery {

common::Buffer raw{};
std::optional<TimestampScale> time{};
scale::PeerInfoSerializable peer{};
::scale::PeerInfoSerializable peer{};
};

} // namespace kagome::authority_discovery
4 changes: 2 additions & 2 deletions core/authority_discovery/timestamp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
#include "scale/big_fixed_integers.hpp"

namespace kagome::authority_discovery {
using Timestamp = scale::uint128_t;
using TimestampScale = scale::Fixed<Timestamp>;
using Timestamp = ::scale::uint128_t;
using TimestampScale = ::scale::Fixed<Timestamp>;
} // namespace kagome::authority_discovery
7 changes: 5 additions & 2 deletions core/crypto/type_hasher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ namespace kagome::crypto {

template <typename H, typename... T>
inline void hashTypes(H &hasher, common::Blob<H::kOutlen> &out, T &&...t) {
auto val = ::scale::encode(std::forward<T>(t)...).value();
hasher.update(val);
scale::encode(
[&](const uint8_t *const ptr, size_t count) {
hasher.update(std::span<const uint8_t>(ptr, count));
},
std::forward<T>(t)...);
hasher.get_final(out);
}

Expand Down
2 changes: 1 addition & 1 deletion core/network/helpers/scale_message_read_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace kagome::network {
template <typename MsgType>
void write(const MsgType &msg,
libp2p::basic::Writer::WriteCallbackFunc cb) const {
auto encoded_msg_res = ::scale::encode(msg);
auto encoded_msg_res = scale::encode(msg);
if (!encoded_msg_res) {
return cb(encoded_msg_res.error());
}
Expand Down
2 changes: 1 addition & 1 deletion core/network/impl/sync_protocol_observer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ namespace kagome::network {
if (auto r = beefy_->getJustification(*number)) {
if (auto &opt = r.value()) {
new_block.beefy_justification = primitives::Justification{
common::Buffer{::scale::encode(*opt).value()},
common::Buffer{scale::encode(*opt).value()},
};
}
}
Expand Down
3 changes: 1 addition & 2 deletions core/network/types/collator_messages.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,7 @@ namespace kagome::network {
auto commitments_hash =
hasher.blake2b_256(scale::encode(receipt.commitments).value());
return hasher.blake2b_256(
::scale::encode(std::tie(receipt.descriptor, commitments_hash))
.value());
scale::encode(std::tie(receipt.descriptor, commitments_hash)).value());
}

inline CandidateHash candidateHash(const crypto::Hasher &hasher,
Expand Down
5 changes: 3 additions & 2 deletions core/parachain/approval/knowledge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
#include "common/visitor.hpp"
#include "consensus/timeline/types.hpp"
#include "outcome/outcome.hpp"
#include "parachain/approval/approval.hpp"
#include "parachain/approval/state.hpp"
#include "parachain/types.hpp"

template <>
struct std::hash<scale::BitVec> {
auto operator()(const scale::BitVec &v) const {
auto s = ::scale::encode(v).value();
auto s = scale::encode(v).value();
return boost::hash_range(s.begin(), s.end());
}
};
Expand Down Expand Up @@ -86,7 +87,7 @@ namespace kagome::parachain::approval {

// Generate the knowledge keys for querying if an approval is known by peer.
static std::pair<MessageSubject, MessageKind> generate_approval_key(
const approval::IndirectSignedApprovalVoteV2 &approval) {
const IndirectSignedApprovalVoteV2 &approval) {
return {
std::make_tuple(approval.payload.payload.block_hash,
approval.payload.payload.candidate_indices,
Expand Down
3 changes: 2 additions & 1 deletion core/parachain/pvf/pvf_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ namespace kagome::parachain {
receipt.descriptor.relay_parent,
receipt.descriptor.para_id);

auto data_hash = hasher_->blake2b_256(::scale::encode(pvd).value());
auto data_hash = hasher_->blake2b_256(scale::encode(pvd).value());
if (receipt.descriptor.persisted_data_hash != data_hash) {
return cb(PvfError::PERSISTED_DATA_HASH);
}
Expand Down Expand Up @@ -391,6 +391,7 @@ namespace kagome::parachain {
}
cb(scale::decode<ValidationResult>(r.value()));
},
.kind = timeout_kind,
.timeout =
std::chrono::milliseconds{
timeout_kind == runtime::PvfExecTimeoutKind::Backing
Expand Down
34 changes: 27 additions & 7 deletions core/parachain/pvf/workers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "utils/weak_macro.hpp"

namespace kagome::parachain {
constexpr auto kMetricQueueSize = "kagome_pvf_queue_size";

struct AsyncPipe : boost::process::async_pipe {
using async_pipe::async_pipe;
using lowest_layer_type = AsyncPipe;
Expand Down Expand Up @@ -135,13 +137,26 @@ namespace kagome::parachain {
.log_params = app_config.log(),
.force_disable_secure_mode = app_config.disableSecureMode(),
.secure_mode_support = secure_mode_support,
} {}
} {
metrics_registry_->registerGaugeFamily(kMetricQueueSize, "pvf queue size");
std::unordered_map<PvfExecTimeoutKind, std::string> kind_name{
{PvfExecTimeoutKind::Approval, "Approval"},
{PvfExecTimeoutKind::Backing, "Backing"},
};
for (auto &[kind, name] : kind_name) {
metric_queue_size_.emplace(kind,
metrics_registry_->registerGaugeMetric(
kMetricQueueSize, {{"kind", name}}));
}
}

void PvfWorkers::execute(Job &&job) {
REINVOKE(*main_pool_handler_, execute, std::move(job));
if (free_.empty()) {
if (used_ >= max_) {
queue_.emplace(std::move(job));
auto &queue = queues_[job.kind];
queue.emplace_back(std::move(job));
metric_queue_size_.at(job.kind)->set(queue.size());
return;
}
auto used = std::make_shared<Used>(*this);
Expand Down Expand Up @@ -245,11 +260,16 @@ namespace kagome::parachain {
}

void PvfWorkers::dequeue() {
if (queue_.empty()) {
return;
for (auto &kind :
{PvfExecTimeoutKind::Approval, PvfExecTimeoutKind::Backing}) {
auto &queue = queues_[kind];
if (queue.empty()) {
continue;
}
auto job = std::move(queue.front());
queue.pop_front();
metric_queue_size_.at(kind)->set(queue.size());
findFree(std::move(job));
}
auto job = std::move(queue_.front());
queue_.pop();
findFree(std::move(job));
}
} // namespace kagome::parachain
12 changes: 10 additions & 2 deletions core/parachain/pvf/workers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

#pragma once

#include <deque>
#include <filesystem>
#include <list>
#include <queue>

#include "metrics/metrics.hpp"
#include "parachain/pvf/pvf_worker_types.hpp"
#include "runtime/runtime_api/parachain_host_types.hpp"

namespace boost::asio {
class io_context;
Expand All @@ -33,6 +35,8 @@ namespace kagome::common {
} // namespace kagome::common

namespace kagome::parachain {
using runtime::PvfExecTimeoutKind;

struct ProcessAndPipes;

class PvfWorkers : public std::enable_shared_from_this<PvfWorkers> {
Expand All @@ -47,6 +51,7 @@ namespace kagome::parachain {
PvfWorkerInputCodeParams code_params;
Buffer args;
Cb cb;
PvfExecTimeoutKind kind;
std::chrono::milliseconds timeout{0};
};
void execute(Job &&job);
Expand Down Expand Up @@ -78,6 +83,9 @@ namespace kagome::parachain {
PvfWorkerInputConfig worker_config_;
std::list<Worker> free_;
size_t used_ = 0;
std::queue<Job> queue_;
std::unordered_map<PvfExecTimeoutKind, std::deque<Job>> queues_;

metrics::RegistryPtr metrics_registry_ = metrics::createRegistry();
std::unordered_map<PvfExecTimeoutKind, metrics::Gauge *> metric_queue_size_;
};
} // namespace kagome::parachain
14 changes: 7 additions & 7 deletions core/parachain/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ namespace kagome::parachain {

auto signable() {
constexpr std::array<uint8_t, 4> kMagic{'V', 'C', 'P', 'C'};
return ::scale::encode(std::make_tuple(kMagic, *this)).value();
return scale::encode(std::make_tuple(kMagic, *this)).value();
}
};
} // namespace kagome::parachain
Expand Down Expand Up @@ -171,11 +171,11 @@ namespace kagome::network {

common::Buffer signable() const {
return common::Buffer{
::scale::encode(relay_parent,
para_id,
persisted_data_hash,
pov_hash,
validation_code_hash)
scale::encode(relay_parent,
para_id,
persisted_data_hash,
pov_hash,
validation_code_hash)
.value(),
};
}
Expand All @@ -193,7 +193,7 @@ namespace kagome::network {
const parachain::Hash &hash(const crypto::Hasher &hasher) const {
if (not hash_.has_value()) {
hash_.emplace(hasher.blake2b_256(
::scale::encode(std::tie(descriptor, commitments_hash)).value()));
scale::encode(std::tie(descriptor, commitments_hash)).value()));
}
return hash_.value();
}
Expand Down
2 changes: 1 addition & 1 deletion core/parachain/validator/signer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace kagome::parachain {
template <typename T>
auto signable(const crypto::Hasher &hasher, const T &payload) const {
auto &&signable = toSignable(hasher, payload);
return ::scale::encode(std::tie(signable, *this)).value();
return scale::encode(std::tie(signable, *this)).value();
}

/// Current session index.
Expand Down
2 changes: 1 addition & 1 deletion core/parachain/validator/statement_distribution/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace kagome::parachain {
statement,
[&](const StatementWithPVDSeconded &val) {
return hasher->blake2b_256(
::scale::encode(val.committed_receipt.to_plain(*hasher)).value());
scale::encode(val.committed_receipt.to_plain(*hasher)).value());
},
[&](const StatementWithPVDValid &val) { return val.candidate_hash; });
}
Expand Down
4 changes: 2 additions & 2 deletions core/primitives/inherent_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ namespace kagome::primitives {
const T &inherent) {
auto [it, inserted] = data.try_emplace(identifier, common::Buffer());
if (inserted) {
it->second = common::Buffer(::scale::encode(inherent).value());
it->second = common::Buffer(scale::encode(inherent).value());
return outcome::success();
}
return InherentDataError::IDENTIFIER_ALREADY_EXISTS;
Expand All @@ -62,7 +62,7 @@ namespace kagome::primitives {
*/
template <typename T>
void replaceData(InherentIdentifier identifier, const T &inherent) {
data[identifier] = common::Buffer(::scale::encode(inherent).value());
data[identifier] = common::Buffer(scale::encode(inherent).value());
}

/**
Expand Down
7 changes: 4 additions & 3 deletions core/runtime/module_instance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "outcome/outcome.hpp"
#include "runtime/instance_environment.hpp"
#include "runtime/ptr_size.hpp"
#include "scale/kagome_scale.hpp"

namespace kagome::runtime {
class Module;
Expand All @@ -43,9 +44,9 @@ namespace kagome::runtime {
template <typename... Args>
static outcome::result<common::Buffer> encodeArgs(const Args &...args) {
if constexpr (sizeof...(args) > 0) {
return common::map_result(::scale::encode(args...), [](auto &&vec) {
return common::Buffer{vec};
});
return common::map_result(
kagome::scale::encode(args...),
[](auto &&vec) { return common::Buffer{vec}; });
}
return outcome::success();
}
Expand Down
22 changes: 22 additions & 0 deletions core/scale/encoder/concepts.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <cstdint>
#include <type_traits>

namespace kagome::scale {
template <typename F>
concept Invocable = std::is_invocable_v<F, const uint8_t *const, std::size_t>;

template <typename T>
concept IsEnum = std::is_enum_v<std::decay_t<T>>;

template <typename T>
concept IsNotEnum = !
std::is_enum_v<std::decay_t<T>>;
} // namespace kagome::scale
Loading

0 comments on commit 9d69a8e

Please sign in to comment.