Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Chunks are held on rocksdb for 25 hours (#2252)" #2264

Merged
merged 1 commit into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,11 @@ namespace {
// NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions)
options.max_open_files = soft_limit.value() / 2;

const std::unordered_map<std::string, int32_t> column_ttl = {
{"avaliability_storage", 25 * 60 * 60}}; // 25 hours
auto db_res =
storage::RocksDb::create(app_config.databasePath(chain_spec->id()),
options,
app_config.dbCacheSize(),
prevent_destruction,
column_ttl);
prevent_destruction);
if (!db_res) {
auto log = log::createLogger("Injector", "injector");
log->critical(
Expand Down
53 changes: 0 additions & 53 deletions core/parachain/availability/store/candidate_chunk_key.hpp

This file was deleted.

135 changes: 3 additions & 132 deletions core/parachain/availability/store/store_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,8 @@
*/

#include "parachain/availability/store/store_impl.hpp"
#include "candidate_chunk_key.hpp"

namespace kagome::parachain {
AvailabilityStoreImpl::AvailabilityStoreImpl(
std::shared_ptr<storage::SpacedStorage> storage)
: storage_{std::move(storage)} {
BOOST_ASSERT(storage_ != nullptr);
}

bool AvailabilityStoreImpl::hasChunk(const CandidateHash &candidate_hash,
ValidatorIndex index) const {
return state_.sharedAccess([&](const auto &state) {
Expand Down Expand Up @@ -50,7 +43,7 @@ namespace kagome::parachain {
std::optional<AvailabilityStore::ErasureChunk>
AvailabilityStoreImpl::getChunk(const CandidateHash &candidate_hash,
ValidatorIndex index) const {
auto chunk = state_.sharedAccess(
return state_.sharedAccess(
[&](const auto &state)
-> std::optional<AvailabilityStore::ErasureChunk> {
auto it = state.per_candidate_.find(candidate_hash);
Expand All @@ -63,30 +56,6 @@ namespace kagome::parachain {
}
return it2->second;
});
if (chunk) {
return chunk;
}
auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
if (not space) {
SL_ERROR(logger, "Failed to get space");
return std::nullopt;
}
auto chunk_from_db =
space->get(CandidateChunkKey::encode(candidate_hash, index));
if (not chunk_from_db) {
return std::nullopt;
}
const auto decoded_chunk =
scale::decode<ErasureChunk>(chunk_from_db.value());
if (not decoded_chunk) {
SL_ERROR(logger,
"Failed to decode chunk candidate {} index {} error {}",
candidate_hash,
index,
decoded_chunk.error());
return std::nullopt;
}
return decoded_chunk.value();
}

std::optional<AvailabilityStore::ParachainBlock>
Expand Down Expand Up @@ -121,7 +90,7 @@ namespace kagome::parachain {

std::vector<AvailabilityStore::ErasureChunk> AvailabilityStoreImpl::getChunks(
const CandidateHash &candidate_hash) const {
auto chunks = state_.sharedAccess([&](const auto &state) {
return state_.sharedAccess([&](const auto &state) {
std::vector<AvailabilityStore::ErasureChunk> chunks;
auto it = state.per_candidate_.find(candidate_hash);
if (it != state.per_candidate_.end()) {
Expand All @@ -131,57 +100,6 @@ namespace kagome::parachain {
}
return chunks;
});
if (not chunks.empty()) {
return chunks;
}
auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
if (not space) {
SL_ERROR(logger, "Failed to get space");
return chunks;
}
auto cursor = space->cursor();
if (not cursor) {
SL_ERROR(logger, "Failed to get cursor for AvaliabilityStorage");
return chunks;
}
const auto seek_key = CandidateChunkKey::encode_hash(candidate_hash);
auto seek_res = cursor->seek(seek_key);
if (not seek_res) {
SL_ERROR(logger, "Failed to seek, error: {}", seek_res.error());
return chunks;
}
if (not seek_res.value()) {
SL_DEBUG(logger, "Seek not found for candidate {}", candidate_hash);
return chunks;
}
const auto check_key = [&seek_key](const auto &key) {
if (not key) {
return false;
}
const auto &key_value = key.value();
return key_value.size() >= seek_key.size()
and std::equal(seek_key.begin(), seek_key.end(), key_value.begin());
};
while (cursor->isValid() and check_key(cursor->key())) {
const auto cursor_opt_value = cursor->value();
if (cursor_opt_value) {
auto decoded_res =
scale::decode<ErasureChunk>(cursor_opt_value.value());
if (decoded_res) {
chunks.emplace_back(std::move(decoded_res.value()));
} else {
SL_ERROR(
logger, "Failed to decode value, error: {}", decoded_res.error());
}
} else {
SL_ERROR(
logger, "Failed to get value for key {}", cursor->key()->toHex());
}
if (not cursor->next()) {
break;
}
}
return chunks;
}

void AvailabilityStoreImpl::printStoragesLoad() {
Expand All @@ -204,30 +122,7 @@ namespace kagome::parachain {
state.candidates_[relay_parent].insert(candidate_hash);
auto &candidate_data = state.per_candidate_[candidate_hash];
for (auto &&chunk : std::move(chunks)) {
auto encoded_chunk = scale::encode(chunk);
const auto chunk_index = chunk.index;
candidate_data.chunks[chunk.index] = std::move(chunk);
if (not encoded_chunk) {
SL_ERROR(logger,
"Failed to encode chunk, error: {}",
encoded_chunk.error());
continue;
}
auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
if (not space) {
SL_ERROR(logger, "Failed to get space");
continue;
}
if (auto res = space->put(
CandidateChunkKey::encode(candidate_hash, chunk_index),
std::move(encoded_chunk.value()));
not res) {
SL_ERROR(logger,
"Failed to put chunk candidate {} index {} error {}",
candidate_hash,
chunk_index,
res.error());
}
}
candidate_data.pov = pov;
candidate_data.data = data;
Expand All @@ -237,42 +132,18 @@ namespace kagome::parachain {
void AvailabilityStoreImpl::putChunk(const network::RelayHash &relay_parent,
const CandidateHash &candidate_hash,
ErasureChunk &&chunk) {
auto encoded_chunk = scale::encode(chunk);
const auto chunk_index = chunk.index;
state_.exclusiveAccess([&](auto &state) {
state.candidates_[relay_parent].insert(candidate_hash);
state.per_candidate_[candidate_hash].chunks[chunk.index] =
std::move(chunk);
});
if (not encoded_chunk) {
SL_ERROR(
logger, "Failed to encode chunk, error: {}", encoded_chunk.error());
return;
}

auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
if (not space) {
SL_ERROR(logger, "Failed to get AvaliabilityStorage space");
return;
}

if (auto res =
space->put(CandidateChunkKey::encode(candidate_hash, chunk_index),
std::move(encoded_chunk.value()));
not res) {
SL_ERROR(logger,
"Failed to put chunk candidate {} index {} error {}",
candidate_hash,
chunk_index,
res.error());
}
}

void AvailabilityStoreImpl::remove(const network::RelayHash &relay_parent) {
state_.exclusiveAccess([&](auto &state) {
if (auto it = state.candidates_.find(relay_parent);
it != state.candidates_.end()) {
for (const auto &l : it->second) {
for (auto const &l : it->second) {
state.per_candidate_.erase(l);
}
state.candidates_.erase(it);
Expand Down
4 changes: 1 addition & 3 deletions core/parachain/availability/store/store_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@
#include <unordered_map>
#include <unordered_set>
#include "log/logger.hpp"
#include "storage/spaced_storage.hpp"
#include "utils/safe_object.hpp"

namespace kagome::parachain {
class AvailabilityStoreImpl : public AvailabilityStore {
public:
~AvailabilityStoreImpl() override = default;
AvailabilityStoreImpl(std::shared_ptr<storage::SpacedStorage> storage);

bool hasChunk(const CandidateHash &candidate_hash,
ValidatorIndex index) const override;
Expand Down Expand Up @@ -57,6 +56,5 @@ namespace kagome::parachain {

log::Logger logger = log::createLogger("AvailabilityStore", "parachain");
SafeObject<State> state_{};
std::shared_ptr<storage::SpacedStorage> storage_;
};
} // namespace kagome::parachain
43 changes: 17 additions & 26 deletions core/storage/rocksdb/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ namespace kagome::storage {
const filesystem::path &path,
rocksdb::Options options,
uint32_t memory_budget_mib,
bool prevent_destruction,
const std::unordered_map<std::string, int32_t> &column_ttl) {
bool prevent_destruction) {
OUTCOME_TRY(mkdirs(path));

auto log = log::createLogger("RocksDB", "storage");
Expand All @@ -62,32 +61,24 @@ namespace kagome::storage {
const uint32_t other_spaces_cache_size =
(memory_budget - trie_space_cache_size) / (storage::Space::kTotal - 1);
std::vector<rocksdb::ColumnFamilyDescriptor> column_family_descriptors;
std::vector<int32_t> ttls;
column_family_descriptors.reserve(Space::kTotal);
for (auto i = 0; i < Space::kTotal; ++i) {
const auto space_name = spaceName(static_cast<Space>(i));
auto ttl = 0;
if (const auto it = column_ttl.find(space_name); it != column_ttl.end()) {
ttl = it->second;
}
column_family_descriptors.emplace_back(
space_name,
spaceName(static_cast<Space>(i)),
configureColumn(i != Space::kTrieNode ? other_spaces_cache_size
: trie_space_cache_size));
ttls.push_back(ttl);
SL_INFO(log, "Column family {} configured with TTL {}", space_name, ttl);
}

std::vector<std::string> existing_families;
auto res = rocksdb::DB::ListColumnFamilies(
options, path.native(), &existing_families);
if (!res.ok() && !res.IsPathNotFound()) {
SL_ERROR(log,
"Can't list column families in {}: {}",
"Can't open database in {}: {}",
absolute_path.native(),
res.ToString());
return status_as_error(res);
}

for (auto &family : existing_families) {
if (std::ranges::find_if(
column_family_descriptors,
Expand All @@ -102,21 +93,21 @@ namespace kagome::storage {

options.create_missing_column_families = true;
auto rocks_db = std::shared_ptr<RocksDb>(new RocksDb);
auto status = rocksdb::DBWithTTL::Open(options,
path.native(),
column_family_descriptors,
&rocks_db->column_family_handles_,
&rocks_db->db_,
ttls);
if (not status.ok()) {
SL_ERROR(log,
"Can't open database in {}: {}",
absolute_path.native(),
status.ToString());
return status_as_error(status);
auto status = rocksdb::DB::Open(options,
path.native(),
column_family_descriptors,
&rocks_db->column_family_handles_,
&rocks_db->db_);
if (status.ok()) {
return rocks_db;
}

return rocks_db;
SL_ERROR(log,
"Can't open database in {}: {}",
absolute_path.native(),
status.ToString());

return status_as_error(status);
}

std::shared_ptr<BufferStorage> RocksDb::getSpace(Space space) {
Expand Down
6 changes: 2 additions & 4 deletions core/storage/rocksdb/rocksdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

#include <rocksdb/db.h>
#include <rocksdb/table.h>
#include <rocksdb/utilities/db_ttl.h>
#include <boost/container/flat_map.hpp>

#include "filesystem/common.hpp"
Expand Down Expand Up @@ -49,8 +48,7 @@ namespace kagome::storage {
const filesystem::path &path,
rocksdb::Options options = rocksdb::Options(),
uint32_t memory_budget_mib = kDefaultStateCacheSizeMiB,
bool prevent_destruction = false,
const std::unordered_map<std::string, int32_t>& column_ttl = {});
bool prevent_destruction = false);

std::shared_ptr<BufferStorage> getSpace(Space space) override;

Expand Down Expand Up @@ -79,7 +77,7 @@ namespace kagome::storage {

static rocksdb::ColumnFamilyOptions configureColumn(uint32_t memory_budget);

rocksdb::DBWithTTL *db_{};
rocksdb::DB *db_{};
std::vector<ColumnFamilyHandlePtr> column_family_handles_;
boost::container::flat_map<Space, std::shared_ptr<BufferStorage>> spaces_;
rocksdb::ReadOptions ro_;
Expand Down
Loading
Loading