Skip to content

Commit

Permalink
Send telemetry broadcasts to fast sync overlays
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed Nov 26, 2024
1 parent e6aac0b commit d9aeab0
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 27 deletions.
2 changes: 1 addition & 1 deletion validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4985,7 +4985,7 @@ int main(int argc, char *argv[]) {
});
p.add_option(
'\0', "collect-validator-telemetry",
"store validator telemetry from private block overlay to a given file (json format)",
"store validator telemetry from fast sync overlay to a given file (json format)",
[&](td::Slice s) {
acts.push_back(
[&x, s = s.str()]() {
Expand Down
79 changes: 75 additions & 4 deletions validator/full-node-fast-sync-overlays.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,42 @@ void FullNodeFastSyncOverlay::process_block_candidate_broadcast(PublicKeyHash sr
validator_set_hash, std::move(data));
}

void FullNodeFastSyncOverlay::process_telemetry_broadcast(
adnl::AdnlNodeIdShort src, const tl_object_ptr<ton_api::validator_telemetry> &telemetry) {
if (telemetry->adnl_id_ != src.bits256_value()) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": adnl_id mismatch";
return;
}
auto now = (td::int32)td::Clocks::system();
if (telemetry->timestamp_ < now - 60) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": too old ("
<< now - telemetry->timestamp_ << "s ago)";
return;
}
if (telemetry->timestamp_ > now + 60) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": too new ("
<< telemetry->timestamp_ - now << "s in the future)";
return;
}
VLOG(FULL_NODE_DEBUG) << "Got telemetry broadcast from " << src;
auto s = td::json_encode<std::string>(td::ToJson(*telemetry), false);
std::erase_if(s, [](char c) { return c == '\n' || c == '\r'; });
telemetry_file_ << s << "\n";
telemetry_file_.flush();
if (telemetry_file_.fail()) {
VLOG(FULL_NODE_WARNING) << "Failed to write telemetry to file";
}
}

void FullNodeFastSyncOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
if (collect_telemetry_ && src != local_id_.pubkey_hash()) {
auto R = fetch_tl_prefix<ton_api::validator_telemetry>(broadcast, true);
if (R.is_ok()) {
process_telemetry_broadcast(adnl::AdnlNodeIdShort{src}, R.ok());
}
}
return;
}

Expand Down Expand Up @@ -143,6 +176,30 @@ void FullNodeFastSyncOverlay::send_block_candidate(BlockIdExt block_id, Catchain
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}

void FullNodeFastSyncOverlay::send_validator_telemetry(tl_object_ptr<ton_api::validator_telemetry> telemetry) {
process_telemetry_broadcast(local_id_, telemetry);
auto data = serialize_tl_object(telemetry, true);
if (data.size() <= overlay::Overlays::max_simple_broadcast_size()) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(data));
} else {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(data));
}
}

void FullNodeFastSyncOverlay::collect_validator_telemetry(std::string filename) {
if (collect_telemetry_) {
telemetry_file_.close();
}
collect_telemetry_ = true;
LOG(FULL_NODE_WARNING) << "Collecting validator telemetry to " << filename << " (local id: " << local_id_ << ")";
telemetry_file_.open(filename, std::ios_base::app);
if (!telemetry_file_.is_open()) {
LOG(WARNING) << "Cannot open file " << filename << " for validator telemetry";
}
}

void FullNodeFastSyncOverlay::start_up() {
auto X = create_hash_tl_object<ton_api::tonNode_fastSyncOverlayId>(zero_state_file_hash_, create_tl_shard_id(shard_));
td::BufferSlice b{32};
Expand Down Expand Up @@ -261,22 +318,36 @@ void FullNodeFastSyncOverlay::get_stats_extra(td::Promise<std::string> promise)
promise.set_result(td::json_encode<std::string>(td::ToJson(*res), true));
}

