Skip to content

Commit

Permalink
WIP introduce scan_pwals thread_context for LevelDB/RocksDB Batch
Browse files Browse the repository at this point in the history
  • Loading branch information
ban-nobuhiro committed Aug 28, 2024
1 parent 2bd3655 commit 280c617
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 32 deletions.
33 changes: 27 additions & 6 deletions src/limestone/datastore_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,11 @@ static int comp_twisted_key(const std::string_view& a, const std::string_view& b
}

[[maybe_unused]]
static void insert_entry_or_update_to_max(sortdb_wrapper* sortdb, const log_entry& e) {
static void insert_entry_or_update_to_max(sortdb_wrapper* sortdb, sortdb_wrapper::batch* b, const log_entry& e) {
(void)b;
// not using batch mode;
// 1. this mode is RMW; so need WBWI
// 2. this mode is works_with_multi_thread = false, so less need for batch
bool need_write = true;
// skip older entry than already inserted
std::string value;
Expand All @@ -114,7 +118,7 @@ static void insert_entry_or_update_to_max(sortdb_wrapper* sortdb, const log_entr
}

[[maybe_unused]]
static void insert_twisted_entry(sortdb_wrapper* sortdb, const log_entry& e) {
static void insert_twisted_entry(sortdb_wrapper* sortdb, sortdb_wrapper::batch* b, const log_entry& e) {
// key_sid: storage_id[8] key[*], value_etc: epoch[8]LE minor_version[8]LE value[*], type: type[1]
// db_key: epoch[8]BE minor_version[8]BE storage_id[8] key[*], db_value: type[1] value[*]
std::string db_key(write_version_size + e.key_sid().size(), '\0');
Expand All @@ -123,9 +127,24 @@ static void insert_twisted_entry(sortdb_wrapper* sortdb, const log_entry& e) {
std::memcpy(&db_key[write_version_size], e.key_sid().data(), e.key_sid().size());
std::string db_value(1, static_cast<char>(e.type()));
db_value.append(e.value_etc().substr(write_version_size));
sortdb->put(db_key, db_value);
sortdb->batch_put(b, db_key, db_value);
}

class sortdb_batch_context : public dblog_scan::thread_context_base {

Check warning on line 133 in src/limestone/datastore_snapshot.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

cppcoreguidelines-virtual-class-destructor

destructor of 'sortdb_batch_context' is public and non-virtual
sortdb_wrapper* sw;
public:
sortdb_batch_context(sortdb_wrapper* sw0) : sw(sw0) {}

Check warning on line 136 in src/limestone/datastore_snapshot.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

google-explicit-constructor,hicpp-explicit-conversions

single-argument constructors must be marked explicit to avoid unintentional implicit conversions
void* thread_start() override {
return sw->batch_start();
}

void thread_end(void*p) override {
auto* b = (sortdb_wrapper::batch*)p;

Check warning on line 142 in src/limestone/datastore_snapshot.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

cppcoreguidelines-pro-type-cstyle-cast

do not use C-style cast to convert between unrelated types

Check warning on line 142 in src/limestone/datastore_snapshot.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

google-readability-casting

C-style casts are discouraged; use static_cast
sw->flush(b);
sw->batch_end(b);
}
};

static std::pair<epoch_id_type, sorting_context> create_sorted_from_wals(
const boost::filesystem::path& from_dir,
int num_worker,
Expand All @@ -146,11 +165,12 @@ static std::pair<epoch_id_type, sorting_context> create_sorted_from_wals(
const auto add_entry_to_point = insert_entry_or_update_to_max;
bool works_with_multi_thread = false;
#endif
auto add_entry = [&sctx, &add_entry_to_point](const log_entry& e){
auto add_entry = [&sctx, &add_entry_to_point](void* thread_context, const log_entry& e){
(void)thread_context;
switch (e.type()) {
case log_entry::entry_type::normal_entry:
case log_entry::entry_type::remove_entry:
add_entry_to_point(sctx.get_sortdb(), e);
add_entry_to_point(sctx.get_sortdb(), (sortdb_wrapper::batch*)thread_context, e);

Check warning on line 173 in src/limestone/datastore_snapshot.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

cppcoreguidelines-pro-type-cstyle-cast

do not use C-style cast to convert between unrelated types

Check warning on line 173 in src/limestone/datastore_snapshot.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

google-readability-casting

C-style casts are discouraged; use static_cast
break;
case log_entry::entry_type::clear_storage:
case log_entry::entry_type::remove_storage: { // remove_storage is treated as clear_storage
Expand All @@ -172,8 +192,9 @@ static std::pair<epoch_id_type, sorting_context> create_sorted_from_wals(
num_worker = 1;
}
logscan.set_thread_num(num_worker);
sortdb_batch_context bc(sctx.get_sortdb());
try {
epoch_id_type max_appeared_epoch = logscan.scan_pwal_files_throws(ld_epoch, add_entry);
epoch_id_type max_appeared_epoch = logscan.scan_pwal_files_throws(ld_epoch, add_entry, &bc);
return {max_appeared_epoch, std::move(sctx)};
} catch (std::runtime_error& e) {
VLOG_LP(log_info) << "failed to scan pwal files: " << e.what();
Expand Down
16 changes: 10 additions & 6 deletions src/limestone/dblog_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,16 @@ void dblog_scan::detach_wal_files(bool skip_empty_files) {
}

epoch_id_type dblog_scan::scan_pwal_files( // NOLINT(readability-function-cognitive-complexity)
epoch_id_type ld_epoch, const std::function<void(log_entry&)>& add_entry,
epoch_id_type ld_epoch, const std::function<void(void*, log_entry&)>& add_entry,
thread_context_base* tcb,
const error_report_func_t& report_error, dblog_scan::parse_error::code* max_parse_error_value) {
std::atomic<epoch_id_type> max_appeared_epoch{ld_epoch};
if (max_parse_error_value) { *max_parse_error_value = dblog_scan::parse_error::failed; }
std::atomic<dblog_scan::parse_error::code> max_error_value{dblog_scan::parse_error::code::ok};
auto process_file = [&](const boost::filesystem::path& p) { // NOLINT(readability-function-cognitive-complexity)
auto process_file = [&](void* tc, const boost::filesystem::path& p) { // NOLINT(readability-function-cognitive-complexity)
if (is_wal(p)) {
parse_error ec;
auto rc = scan_one_pwal_file(p, ld_epoch, add_entry, report_error, ec);
auto rc = scan_one_pwal_file(p, ld_epoch, add_entry, report_error, ec, tc);
epoch_id_type max_epoch_of_file = rc;
auto ec_value = ec.value();
switch (ec_value) {
Expand Down Expand Up @@ -193,6 +194,8 @@ epoch_id_type dblog_scan::scan_pwal_files( // NOLINT(readability-function-cogni
workers.reserve(thread_num_);
for (int i = 0; i < thread_num_; i++) {
workers.emplace_back(std::thread([&](){
void * tc = nullptr;
if (tcb) { tc = tcb->thread_start(); }
for (;;) {
boost::filesystem::path p;
{
Expand All @@ -203,7 +206,7 @@ epoch_id_type dblog_scan::scan_pwal_files( // NOLINT(readability-function-cogni
}

try {
process_file(p);
process_file(tc, p);
} catch (std::runtime_error& ex) {
VLOG(log_info) << "/:limestone catch runtime_error(" << ex.what() << ")";
{
Expand All @@ -216,6 +219,7 @@ epoch_id_type dblog_scan::scan_pwal_files( // NOLINT(readability-function-cogni
break;
}
}
if (tcb) { tcb->thread_end(tc); }
}));
}
for (int i = 0; i < thread_num_; i++) {
Expand All @@ -230,12 +234,12 @@ epoch_id_type dblog_scan::scan_pwal_files( // NOLINT(readability-function-cogni

// called from datastore::create_snapshot
// db_startup mode
epoch_id_type dblog_scan::scan_pwal_files_throws(epoch_id_type ld_epoch, const std::function<void(log_entry&)>& add_entry) {
epoch_id_type dblog_scan::scan_pwal_files_throws(epoch_id_type ld_epoch, const std::function<void(void*, log_entry&)>& add_entry, thread_context_base* tcb) {
set_fail_fast(true);
set_process_at_nondurable_epoch_snippet(process_at_nondurable::repair_by_mark);
set_process_at_truncated_epoch_snippet(process_at_truncated::report);
set_process_at_damaged_epoch_snippet(process_at_damaged::report);
return scan_pwal_files(ld_epoch, add_entry, log_error_and_throw);
return scan_pwal_files(ld_epoch, add_entry, tcb, log_error_and_throw);
}

void dblog_scan::rescan_directory_paths() {
Expand Down
16 changes: 12 additions & 4 deletions src/limestone/dblog_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,21 +163,29 @@ class dblog_scan {

epoch_id_type last_durable_epoch_in_dir();

class thread_context_base {

Check warning on line 166 in src/limestone/dblog_scan.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

cppcoreguidelines-virtual-class-destructor

destructor of 'thread_context_base' is public and non-virtual
public:
virtual void* thread_start() = 0;
virtual void thread_end(void*) = 0;
};

/**
* @returns max epoch value in directory
* @throws exception on error
*/
epoch_id_type scan_pwal_files_throws(epoch_id_type ld_epoch, const std::function<void(log_entry&)>& add_entry);
epoch_id_type scan_pwal_files_throws(epoch_id_type ld_epoch, const std::function<void(void*, log_entry&)>& add_entry, thread_context_base *tcb = nullptr);

/**
* @returns max epoch value in directory
*/
epoch_id_type scan_pwal_files(epoch_id_type ld_epoch, const std::function<void(log_entry&)>& add_entry,
epoch_id_type scan_pwal_files(epoch_id_type ld_epoch, const std::function<void(void*, log_entry&)>& add_entry,
thread_context_base* tcb,
const error_report_func_t& report_error, dblog_scan::parse_error::code* max_parse_error_value = nullptr);

epoch_id_type scan_one_pwal_file(const boost::filesystem::path& p, epoch_id_type ld_epoch,
const std::function<void(log_entry&)>& add_entry,
const std::function<void(void*, log_entry&)>& add_entry,
const error_report_func_t& report_error,
parse_error& pe);
parse_error& pe, void*);

static bool is_wal(const boost::filesystem::path& p) { return p.filename().string().rfind(log_channel_prefix, 0) == 0; }
static bool is_detached_wal(const boost::filesystem::path& p) {
Expand Down
6 changes: 3 additions & 3 deletions src/limestone/dblogutil/dblogutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void inspect(dblog_scan &ds, std::optional<epoch_id_type> epoch) {
ds.set_process_at_damaged_epoch_snippet(dblog_scan::process_at_damaged::report);
ds.set_fail_fast(false);
dblog_scan::parse_error::code max_ec{};
epoch_id_type max_appeared_epoch = ds.scan_pwal_files(epoch.value_or(ld_epoch), [&](log_entry& e){
epoch_id_type max_appeared_epoch = ds.scan_pwal_files(epoch.value_or(ld_epoch), [&](void*, log_entry& e){
if (e.type() == log_entry::entry_type::normal_entry) {
VLOG(50) << "normal";
count_normal_entry++;
Expand All @@ -85,7 +85,7 @@ void inspect(dblog_scan &ds, std::optional<epoch_id_type> epoch) {
} else {
LOG(ERROR) << static_cast<int>(e.type());
}
}, [](log_entry::read_error& ec){
}, nullptr, [](log_entry::read_error& ec){
VLOG(30) << "ERROR " << ec.value() << " : " << ec.message();
return false;
}, &max_ec);
Expand Down Expand Up @@ -136,7 +136,7 @@ void repair(dblog_scan &ds, std::optional<epoch_id_type> epoch) {
ds.rescan_directory_paths();
std::atomic_size_t count_wal_entry = 0;
dblog_scan::parse_error::code max_ec{};
ds.scan_pwal_files(ld_epoch, [&count_wal_entry](log_entry&){ count_wal_entry++; }, [](log_entry::read_error& e) -> bool {
ds.scan_pwal_files(ld_epoch, [&count_wal_entry](void*, log_entry&){ count_wal_entry++; }, nullptr, [](log_entry::read_error& e) -> bool {
LOG_LP(ERROR) << "this pwal file is broken: " << e.message();
return false;
}, &max_ec);
Expand Down
6 changes: 3 additions & 3 deletions src/limestone/parse_wal_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos f
// scan the file, and check max epoch number in this file
epoch_id_type dblog_scan::scan_one_pwal_file( // NOLINT(readability-function-cognitive-complexity)
const boost::filesystem::path& p, epoch_id_type ld_epoch,
const std::function<void(log_entry&)>& add_entry,
const std::function<void(void*, log_entry&)>& add_entry,
const error_report_func_t& report_error,
parse_error& pe) {
parse_error& pe, void* tc) {
VLOG_LP(log_info) << "processing pwal file: " << p.filename().string();
epoch_id_type current_epoch{UINT64_MAX};
epoch_id_type max_epoch_of_file{0};
Expand Down Expand Up @@ -251,7 +251,7 @@ epoch_id_type dblog_scan::scan_one_pwal_file( // NOLINT(readability-function-co
// normal_entry | remove_entry | clear_storage | add_storage | remove_storage : (not 1st) { if (valid) process-entry } -> loop
if (!first) {
if (valid) {
add_entry(e);
add_entry(tc, e);
}
} else {
err_unexpected();
Expand Down
38 changes: 38 additions & 0 deletions src/limestone/sortdb_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,44 @@ class sortdb_wrapper {
throw std::runtime_error("error in sortdb put");
}

using batch = WriteBatch;
batch* batch_start() {
return new WriteBatch();

Check failure on line 90 in src/limestone/sortdb_wrapper.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-error

allocation of incomplete type 'leveldb::WriteBatch'
}

void batch_end(batch* batch) {
if (batch->Count() != 0) {

Check failure on line 94 in src/limestone/sortdb_wrapper.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-error

member access into incomplete type 'limestone::api::sortdb_wrapper::batch' (aka 'leveldb::WriteBatch')
LOG_LP(ERROR) << "programming error, unflushed batch count = " << batch->Count();

Check failure on line 95 in src/limestone/sortdb_wrapper.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-error

member access into incomplete type 'limestone::api::sortdb_wrapper::batch' (aka 'leveldb::WriteBatch')
throw std::runtime_error("error in sortdb batch_end");
}
delete batch;

Check failure on line 98 in src/limestone/sortdb_wrapper.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-delete-incomplete

deleting pointer to incomplete type 'limestone::api::sortdb_wrapper::batch' (aka 'leveldb::WriteBatch') may cause undefined behavior

Check warning on line 98 in src/limestone/sortdb_wrapper.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

cppcoreguidelines-owning-memory

deleting a pointer through a type that is not marked 'gsl::owner<>'; consider using a smart pointer instead
}

void batch_put(batch* batch, const std::string& key, const std::string& value) {
auto status = batch->Put(key, value);

Check failure on line 102 in src/limestone/sortdb_wrapper.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-error

member access into incomplete type 'limestone::api::sortdb_wrapper::batch' (aka 'leveldb::WriteBatch')
if (!status.ok()) {
LOG_LP(ERROR) << "sortdb batch put error, status: " << status.ToString();
throw std::runtime_error("error in sortdb batch put");
}
if (batch->Count() >= 256) {

Check failure on line 107 in src/limestone/sortdb_wrapper.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-error

member access into incomplete type 'limestone::api::sortdb_wrapper::batch' (aka 'leveldb::WriteBatch')
flush(batch);
}
}

void flush(batch* batch) {
if (batch->Count() != 0) {

Check failure on line 113 in src/limestone/sortdb_wrapper.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-error

member access into incomplete type 'limestone::api::sortdb_wrapper::batch' (aka 'leveldb::WriteBatch')
LOG_LP(INFO) << batch->Count();

Check failure on line 114 in src/limestone/sortdb_wrapper.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-error

member access into incomplete type 'limestone::api::sortdb_wrapper::batch' (aka 'leveldb::WriteBatch')
WriteOptions write_options{};
auto status = sortdb_->Write(write_options, batch);
if (!status.ok()) {
LOG_LP(ERROR) << "sortdb flush error, status: " << status.ToString();
throw std::runtime_error("error in sortdb flush");
}
LOG_LP(INFO) << batch->Count();

Check failure on line 121 in src/limestone/sortdb_wrapper.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-error

member access into incomplete type 'limestone::api::sortdb_wrapper::batch' (aka 'leveldb::WriteBatch')
batch->Clear();

Check failure on line 122 in src/limestone/sortdb_wrapper.h

View workflow job for this annotation

GitHub Actions / Clang-Tidy

clang-diagnostic-error

member access into incomplete type 'limestone::api::sortdb_wrapper::batch' (aka 'leveldb::WriteBatch')
}
}

bool get(const std::string& key, std::string* value) {
ReadOptions read_options{};
auto status = sortdb_->Get(read_options, key, value);
Expand Down
12 changes: 6 additions & 6 deletions test/limestone/log/dblog_scan_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ static constexpr const char* location = "/tmp/dblog_scan_test";
dblog_scan::parse_error pe;
std::vector<log_entry::read_error> errors;

epoch_id_type max_epoch = ds.scan_one_pwal_file(p, 0x100, [](const log_entry& e){
epoch_id_type max_epoch = ds.scan_one_pwal_file(p, 0x100, [](void*, const log_entry& e){
VLOG(30) << static_cast<int>(e.type());
}, [&errors](const log_entry::read_error& re){
VLOG(30) << re.message();
errors.emplace_back(re);
return false;
}, pe);
}, pe, 0);

check(p, max_epoch, errors, pe);
EXPECT_EQ(boost::filesystem::file_size(p), data.size()); // no size change
Expand All @@ -99,13 +99,13 @@ static constexpr const char* location = "/tmp/dblog_scan_test";
dblog_scan::parse_error pe;
std::vector<log_entry::read_error> errors;

epoch_id_type max_epoch = ds.scan_one_pwal_file(p, 0x100, [](const log_entry& e){
epoch_id_type max_epoch = ds.scan_one_pwal_file(p, 0x100, [](void*, const log_entry& e){
VLOG(30) << static_cast<int>(e.type());
}, [&errors](const log_entry::read_error& re){
VLOG(30) << re.message();
errors.emplace_back(re);
return false;
}, pe);
}, pe, 0);

check(p, max_epoch, errors, pe);
EXPECT_EQ(boost::filesystem::file_size(p), data.size()); // no size change
Expand All @@ -122,13 +122,13 @@ static constexpr const char* location = "/tmp/dblog_scan_test";
dblog_scan::parse_error pe;
std::vector<log_entry::read_error> errors;

epoch_id_type max_epoch = ds.scan_one_pwal_file(p, 0x100, [](const log_entry& e){
epoch_id_type max_epoch = ds.scan_one_pwal_file(p, 0x100, [](void*, const log_entry& e){
VLOG(30) << static_cast<int>(e.type());
}, [&errors](const log_entry::read_error& re){
VLOG(30) << re.message();
errors.emplace_back(re);
return false;
}, pe);
}, pe, 0);

check(p, max_epoch, errors, pe);
}
Expand Down
4 changes: 2 additions & 2 deletions test/limestone/log/durable_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ std::tuple<limestone::api::log_entry::read_error, limestone::api::epoch_id_type,
std::vector<log_entry> entries;
limestone::api::epoch_id_type durable_epoch = UINT64_MAX;
try {
durable_epoch = ds.scan_one_pwal_file(p, ld_epoch, [&entries](log_entry& e){ entries.emplace_back(e); }, [&err](log_entry::read_error& e) -> bool {
durable_epoch = ds.scan_one_pwal_file(p, ld_epoch, [&entries](void*, log_entry& e){ entries.emplace_back(e); }, [&err](log_entry::read_error& e) -> bool {
err = e;
throw std::runtime_error("pwal file read error");
}, ec);
}, ec, 0);
} catch (std::runtime_error& re) {
}
return {err, durable_epoch, std::move(entries)};
Expand Down
4 changes: 2 additions & 2 deletions test/limestone/log/log_dir_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const boost::filesystem::path compaction_catalog_path = boost::filesystem::path(

static bool starts_with(std::string a, std::string b) { return a.substr(0, b.length()) == b; }
static bool is_pwal(const boost::filesystem::path& p) { return starts_with(p.filename().string(), "pwal"); }
static void ignore_entry(limestone::api::log_entry&) {}
static void ignore_entry(void*, limestone::api::log_entry&) {}

void create_mainfest_file(int persistent_format_version = 1) {
create_file(manifest_path, data_manifest(persistent_format_version));
Expand Down Expand Up @@ -281,7 +281,7 @@ TEST_F(log_dir_test, scan_pwal_files_in_dir_returns_max_epoch_nondurable) {
// EXPECT_EQ(limestone::internal::scan_pwal_files_in_dir(location, 2, is_pwal, 0x100, ignore_entry), 0x101);
limestone::internal::dblog_scan ds{boost::filesystem::path(location)};
ds.set_thread_num(2);
EXPECT_EQ(ds.scan_pwal_files(0x100, ignore_entry, [](limestone::api::log_entry::read_error&){return false;}), 0x101);
EXPECT_EQ(ds.scan_pwal_files(0x100, ignore_entry, nullptr, [](limestone::api::log_entry::read_error&){return false;}), 0x101);
}

TEST_F(log_dir_test, scan_pwal_files_in_dir_rejects_unexpected_EOF) {
Expand Down

0 comments on commit 280c617

Please sign in to comment.