Skip to content

Commit

Permalink
Limit file descriptors num by adding archive slice lru (#892)
Browse files Browse the repository at this point in the history
* --max-archive-fd option limits open files in archive manager

* Don't close the latest archives + bugfix

* Delete temp packages early

---------

Co-authored-by: SpyCheese <[email protected]>
  • Loading branch information
EmelyanenkoK and SpyCheese authored Feb 7, 2024
1 parent e723213 commit 12c1b1a
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 89 deletions.
8 changes: 8 additions & 0 deletions validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,7 @@ td::Status ValidatorEngine::load_global_config() {
validator_options_.write().set_session_logs_file(session_logs_file_);
}
validator_options_.write().set_celldb_compress_depth(celldb_compress_depth_);
validator_options_.write().set_max_open_archive_files(max_open_archive_files_);

std::vector<ton::BlockIdExt> h;
for (auto &x : conf.validator_->hardforks_) {
Expand Down Expand Up @@ -3793,6 +3794,13 @@ int main(int argc, char *argv[]) {
});
return td::Status::OK();
});
p.add_checked_option(
'\0', "max-archive-fd", "limit for a number of open file descriptirs in archive manager. 0 is unlimited (default)",
[&](td::Slice s) -> td::Status {
TRY_RESULT(v, td::to_integer_safe<size_t>(s));
acts.push_back([&x, v]() { td::actor::send_closure(x, &ValidatorEngine::set_max_open_archive_files, v); });
return td::Status::OK();
});
auto S = p.run(argc, argv);
if (S.is_error()) {
LOG(ERROR) << "failed to parse options: " << S.move_as_error();
Expand Down
4 changes: 4 additions & 0 deletions validator-engine/validator-engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ class ValidatorEngine : public td::actor::Actor {
double archive_ttl_ = 0;
double key_proof_ttl_ = 0;
td::uint32 celldb_compress_depth_ = 0;
size_t max_open_archive_files_ = 0;
bool read_config_ = false;
bool started_keyring_ = false;
bool started_ = false;
Expand Down Expand Up @@ -264,6 +265,9 @@ class ValidatorEngine : public td::actor::Actor {
void set_celldb_compress_depth(td::uint32 value) {
celldb_compress_depth_ = value;
}
void set_max_open_archive_files(size_t value) {
max_open_archive_files_ = value;
}
void start_up() override;
ValidatorEngine() {
}
Expand Down
39 changes: 33 additions & 6 deletions validator/db/archive-manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ std::string PackageId::name() const {
}
}

ArchiveManager::ArchiveManager(td::actor::ActorId<RootDb> root, std::string db_root) : db_root_(db_root) {
ArchiveManager::ArchiveManager(td::actor::ActorId<RootDb> root, std::string db_root,
td::Ref<ValidatorManagerOptions> opts)
: db_root_(db_root), opts_(opts) {
}

void ArchiveManager::add_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
Expand Down Expand Up @@ -598,9 +600,11 @@ void ArchiveManager::load_package(PackageId id) {
}
}

desc.file = td::actor::create_actor<ArchiveSlice>("slice", id.id, id.key, id.temp, false, db_root_);
desc.file =
td::actor::create_actor<ArchiveSlice>("slice", id.id, id.key, id.temp, false, db_root_, archive_lru_.get());

m.emplace(id, std::move(desc));
update_permanent_slices();
}

const ArchiveManager::FileDescription *ArchiveManager::get_file_desc(ShardIdFull shard, PackageId id, BlockSeqno seqno,
Expand Down Expand Up @@ -631,7 +635,8 @@ const ArchiveManager::FileDescription *ArchiveManager::add_file_desc(ShardIdFull
FileDescription new_desc{id, false};
td::mkdir(db_root_ + id.path()).ensure();
std::string prefix = PSTRING() << db_root_ << id.path() << id.name();
new_desc.file = td::actor::create_actor<ArchiveSlice>("slice", id.id, id.key, id.temp, false, db_root_);
new_desc.file =
td::actor::create_actor<ArchiveSlice>("slice", id.id, id.key, id.temp, false, db_root_, archive_lru_.get());
const FileDescription &desc = f.emplace(id, std::move(new_desc));
if (!id.temp) {
update_desc(f, desc, shard, seqno, ts, lt);
Expand Down Expand Up @@ -673,6 +678,7 @@ const ArchiveManager::FileDescription *ArchiveManager::add_file_desc(ShardIdFull
.ensure();
}
index_->commit_transaction().ensure();
update_permanent_slices();
return &desc;
}

Expand Down Expand Up @@ -820,6 +826,9 @@ void ArchiveManager::start_up() {
td::mkdir(db_root_ + "/archive/states/").ensure();
td::mkdir(db_root_ + "/files/").ensure();
td::mkdir(db_root_ + "/files/packages/").ensure();
if (opts_->get_max_open_archive_files() > 0) {
archive_lru_ = td::actor::create_actor<ArchiveLru>("archive_lru", opts_->get_max_open_archive_files());
}
index_ = std::make_shared<td::RocksDb>(td::RocksDb::open(db_root_ + "/files/globalindex").move_as_ok());
std::string value;
auto v = index_->get(create_serialize_tl_object<ton_api::db_files_index_key>().as_slice(), value);
Expand Down Expand Up @@ -878,8 +887,8 @@ void ArchiveManager::start_up() {
persistent_state_gc(FileHash::zero());
}

void ArchiveManager::run_gc(UnixTime ts, UnixTime archive_ttl) {
auto p = get_temp_package_id_by_unixtime(ts);
void ArchiveManager::run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) {
auto p = get_temp_package_id_by_unixtime(std::max(gc_ts, mc_ts - TEMP_PACKAGES_TTL));
std::vector<PackageId> vec;
for (auto &x : temp_files_) {
if (x.first < p) {
Expand Down Expand Up @@ -907,7 +916,7 @@ void ArchiveManager::run_gc(UnixTime ts, UnixTime archive_ttl) {
if (it == desc.first_blocks.end()) {
continue;
}
if (it->second.ts < ts - archive_ttl) {
if (it->second.ts < gc_ts - archive_ttl) {
vec.push_back(f.first);
}
}
Expand Down Expand Up @@ -1200,6 +1209,7 @@ void ArchiveManager::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle han
}
}
}
update_permanent_slices();
}

void ArchiveManager::FileMap::shard_index_add(const FileDescription &desc) {
Expand Down Expand Up @@ -1298,6 +1308,23 @@ const ArchiveManager::FileDescription *ArchiveManager::FileMap::get_next_file_de
return it2->second->deleted ? nullptr : it2->second;
}

void ArchiveManager::update_permanent_slices() {
if (archive_lru_.empty()) {
return;
}
std::vector<PackageId> ids;
if (!files_.empty()) {
ids.push_back(files_.rbegin()->first);
}
if (!key_files_.empty()) {
ids.push_back(key_files_.rbegin()->first);
}
if (!temp_files_.empty()) {
ids.push_back(temp_files_.rbegin()->first);
}
td::actor::send_closure(archive_lru_, &ArchiveLru::set_permanent_slices, std::move(ids));
}

} // namespace validator

} // namespace ton
13 changes: 11 additions & 2 deletions validator/db/archive-manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class RootDb;

class ArchiveManager : public td::actor::Actor {
public:
ArchiveManager(td::actor::ActorId<RootDb> root, std::string db_root);
ArchiveManager(td::actor::ActorId<RootDb> root, std::string db_root, td::Ref<ValidatorManagerOptions> opts);

void add_handle(BlockHandle handle, td::Promise<td::Unit> promise);
void update_handle(BlockHandle handle, td::Promise<td::Unit> promise);
Expand Down Expand Up @@ -58,7 +58,7 @@ class ArchiveManager : public td::actor::Actor {
void truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise<td::Unit> promise);
//void truncate_continue(BlockSeqno masterchain_seqno, td::Promise<td::Unit> promise);

void run_gc(UnixTime ts, UnixTime archive_ttl);
void run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl);

/* from LTDB */
void get_block_by_unix_time(AccountIdPrefixFull account_id, UnixTime ts, td::Promise<ConstBlockHandle> promise);
Expand Down Expand Up @@ -123,6 +123,9 @@ class ArchiveManager : public td::actor::Actor {
size_t size() const {
return files_.size();
}
bool empty() const {
return files_.empty();
}
std::map<PackageId, FileDescription>::const_iterator lower_bound(const PackageId &x) const {
return files_.lower_bound(x);
}
Expand Down Expand Up @@ -164,6 +167,7 @@ class ArchiveManager : public td::actor::Actor {
void shard_index_del(const FileDescription &desc);
};
FileMap files_, key_files_, temp_files_;
td::actor::ActorOwn<ArchiveLru> archive_lru_;
BlockSeqno finalized_up_to_{0};
bool async_mode_ = false;
bool huge_transaction_started_ = false;
Expand Down Expand Up @@ -206,6 +210,7 @@ class ArchiveManager : public td::actor::Actor {
void got_gc_masterchain_handle(ConstBlockHandle handle, FileHash hash);

std::string db_root_;
td::Ref<ValidatorManagerOptions> opts_;

std::shared_ptr<td::KeyValue> index_;

Expand All @@ -215,6 +220,10 @@ class ArchiveManager : public td::actor::Actor {
PackageId get_temp_package_id() const;
PackageId get_key_package_id(BlockSeqno seqno) const;
PackageId get_temp_package_id_by_unixtime(UnixTime ts) const;

void update_permanent_slices();

static const td::uint32 TEMP_PACKAGES_TTL = 86400 * 7;
};

} // namespace validator
Expand Down
Loading

0 comments on commit 12c1b1a

Please sign in to comment.