td::actor::ActorId<FullNodeFastSyncOverlay> FullNodeFastSyncOverlays::choose_overlay(ShardIdFull shard) {
std::pair<td::actor::ActorId<FullNodeFastSyncOverlay>, adnl::AdnlNodeIdShort> FullNodeFastSyncOverlays::choose_overlay(
ShardIdFull shard) {
for (auto &p : id_to_overlays_) {
auto &overlays = p.second.overlays_;
ShardIdFull cur_shard = shard;
while (true) {
auto it = overlays.find(cur_shard);
if (it != overlays.end()) {
return it->second.get();
return {it->second.get(), p.first};
}
if (cur_shard.pfx_len() == 0) {
break;
}
cur_shard = shard_parent(cur_shard);
}
}
return {};
return {td::actor::ActorId<FullNodeFastSyncOverlay>{}, adnl::AdnlNodeIdShort::zero()};
}

td::actor::ActorId<FullNodeFastSyncOverlay> FullNodeFastSyncOverlays::get_masterchain_overlay_for(
adnl::AdnlNodeIdShort adnl_id) {
auto it = id_to_overlays_.find(adnl_id);
if (it == id_to_overlays_.end()) {
return {};
}
auto it2 = it->second.overlays_.find(ShardIdFull{masterchainId});
if (it2 == it->second.overlays_.end()) {
return {};
}
return it2->second.get();
}

void FullNodeFastSyncOverlays::update_overlays(td::Ref<MasterchainState> state,
Expand All @@ -291,7 +362,7 @@ void FullNodeFastSyncOverlays::update_overlays(td::Ref<MasterchainState> state,
monitoring_shards.insert(ShardIdFull{masterchainId});
std::set<ShardIdFull> all_shards;
all_shards.insert(ShardIdFull{masterchainId});
for (const auto& desc : state->get_shards()) {
for (const auto &desc : state->get_shards()) {
ShardIdFull shard = desc->shard();
td::uint32 monitor_min_split = state->monitor_min_split_depth(shard.workchain);
if (shard.pfx_len() > monitor_min_split) {
Expand Down
13 changes: 12 additions & 1 deletion validator/full-node-fast-sync-overlays.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include "full-node.h"
#include <fstream>

namespace ton::validator::fullnode {

Expand All @@ -32,6 +33,9 @@ class FullNodeFastSyncOverlay : public td::actor::Actor {
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed& query);
void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast& query);

void process_telemetry_broadcast(adnl::AdnlNodeIdShort src,
const tl_object_ptr<ton_api::validator_telemetry>& telemetry);

template <class T>
void process_broadcast(PublicKeyHash, T&) {
VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast";
Expand All @@ -42,6 +46,9 @@ class FullNodeFastSyncOverlay : public td::actor::Actor {
void send_broadcast(BlockBroadcast broadcast);
void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data);
void send_validator_telemetry(tl_object_ptr<ton_api::validator_telemetry> telemetry);

void collect_validator_telemetry(std::string filename);

void start_up() override;
void tear_down() override;
Expand Down Expand Up @@ -96,11 +103,15 @@ class FullNodeFastSyncOverlay : public td::actor::Actor {
void try_init();
void init();
void get_stats_extra(td::Promise<std::string> promise);

bool collect_telemetry_ = false;
std::ofstream telemetry_file_;
};

class FullNodeFastSyncOverlays {
public:
td::actor::ActorId<FullNodeFastSyncOverlay> choose_overlay(ShardIdFull shard);
std::pair<td::actor::ActorId<FullNodeFastSyncOverlay>, adnl::AdnlNodeIdShort> choose_overlay(ShardIdFull shard);
td::actor::ActorId<FullNodeFastSyncOverlay> get_masterchain_overlay_for(adnl::AdnlNodeIdShort adnl_id);
void update_overlays(td::Ref<MasterchainState> state, std::set<adnl::AdnlNodeIdShort> my_adnl_ids,
std::set<ShardIdFull> monitoring_shards, const FileHash& zero_state_file_hash,
const td::actor::ActorId<keyring::Keyring>& keyring, const td::actor::ActorId<adnl::Adnl>& adnl,
Expand Down
68 changes: 51 additions & 17 deletions validator/full-node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ void FullNodeImpl::on_new_masterchain_block(td::Ref<MasterchainState> state, std
fast_sync_overlays_.update_overlays(state, std::move(my_adnl_ids), std::move(monitoring_shards),
zero_state_file_hash_, keyring_, adnl_, overlays_, validator_manager_,
actor_id(this));
update_validator_telemetry_collector();
}
}

Expand Down Expand Up @@ -338,7 +339,7 @@ void FullNodeImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_s
td::actor::send_closure(private_block_overlays_.begin()->second,
&FullNodePrivateBlockOverlay::send_shard_block_info, block_id, cc_seqno, data.clone());
}
auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(ShardIdFull(masterchainId));
auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(ShardIdFull(masterchainId)).first;
if (!fast_sync_overlay.empty()) {
td::actor::send_closure(fast_sync_overlay, &FullNodeFastSyncOverlay::send_shard_block_info, block_id, cc_seqno,
data.clone());
Expand All @@ -358,7 +359,7 @@ void FullNodeImpl::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_se
td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_block_candidate,
block_id, cc_seqno, validator_set_hash, data.clone());
}
auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(block_id.shard_full());
auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(block_id.shard_full()).first;
if (!fast_sync_overlay.empty()) {
td::actor::send_closure(fast_sync_overlay, &FullNodeFastSyncOverlay::send_block_candidate, block_id, cc_seqno,
validator_set_hash, data.clone());
Expand All @@ -383,7 +384,7 @@ void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, int mode) {
td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_broadcast,
broadcast.clone());
}
auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(broadcast.block_id.shard_full());
auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(broadcast.block_id.shard_full()).first;
if (!fast_sync_overlay.empty()) {
td::actor::send_closure(fast_sync_overlay, &FullNodeFastSyncOverlay::send_broadcast, broadcast.clone());
}
Expand Down Expand Up @@ -595,12 +596,21 @@ void FullNodeImpl::new_key_block(BlockHandle handle) {
}

void FullNodeImpl::send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) {
auto it = private_block_overlays_.find(key);
if (it == private_block_overlays_.end()) {
VLOG(FULL_NODE_INFO) << "Cannot send validator telemetry for " << key << " : no private block overlay";
return;
if (use_old_private_overlays_) {
auto it = private_block_overlays_.find(key);
if (it == private_block_overlays_.end()) {
VLOG(FULL_NODE_INFO) << "Cannot send validator telemetry for " << key << " : no private block overlay";
return;
}
td::actor::send_closure(it->second, &FullNodePrivateBlockOverlay::send_validator_telemetry, std::move(telemetry));
} else {
auto overlay = fast_sync_overlays_.get_masterchain_overlay_for(adnl::AdnlNodeIdShort{telemetry->adnl_id_});
if (overlay.empty()) {
VLOG(FULL_NODE_INFO) << "Cannot send validator telemetry for adnl id " << key << " : no fast sync overlay";
return;
}
td::actor::send_closure(overlay, &FullNodeFastSyncOverlay::send_validator_telemetry, std::move(telemetry));
}
td::actor::send_closure(it->second, &FullNodePrivateBlockOverlay::send_validator_telemetry, std::move(telemetry));
}

void FullNodeImpl::process_block_broadcast(BlockBroadcast broadcast) {
Expand Down Expand Up @@ -631,19 +641,43 @@ void FullNodeImpl::set_validator_telemetry_filename(std::string value) {
}

void FullNodeImpl::update_validator_telemetry_collector() {
if (validator_telemetry_filename_.empty() || private_block_overlays_.empty()) {
validator_telemetry_collector_key_ = PublicKeyHash::zero();
return;
}
if (!private_block_overlays_.contains(validator_telemetry_collector_key_)) {
auto it = private_block_overlays_.begin();
validator_telemetry_collector_key_ = it->first;
td::actor::send_closure(it->second, &FullNodePrivateBlockOverlay::collect_validator_telemetry,
validator_telemetry_filename_);
if (use_old_private_overlays_) {
if (validator_telemetry_filename_.empty() || private_block_overlays_.empty()) {
validator_telemetry_collector_key_ = PublicKeyHash::zero();
return;
}
if (!private_block_overlays_.contains(validator_telemetry_collector_key_)) {
auto it = private_block_overlays_.begin();
validator_telemetry_collector_key_ = it->first;
td::actor::send_closure(it->second, &FullNodePrivateBlockOverlay::collect_validator_telemetry,
validator_telemetry_filename_);
}
} else {
if (validator_telemetry_filename_.empty()) {
validator_telemetry_collector_key_ = PublicKeyHash::zero();
return;
}
if (fast_sync_overlays_.get_masterchain_overlay_for(adnl::AdnlNodeIdShort{validator_telemetry_collector_key_})
.empty()) {
auto [actor, adnl_id] = fast_sync_overlays_.choose_overlay(ShardIdFull{masterchainId});
validator_telemetry_collector_key_ = adnl_id.pubkey_hash();
if (!actor.empty()) {
td::actor::send_closure(actor, &FullNodeFastSyncOverlay::collect_validator_telemetry,
validator_telemetry_filename_);
}
}
}
}

void FullNodeImpl::start_up() {
// TODO: enable fast sync overlays by other means (e.g. some config param)
// TODO: in the future - remove the old private overlay entirely
// This env var is for testing
auto fast_sync_env = getenv("TON_FAST_SYNC_OVERLAYS");
if (fast_sync_env && !strcmp(fast_sync_env, "1")) {
use_old_private_overlays_ = false;
}

update_shard_actor(ShardIdFull{masterchainId}, true);
if (local_id_.is_zero()) {
if (adnl_id_.is_zero()) {
Expand Down
2 changes: 1 addition & 1 deletion validator/full-node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class FullNodeImpl : public FullNode {
// Old overlays - one private overlay for all validators
// New overlays (fast sync overlays) - semiprivate overlay per shard (monitor_min_split depth)
// for validators and authorized nodes
bool use_old_private_overlays_ = false; // TODO: set from config or something
bool use_old_private_overlays_ = true;
std::map<PublicKeyHash, td::actor::ActorOwn<FullNodePrivateBlockOverlay>> private_block_overlays_;
bool broadcast_block_candidates_in_public_overlay_ = false;
FullNodeFastSyncOverlays fast_sync_overlays_;
Expand Down
1 change: 0 additions & 1 deletion validator/impl/validator-set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class ValidatorSetQ : public ValidatorSet {
td::Ref<BlockSignatureSet> signatures) const override;
td::Result<ValidatorWeight> check_approve_signatures(RootHash root_hash, FileHash file_hash,
td::Ref<BlockSignatureSet> signatures) const override;
const ValidatorDescr* find_validator(const NodeIdShort& id) const override;

ValidatorSetQ* make_copy() const override;

Expand Down
1 change: 0 additions & 1 deletion validator/interfaces/validator-set.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class ValidatorSet : public td::CntObject {
virtual td::uint32 get_validator_set_hash() const = 0;
virtual ShardId get_validator_set_from() const = 0;
virtual std::vector<ValidatorDescr> export_vector() const = 0;
virtual const ValidatorDescr* find_validator(const NodeIdShort& id) const = 0;
virtual td::Result<ValidatorWeight> check_signatures(RootHash root_hash, FileHash file_hash,
td::Ref<BlockSignatureSet> signatures) const = 0;
virtual td::Result<ValidatorWeight> check_approve_signatures(RootHash root_hash, FileHash file_hash,
Expand Down
2 changes: 1 addition & 1 deletion validator/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2475,7 +2475,7 @@ td::actor::ActorOwn<ValidatorGroup> ValidatorManagerImpl::create_validator_group

auto validator_id = get_validator(shard, validator_set);
CHECK(!validator_id.is_zero());
auto descr = validator_set->find_validator(validator_id.bits256_value());
auto descr = validator_set->get_validator(validator_id.bits256_value());
CHECK(descr);
auto adnl_id = adnl::AdnlNodeIdShort{
descr->addr.is_zero() ? ValidatorFullId{descr->key}.compute_short_id().bits256_value() : descr->addr};
Expand Down

0 comments on commit d9aeab0

Please sign in to comment.