Skip to content

Commit

Permalink
Merge branch 'testnet' into accelerator-fullnode
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed Nov 26, 2024
2 parents f9f3e6e + 6244410 commit 21f1a49
Show file tree
Hide file tree
Showing 34 changed files with 435 additions and 15 deletions.
5 changes: 4 additions & 1 deletion adnl/adnl-ext-client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ class AdnlOutboundConnection : public AdnlExtConnection {
public:
AdnlOutboundConnection(td::SocketFd fd, std::unique_ptr<AdnlExtConnection::Callback> callback, AdnlNodeIdFull dst,
td::actor::ActorId<AdnlExtClientImpl> ext_client)
: AdnlExtConnection(std::move(fd), std::move(callback), true), dst_(std::move(dst)), ext_client_(ext_client) {
: AdnlExtConnection(std::move(fd), std::move(callback), true)
, dst_(std::move(dst))
, local_id_(privkeys::Ed25519::random())
, ext_client_(ext_client) {
}
AdnlOutboundConnection(td::SocketFd fd, std::unique_ptr<AdnlExtConnection::Callback> callback, AdnlNodeIdFull dst,
PrivateKey local_id, td::actor::ActorId<AdnlExtClientImpl> ext_client)
Expand Down
3 changes: 3 additions & 0 deletions create-hardfork/create-hardfork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ class HardforkCreator : public td::actor::Actor {

void new_key_block(ton::validator::BlockHandle handle) override {
}
void send_validator_telemetry(ton::PublicKeyHash key,
ton::tl_object_ptr<ton::ton_api::validator_telemetry> telemetry) override {
}
};

td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManagerInterface::install_callback,
Expand Down
15 changes: 12 additions & 3 deletions overlay/overlay-manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ void OverlayManager::register_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdS
}
overlays_[local_id][overlay_id] = OverlayDescription{std::move(overlay), std::move(cert)};

if (!with_db_) {
return;
}
auto P =
td::PromiseCreator::lambda([id = overlays_[local_id][overlay_id].overlay.get()](td::Result<DbType::GetResult> R) {
R.ensure();
Expand Down Expand Up @@ -417,13 +420,19 @@ OverlayManager::OverlayManager(std::string db_root, td::actor::ActorId<keyring::
}

void OverlayManager::start_up() {
std::shared_ptr<td::KeyValue> kv =
std::make_shared<td::RocksDb>(td::RocksDb::open(PSTRING() << db_root_ << "/overlays").move_as_ok());
db_ = DbType{std::move(kv)};
if (!db_root_.empty()) {
with_db_ = true;
std::shared_ptr<td::KeyValue> kv =
std::make_shared<td::RocksDb>(td::RocksDb::open(PSTRING() << db_root_ << "/overlays").move_as_ok());
db_ = DbType{std::move(kv)};
}
}

void OverlayManager::save_to_db(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
std::vector<OverlayNode> nodes) {
if (!with_db_) {
return;
}
std::vector<tl_object_ptr<ton_api::overlay_node>> nodes_vec;
for (auto &n : nodes) {
nodes_vec.push_back(n.tl());
Expand Down
1 change: 1 addition & 0 deletions overlay/overlay-manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class OverlayManager : public Overlays {
td::actor::ActorId<dht::Dht> dht_node_;

using DbType = td::KeyValueAsync<td::Bits256, td::BufferSlice>;
bool with_db_ = false;
DbType db_;

class AdnlCallback : public adnl::Adnl::Callback {
Expand Down
4 changes: 2 additions & 2 deletions overlay/overlay-peers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ void OverlayImpl::add_peer(OverlayNode node) {
peer_list_.peers_.insert(id, OverlayPeer(std::move(node)));
del_some_peers();
auto X = peer_list_.peers_.get(id);
if (X != nullptr && peer_list_.neighbours_.size() < max_neighbours() &&
if (X != nullptr && !X->is_neighbour() && peer_list_.neighbours_.size() < max_neighbours() &&
!(X->get_node()->flags() & OverlayMemberFlags::DoNotReceiveBroadcasts) && X->get_id() != local_id_) {
peer_list_.neighbours_.push_back(X->get_id());
X->set_neighbour(true);
Expand Down Expand Up @@ -440,7 +440,7 @@ void OverlayImpl::update_neighbours(td::uint32 nodes_to_change) {
VLOG(OVERLAY_INFO) << this << ": adding new neighbour " << X->get_id();
peer_list_.neighbours_.push_back(X->get_id());
X->set_neighbour(true);
} else {
} else if (X->is_alive()) {
CHECK(nodes_to_change > 0);
auto i = td::Random::fast(0, static_cast<td::uint32>(peer_list_.neighbours_.size()) - 1);
auto Y = peer_list_.peers_.get(peer_list_.neighbours_[i]);
Expand Down
7 changes: 6 additions & 1 deletion overlay/overlay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,12 @@ void OverlayImpl::alarm() {
update_db_at_ = td::Timestamp::in(60.0);
}

update_neighbours(0);
if (update_neighbours_at_.is_in_past()) {
update_neighbours(2);
update_neighbours_at_ = td::Timestamp::in(td::Random::fast(30.0, 120.0));
} else {
update_neighbours(0);
}
alarm_timestamp() = td::Timestamp::in(1.0);
} else {
update_neighbours(0);
Expand Down
1 change: 1 addition & 0 deletions overlay/overlay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ class OverlayImpl : public Overlay {
td::Timestamp next_dht_store_query_ = td::Timestamp::in(1.0);
td::Timestamp update_db_at_;
td::Timestamp update_throughput_at_;
td::Timestamp update_neighbours_at_;
td::Timestamp last_throughput_update_;

std::unique_ptr<Overlays::Callback> callback_;
Expand Down
41 changes: 41 additions & 0 deletions tdutils/td/utils/port/Stat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,4 +472,45 @@ Result<TotalMemStat> get_total_mem_stat() {
#endif
}

Result<uint32> get_cpu_cores() {
#if TD_LINUX
uint32 result = 0;
TRY_RESULT(fd, FileFd::open("/proc/cpuinfo", FileFd::Read));
SCOPE_EXIT {
fd.close();
};
std::string data;
char buf[10000];
while (true) {
TRY_RESULT(size, fd.read(MutableSlice{buf, sizeof(buf) - 1}));
if (size == 0) {
break;
}
buf[size] = '\0';
data += buf;
}
size_t i = 0;
while (i < data.size()) {
const char *line_begin = data.data() + i;
while (i < data.size() && data[i] != '\n') {
++i;
}
auto line_end = data.data() + i;
++i;
Slice line{line_begin, line_end};
size_t j = 0;
while (j < line.size() && line[j] != ' ' && line[j] != '\t' && line[j] != ':') {
++j;
}
Slice name = line.substr(0, j);
if (name == "processor") {
++result;
}
}
return result;
#else
return Status::Error("Not supported");
#endif
}

} // namespace td
2 changes: 2 additions & 0 deletions tdutils/td/utils/port/Stat.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,6 @@ struct TotalMemStat {
};
Result<TotalMemStat> get_total_mem_stat() TD_WARN_UNUSED_RESULT;

Result<uint32> get_cpu_cores() TD_WARN_UNUSED_RESULT;

} // namespace td
3 changes: 3 additions & 0 deletions test/test-ton-collator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,9 @@ class TestNode : public td::actor::Actor {

void new_key_block(ton::validator::BlockHandle handle) override {
}
void send_validator_telemetry(ton::PublicKeyHash key,
ton::tl_object_ptr<ton::ton_api::validator_telemetry> telemetry) override {
}
};

td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManagerInterface::install_callback,
Expand Down
4 changes: 4 additions & 0 deletions tl/generate/scheme/ton_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,10 @@ validator.group workchain:int shard:long catchain_seqno:int config_hash:int256 m
validator.groupEx workchain:int shard:long vertical_seqno:int catchain_seqno:int config_hash:int256 members:(vector validator.groupMember) = validator.Group;
validator.groupNew workchain:int shard:long vertical_seqno:int last_key_block_seqno:int catchain_seqno:int config_hash:int256 members:(vector validator.groupMember) = validator.Group;

validator.telemetry flags:# timestamp:double adnl_id:int256
node_version:string os_version:string node_started_at:int
ram_size:long cpu_cores:int node_threads:int = validator.Telemetry;

---functions---


Expand Down
Binary file modified tl/generate/scheme/ton_api.tlo
Binary file not shown.
2 changes: 1 addition & 1 deletion tl/generate/scheme/tonlib_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ ton.blockIdExt workchain:int32 shard:int64 seqno:int32 root_hash:bytes file_hash
extraCurrency id:int32 amount:int64 = ExtraCurrency;

raw.fullAccountState balance:int64 extra_currencies:vector<extraCurrency> code:bytes data:bytes last_transaction_id:internal.transactionId block_id:ton.blockIdExt frozen_hash:bytes sync_utime:int53 = raw.FullAccountState;
raw.message source:accountAddress destination:accountAddress value:int64 extra_currencies:vector<extraCurrency> fwd_fee:int64 ihr_fee:int64 created_lt:int64 body_hash:bytes msg_data:msg.Data = raw.Message;
raw.message hash:bytes source:accountAddress destination:accountAddress value:int64 extra_currencies:vector<extraCurrency> fwd_fee:int64 ihr_fee:int64 created_lt:int64 body_hash:bytes msg_data:msg.Data = raw.Message;
raw.transaction address:accountAddress utime:int53 data:bytes transaction_id:internal.transactionId fee:int64 storage_fee:int64 other_fee:int64 in_msg:raw.message out_msgs:vector<raw.message> = raw.Transaction;
raw.transactions transactions:vector<raw.transaction> previous_transaction_id:internal.transactionId = raw.Transactions;

Expand Down
Binary file modified tl/generate/scheme/tonlib_api.tlo
Binary file not shown.
4 changes: 4 additions & 0 deletions tonlib/tonlib/TonlibClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3032,6 +3032,7 @@ struct ToRawTransactions {
}
auto body_cell = vm::CellBuilder().append_cellslice(*body).finalize();
auto body_hash = body_cell->get_hash().as_slice().str();
auto msg_hash = cell->get_hash().as_slice().str();

td::Ref<vm::Cell> init_state_cell;
auto& init_state_cs = message.init.write();
Expand Down Expand Up @@ -3101,6 +3102,7 @@ struct ToRawTransactions {
auto created_lt = static_cast<td::int64>(msg_info.created_lt);

return tonlib_api::make_object<tonlib_api::raw_message>(
msg_hash,
tonlib_api::make_object<tonlib_api::accountAddress>(src),
tonlib_api::make_object<tonlib_api::accountAddress>(std::move(dest)), balance,
std::move(extra_currencies), fwd_fee, ihr_fee, created_lt, std::move(body_hash),
Expand All @@ -3113,6 +3115,7 @@ struct ToRawTransactions {
}
TRY_RESULT(dest, to_std_address(msg_info.dest));
return tonlib_api::make_object<tonlib_api::raw_message>(
msg_hash,
tonlib_api::make_object<tonlib_api::accountAddress>(),
tonlib_api::make_object<tonlib_api::accountAddress>(std::move(dest)), 0,
std::vector<tonlib_api::object_ptr<tonlib_api::extraCurrency>>{}, 0, 0, 0, std::move(body_hash),
Expand All @@ -3126,6 +3129,7 @@ struct ToRawTransactions {
TRY_RESULT(src, to_std_address(msg_info.src));
auto created_lt = static_cast<td::int64>(msg_info.created_lt);
return tonlib_api::make_object<tonlib_api::raw_message>(
msg_hash,
tonlib_api::make_object<tonlib_api::accountAddress>(src),
tonlib_api::make_object<tonlib_api::accountAddress>(), 0,
std::vector<tonlib_api::object_ptr<tonlib_api::extraCurrency>>{}, 0, 0, created_lt, std::move(body_hash),
Expand Down
13 changes: 13 additions & 0 deletions validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2024,6 +2024,10 @@ void ValidatorEngine::start_full_node() {
[](td::Unit) {});
}
load_custom_overlays_config();
if (!validator_telemetry_filename_.empty()) {
td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::set_validator_telemetry_filename,
validator_telemetry_filename_);
}
} else {
started_full_node();
}
Expand Down Expand Up @@ -4490,6 +4494,15 @@ int main(int argc, char *argv[]) {
acts.push_back(
[&x]() { td::actor::send_closure(x, &ValidatorEngine::set_fast_state_serializer_enabled, true); });
});
p.add_option(
'\0', "collect-validator-telemetry",
"store validator telemetry from private block overlay to a given file (json format)",
[&](td::Slice s) {
acts.push_back(
[&x, s = s.str()]() {
td::actor::send_closure(x, &ValidatorEngine::set_validator_telemetry_filename, s);
});
});
auto S = p.run(argc, argv);
if (S.is_error()) {
LOG(ERROR) << "failed to parse options: " << S.move_as_error();
Expand Down
4 changes: 4 additions & 0 deletions validator-engine/validator-engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class ValidatorEngine : public td::actor::Actor {
ton::BlockSeqno truncate_seqno_{0};
std::string session_logs_file_;
bool fast_state_serializer_enabled_ = false;
std::string validator_telemetry_filename_;
bool not_all_shards_ = false;
std::vector<ton::ShardIdFull> add_shard_cmds_;

Expand Down Expand Up @@ -315,6 +316,9 @@ class ValidatorEngine : public td::actor::Actor {
void set_fast_state_serializer_enabled(bool value) {
fast_state_serializer_enabled_ = value;
}
void set_validator_telemetry_filename(std::string value) {
validator_telemetry_filename_ = std::move(value);
}
void set_not_all_shards() {
not_all_shards_ = true;
}
Expand Down
2 changes: 2 additions & 0 deletions validator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ set(VALIDATOR_HEADERS

import-db-slice.hpp
queue-size-counter.hpp
validator-telemetry.hpp

manager-disk.h
manager-disk.hpp
Expand All @@ -82,6 +83,7 @@ set(VALIDATOR_SOURCE
validator-group.cpp
validator-options.cpp
queue-size-counter.cpp
validator-telemetry.cpp

downloaders/wait-block-data.cpp
downloaders/wait-block-state.cpp
Expand Down
66 changes: 65 additions & 1 deletion validator/full-node-private-overlay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include "common/delay.h"
#include "common/checksum.h"
#include "full-node-serializer.hpp"
#include "auto/tl/ton_api_json.h"
#include "td/utils/JsonBuilder.h"
#include "tl/tl_json.h"

namespace ton::validator::fullnode {

Expand Down Expand Up @@ -85,15 +88,52 @@ void FullNodePrivateBlockOverlay::process_block_candidate_broadcast(PublicKeyHas
validator_set_hash, std::move(data));
}

void FullNodePrivateBlockOverlay::process_telemetry_broadcast(
PublicKeyHash src, const tl_object_ptr<ton_api::validator_telemetry>& telemetry) {
if (telemetry->adnl_id_ != src.bits256_value()) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": adnl_id mismatch";
return;
}
auto now = (td::int32)td::Clocks::system();
if (telemetry->timestamp_ < now - 60) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": too old ("
<< now - telemetry->timestamp_ << "s ago)";
return;
}
if (telemetry->timestamp_ > now + 60) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": too new ("
<< telemetry->timestamp_ - now << "s in the future)";
return;
}
VLOG(FULL_NODE_DEBUG) << "Got telemetry broadcast from " << src;
auto s = td::json_encode<std::string>(td::ToJson(*telemetry), false);
std::erase_if(s, [](char c) {
return c == '\n' || c == '\r';
});
telemetry_file_ << s << "\n";
telemetry_file_.flush();
if (telemetry_file_.fail()) {
VLOG(FULL_NODE_WARNING) << "Failed to write telemetry to file";
}
}

void FullNodePrivateBlockOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
if (adnl::AdnlNodeIdShort{src} == local_id_) {
return;
}
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
if (collect_telemetry_ && src != local_id_.pubkey_hash()) {
auto R = fetch_tl_prefix<ton_api::validator_telemetry>(broadcast, true);
if (R.is_ok()) {
process_telemetry_broadcast(src, R.ok());
}
}
return;
}
ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); });
ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto& obj) {
Self->process_broadcast(src, obj);
});
}

void FullNodePrivateBlockOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno,
Expand Down Expand Up @@ -144,6 +184,30 @@ void FullNodePrivateBlockOverlay::send_broadcast(BlockBroadcast broadcast) {
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}

void FullNodePrivateBlockOverlay::send_validator_telemetry(tl_object_ptr<ton_api::validator_telemetry> telemetry) {
process_telemetry_broadcast(local_id_.pubkey_hash(), telemetry);
auto data = serialize_tl_object(telemetry, true);
if (data.size() <= overlay::Overlays::max_simple_broadcast_size()) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(data));
} else {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(data));
}
}

void FullNodePrivateBlockOverlay::collect_validator_telemetry(std::string filename) {
if (collect_telemetry_) {
telemetry_file_.close();
}
collect_telemetry_ = true;
LOG(FULL_NODE_WARNING) << "Collecting validator telemetry to " << filename << " (local id: " << local_id_ << ")";
telemetry_file_.open(filename, std::ios_base::app);
if (!telemetry_file_.is_open()) {
LOG(WARNING) << "Cannot open file " << filename << " for validator telemetry";
}
}

void FullNodePrivateBlockOverlay::start_up() {
std::sort(nodes_.begin(), nodes_.end());
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());
Expand Down
Loading

0 comments on commit 21f1a49

Please sign in to comment.