Skip to content

Commit

Permalink
Revert "Chunks are held on rocksdb for 25 hours (#2252)"
Browse files Browse the repository at this point in the history
This reverts commit 8a593bb.
  • Loading branch information
kamilsa committed Nov 5, 2024
1 parent 8a593bb commit 6ed70dd
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 224 deletions.
5 changes: 1 addition & 4 deletions core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,11 @@ namespace {
// NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions)
options.max_open_files = soft_limit.value() / 2;

const std::unordered_map<std::string, int32_t> column_ttl = {
{"avaliability_storage", 25 * 60 * 60}}; // 25 hours
auto db_res =
storage::RocksDb::create(app_config.databasePath(chain_spec->id()),
options,
app_config.dbCacheSize(),
prevent_destruction,
column_ttl);
prevent_destruction);
if (!db_res) {
auto log = log::createLogger("Injector", "injector");
log->critical(
Expand Down
53 changes: 0 additions & 53 deletions core/parachain/availability/store/candidate_chunk_key.hpp

This file was deleted.

135 changes: 3 additions & 132 deletions core/parachain/availability/store/store_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,8 @@
*/

#include "parachain/availability/store/store_impl.hpp"
#include "candidate_chunk_key.hpp"

namespace kagome::parachain {
AvailabilityStoreImpl::AvailabilityStoreImpl(
std::shared_ptr<storage::SpacedStorage> storage)
: storage_{std::move(storage)} {
BOOST_ASSERT(storage_ != nullptr);
}

bool AvailabilityStoreImpl::hasChunk(const CandidateHash &candidate_hash,
ValidatorIndex index) const {
return state_.sharedAccess([&](const auto &state) {
Expand Down Expand Up @@ -50,7 +43,7 @@ namespace kagome::parachain {
std::optional<AvailabilityStore::ErasureChunk>
AvailabilityStoreImpl::getChunk(const CandidateHash &candidate_hash,
ValidatorIndex index) const {
auto chunk = state_.sharedAccess(
return state_.sharedAccess(
[&](const auto &state)
-> std::optional<AvailabilityStore::ErasureChunk> {
auto it = state.per_candidate_.find(candidate_hash);
Expand All @@ -63,30 +56,6 @@ namespace kagome::parachain {
}
return it2->second;
});
if (chunk) {
return chunk;
}
auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
if (not space) {
SL_ERROR(logger, "Failed to get space");
return std::nullopt;
}
auto chunk_from_db =
space->get(CandidateChunkKey::encode(candidate_hash, index));
if (not chunk_from_db) {
return std::nullopt;
}
const auto decoded_chunk =
scale::decode<ErasureChunk>(chunk_from_db.value());
if (not decoded_chunk) {
SL_ERROR(logger,
"Failed to decode chunk candidate {} index {} error {}",
candidate_hash,
index,
decoded_chunk.error());
return std::nullopt;
}
return decoded_chunk.value();
}

std::optional<AvailabilityStore::ParachainBlock>
Expand Down Expand Up @@ -121,7 +90,7 @@ namespace kagome::parachain {

std::vector<AvailabilityStore::ErasureChunk> AvailabilityStoreImpl::getChunks(
const CandidateHash &candidate_hash) const {
auto chunks = state_.sharedAccess([&](const auto &state) {
return state_.sharedAccess([&](const auto &state) {
std::vector<AvailabilityStore::ErasureChunk> chunks;
auto it = state.per_candidate_.find(candidate_hash);
if (it != state.per_candidate_.end()) {
Expand All @@ -131,57 +100,6 @@ namespace kagome::parachain {
}
return chunks;
});
if (not chunks.empty()) {
return chunks;
}
auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
if (not space) {
SL_ERROR(logger, "Failed to get space");
return chunks;
}
auto cursor = space->cursor();
if (not cursor) {
SL_ERROR(logger, "Failed to get cursor for AvaliabilityStorage");
return chunks;
}
const auto seek_key = CandidateChunkKey::encode_hash(candidate_hash);
auto seek_res = cursor->seek(seek_key);
if (not seek_res) {
SL_ERROR(logger, "Failed to seek, error: {}", seek_res.error());
return chunks;
}
if (not seek_res.value()) {
SL_DEBUG(logger, "Seek not found for candidate {}", candidate_hash);
return chunks;
}
const auto check_key = [&seek_key](const auto &key) {
if (not key) {
return false;
}
const auto &key_value = key.value();
return key_value.size() >= seek_key.size()
and std::equal(seek_key.begin(), seek_key.end(), key_value.begin());
};
while (cursor->isValid() and check_key(cursor->key())) {
const auto cursor_opt_value = cursor->value();
if (cursor_opt_value) {
auto decoded_res =
scale::decode<ErasureChunk>(cursor_opt_value.value());
if (decoded_res) {
chunks.emplace_back(std::move(decoded_res.value()));
} else {
SL_ERROR(
logger, "Failed to decode value, error: {}", decoded_res.error());
}
} else {
SL_ERROR(
logger, "Failed to get value for key {}", cursor->key()->toHex());
}
if (not cursor->next()) {
break;
}
}
return chunks;
}

void AvailabilityStoreImpl::printStoragesLoad() {
Expand All @@ -204,30 +122,7 @@ namespace kagome::parachain {
state.candidates_[relay_parent].insert(candidate_hash);
auto &candidate_data = state.per_candidate_[candidate_hash];
for (auto &&chunk : std::move(chunks)) {
auto encoded_chunk = scale::encode(chunk);
const auto chunk_index = chunk.index;
candidate_data.chunks[chunk.index] = std::move(chunk);
if (not encoded_chunk) {
SL_ERROR(logger,
"Failed to encode chunk, error: {}",
encoded_chunk.error());
continue;
}
auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
if (not space) {
SL_ERROR(logger, "Failed to get space");
continue;
}
if (auto res = space->put(
CandidateChunkKey::encode(candidate_hash, chunk_index),
std::move(encoded_chunk.value()));
not res) {
SL_ERROR(logger,
"Failed to put chunk candidate {} index {} error {}",
candidate_hash,
chunk_index,
res.error());
}
}
candidate_data.pov = pov;
candidate_data.data = data;
Expand All @@ -237,42 +132,18 @@ namespace kagome::parachain {
void AvailabilityStoreImpl::putChunk(const network::RelayHash &relay_parent,
const CandidateHash &candidate_hash,
ErasureChunk &&chunk) {
auto encoded_chunk = scale::encode(chunk);
const auto chunk_index = chunk.index;
state_.exclusiveAccess([&](auto &state) {
state.candidates_[relay_parent].insert(candidate_hash);
state.per_candidate_[candidate_hash].chunks[chunk.index] =
std::move(chunk);
});
if (not encoded_chunk) {
SL_ERROR(
logger, "Failed to encode chunk, error: {}", encoded_chunk.error());
return;
}

auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
if (not space) {
SL_ERROR(logger, "Failed to get AvaliabilityStorage space");
return;
}

if (auto res =
space->put(CandidateChunkKey::encode(candidate_hash, chunk_index),
std::move(encoded_chunk.value()));
not res) {
SL_ERROR(logger,
"Failed to put chunk candidate {} index {} error {}",
candidate_hash,
chunk_index,
res.error());
}
}

void AvailabilityStoreImpl::remove(const network::RelayHash &relay_parent) {
state_.exclusiveAccess([&](auto &state) {
if (auto it = state.candidates_.find(relay_parent);
it != state.candidates_.end()) {
for (const auto &l : it->second) {
for (auto const &l : it->second) {
state.per_candidate_.erase(l);
}
state.candidates_.erase(it);
Expand Down
4 changes: 1 addition & 3 deletions core/parachain/availability/store/store_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@
#include <unordered_map>
#include <unordered_set>
#include "log/logger.hpp"
#include "storage/spaced_storage.hpp"
#include "utils/safe_object.hpp"

namespace kagome::parachain {
class AvailabilityStoreImpl : public AvailabilityStore {
public:
~AvailabilityStoreImpl() override = default;
AvailabilityStoreImpl(std::shared_ptr<storage::SpacedStorage> storage);

bool hasChunk(const CandidateHash &candidate_hash,
ValidatorIndex index) const override;
Expand Down Expand Up @@ -57,6 +56,5 @@ namespace kagome::parachain {

log::Logger logger = log::createLogger("AvailabilityStore", "parachain");
SafeObject<State> state_{};
std::shared_ptr<storage::SpacedStorage> storage_;
};
} // namespace kagome::parachain
43 changes: 17 additions & 26 deletions core/storage/rocksdb/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ namespace kagome::storage {
const filesystem::path &path,
rocksdb::Options options,
uint32_t memory_budget_mib,
bool prevent_destruction,
const std::unordered_map<std::string, int32_t> &column_ttl) {
bool prevent_destruction) {
OUTCOME_TRY(mkdirs(path));

auto log = log::createLogger("RocksDB", "storage");
Expand All @@ -62,32 +61,24 @@ namespace kagome::storage {
const uint32_t other_spaces_cache_size =
(memory_budget - trie_space_cache_size) / (storage::Space::kTotal - 1);
std::vector<rocksdb::ColumnFamilyDescriptor> column_family_descriptors;
std::vector<int32_t> ttls;
column_family_descriptors.reserve(Space::kTotal);
for (auto i = 0; i < Space::kTotal; ++i) {
const auto space_name = spaceName(static_cast<Space>(i));
auto ttl = 0;
if (const auto it = column_ttl.find(space_name); it != column_ttl.end()) {
ttl = it->second;
}
column_family_descriptors.emplace_back(
space_name,
spaceName(static_cast<Space>(i)),
configureColumn(i != Space::kTrieNode ? other_spaces_cache_size
: trie_space_cache_size));
ttls.push_back(ttl);
SL_INFO(log, "Column family {} configured with TTL {}", space_name, ttl);
}

std::vector<std::string> existing_families;
auto res = rocksdb::DB::ListColumnFamilies(
options, path.native(), &existing_families);
if (!res.ok() && !res.IsPathNotFound()) {
SL_ERROR(log,
"Can't list column families in {}: {}",
"Can't open database in {}: {}",
absolute_path.native(),
res.ToString());
return status_as_error(res);
}

for (auto &family : existing_families) {
if (std::ranges::find_if(
column_family_descriptors,
Expand All @@ -102,21 +93,21 @@ namespace kagome::storage {

options.create_missing_column_families = true;
auto rocks_db = std::shared_ptr<RocksDb>(new RocksDb);
auto status = rocksdb::DBWithTTL::Open(options,
path.native(),
column_family_descriptors,
&rocks_db->column_family_handles_,
&rocks_db->db_,
ttls);
if (not status.ok()) {
SL_ERROR(log,
"Can't open database in {}: {}",
absolute_path.native(),
status.ToString());
return status_as_error(status);
auto status = rocksdb::DB::Open(options,
path.native(),
column_family_descriptors,
&rocks_db->column_family_handles_,
&rocks_db->db_);
if (status.ok()) {
return rocks_db;
}

return rocks_db;
SL_ERROR(log,
"Can't open database in {}: {}",
absolute_path.native(),
status.ToString());

return status_as_error(status);
}

std::shared_ptr<BufferStorage> RocksDb::getSpace(Space space) {
Expand Down
6 changes: 2 additions & 4 deletions core/storage/rocksdb/rocksdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

#include <rocksdb/db.h>
#include <rocksdb/table.h>
#include <rocksdb/utilities/db_ttl.h>
#include <boost/container/flat_map.hpp>

#include "filesystem/common.hpp"
Expand Down Expand Up @@ -49,8 +48,7 @@ namespace kagome::storage {
const filesystem::path &path,
rocksdb::Options options = rocksdb::Options(),
uint32_t memory_budget_mib = kDefaultStateCacheSizeMiB,
bool prevent_destruction = false,
const std::unordered_map<std::string, int32_t>& column_ttl = {});
bool prevent_destruction = false);

std::shared_ptr<BufferStorage> getSpace(Space space) override;

Expand Down Expand Up @@ -79,7 +77,7 @@ namespace kagome::storage {

static rocksdb::ColumnFamilyOptions configureColumn(uint32_t memory_budget);

rocksdb::DBWithTTL *db_{};
rocksdb::DB *db_{};
std::vector<ColumnFamilyHandlePtr> column_family_handles_;
boost::container::flat_map<Space, std::shared_ptr<BufferStorage>> spaces_;
rocksdb::ReadOptions ro_;
Expand Down
Loading

0 comments on commit 6ed70dd

Please sign in to comment.