Skip to content

Commit

Permalink
Merge pull request #37 from project-tsurugi/fsync-stdio
Browse files Browse the repository at this point in the history
fsync after pwal flush
  • Loading branch information
ban-nobuhiro authored Sep 28, 2023
2 parents 04b4c37 + f5b782d commit 6df9223
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 78 deletions.
5 changes: 3 additions & 2 deletions include/limestone/api/log_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include <cstdio>
#include <string>
#include <string_view>
#include <cstdint>
Expand Down Expand Up @@ -128,7 +129,7 @@ class log_channel {
/**
* @brief for testing purposes only; must not be used for any purpose other than testing
*/
boost::filesystem::path file_path() const noexcept;
[[nodiscard]] boost::filesystem::path file_path() const noexcept;

private:
datastore& envelope_;
Expand All @@ -139,7 +140,7 @@ class log_channel {

std::size_t id_{};

boost::filesystem::ofstream strm_;
FILE* strm_{};

bool registered_{};

Expand Down
12 changes: 5 additions & 7 deletions src/limestone/datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,12 @@ datastore::datastore(configuration const& conf) : location_(conf.data_locations_
epoch_file_path_ = location_ / boost::filesystem::path(std::string(epoch_file_name));
const bool result = boost::filesystem::exists(epoch_file_path_, error);
if (!result || error) {
boost::filesystem::ofstream strm{};
strm.open(epoch_file_path_, std::ios_base::out | std::ios_base::app | std::ios_base::binary);
if(!strm || !strm.is_open() || strm.bad() || strm.fail()){
FILE* strm = fopen(epoch_file_path_.c_str(), "a"); // NOLINT
if (!strm) {
LOG_LP(ERROR) << "does not have write permission for the log_location directory, path: " << location_;
throw std::runtime_error("does not have write permission for the log_location directory"); //NOLINT
}
strm.close();
fclose(strm); // NOLINT TODO: error check
add_file(epoch_file_path_);
}

Expand Down Expand Up @@ -156,10 +155,9 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) noexcept {
if (epoch_id_recorded_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
std::lock_guard<std::mutex> lock(mtx_epoch_file_);

boost::filesystem::ofstream strm{};
strm.open(epoch_file_path_, std::ios_base::out | std::ios_base::app | std::ios_base::binary );
FILE* strm = fopen(epoch_file_path_.c_str(), "a"); // NOLINT TODO: error check
log_entry::durable_epoch(strm, static_cast<epoch_id_type>(epoch_id_informed_.load()));
strm.close();
fclose(strm); // NOLINT TODO: error check
break;
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/limestone/datastore_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,14 @@ void datastore::create_snapshot() noexcept { // NOLINT(readability-function-cog
}
}

boost::filesystem::ofstream ostrm{};
boost::filesystem::path snapshot_file = sub_dir / boost::filesystem::path(std::string(snapshot::file_name_));
VLOG_LP(log_info) << "generating snapshot file: " << snapshot_file;
ostrm.open(snapshot_file, std::ios_base::out | std::ios_base::trunc | std::ios_base::binary);
if( ostrm.fail() ){
FILE* ostrm = fopen(snapshot_file.c_str(), "w"); // NOLINT
if (!ostrm) {
LOG_LP(ERROR) << "cannot create snapshot file (" << snapshot_file << ")";
std::abort();
}
setvbuf(ostrm, nullptr, _IOFBF, 1024L * 1024L); // NOLINT TODO: error check
static_assert(sizeof(log_entry::entry_type) == 1);
#if defined SORT_METHOD_PUT_ONLY
sortdb->each([&ostrm, last_key = std::string{}](std::string_view db_key, std::string_view db_value) mutable {
Expand Down Expand Up @@ -264,7 +264,7 @@ void datastore::create_snapshot() noexcept { // NOLINT(readability-function-cog
}
});
#endif
ostrm.close();
fclose(ostrm); // NOLINT TODO: error check
}

} // namespace limestone::api
11 changes: 8 additions & 3 deletions src/limestone/log_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ void log_channel::begin_session() noexcept {
} while (current_epoch_id_.load() != envelope_.epoch_id_switched_.load());

auto log_file = file_path();
strm_.open(log_file, std::ios_base::out | std::ios_base::app | std::ios_base::binary);
strm_ = fopen(log_file.c_str(), "a"); // NOLINT TODO: error check
setvbuf(strm_, nullptr, _IOFBF, 1024L * 1024L); // NOLINT TODO: error check
if (!registered_) {
envelope_.add_file(log_file);
registered_ = true;
Expand All @@ -53,11 +54,15 @@ void log_channel::begin_session() noexcept {
}

/**
 * @brief ends the current session: flushes the stream, calls fsync on its file descriptor,
 * records the finished epoch, asks the datastore to update the minimum epoch id, and closes the stream.
 * @note fsync failure is fatal here: it is logged with errno and the process aborts.
 * @note NOTE(review): this span looks like a diff hunk in which `strm_.flush()` / `strm_.close()`
 * are the pre-change ofstream calls replaced by `fflush` / `fclose` - confirm against the real file.
 */
void log_channel::end_session() noexcept {
strm_.flush();
// push the stdio buffer to the OS first; fsync below only acts on data already handed to the descriptor
fflush(strm_); // NOLINT TODO: error check
if (int rc = fsync(fileno(strm_)); rc != 0) {
LOG_LP(ERROR) << "fsync failed, errno = " << errno;
std::abort();
}
// the epoch is marked finished only after the fsync above has succeeded
finished_epoch_id_.store(current_epoch_id_.load());
// UINT64_MAX is stored as the current epoch once the session is over
current_epoch_id_.store(UINT64_MAX);
envelope_.update_min_epoch_id();
strm_.close();
fclose(strm_); // NOLINT TODO: error check
}

void log_channel::abort_session([[maybe_unused]] status status_code, [[maybe_unused]] const std::string& message) noexcept {
Expand Down
68 changes: 35 additions & 33 deletions src/limestone/log_entry.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2022 tsurugi project.
* Copyright 2022-2023 tsurugi project.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,9 @@
* limitations under the License.
*/
#pragma once
#include <iostream>

#include <cstdio>
#include <istream>
#include <string>
#include <string_view>
#include <exception>
Expand Down Expand Up @@ -42,24 +44,24 @@ class log_entry {

log_entry() = default;

/**
 * @brief writes a session-begin marker: one entry_type byte (marker_begin) followed by the epoch as a 64-bit value.
 * @param strm the output stream to write to
 * @param epoch the epoch id recorded in the marker
 * @note write errors are not reported; the underlying write helpers ignore the stdio return values.
 * @note NOTE(review): two signatures are shown because this span looks like a diff hunk
 * (ofstream& version replaced by FILE* version) - confirm against the real file.
 */
static void begin_session(boost::filesystem::ofstream& strm, epoch_id_type epoch) {
static void begin_session(FILE* strm, epoch_id_type epoch) {
entry_type type = entry_type::marker_begin;
write_uint8(strm, static_cast<std::uint8_t>(type));
write_uint64(strm, static_cast<std::uint64_t>(epoch));
}
/**
 * @brief writes a session-end marker: one entry_type byte (marker_end) followed by the epoch as a 64-bit value.
 * @param strm the output stream to write to
 * @param epoch the epoch id recorded in the marker
 * @note write errors are not reported; the underlying write helpers ignore the stdio return values.
 * @note NOTE(review): two signatures are shown because this span looks like a diff hunk
 * (ofstream& version replaced by FILE* version) - confirm against the real file.
 */
static void end_session(boost::filesystem::ofstream& strm, epoch_id_type epoch) {
static void end_session(FILE* strm, epoch_id_type epoch) {
entry_type type = entry_type::marker_end;
write_uint8(strm, static_cast<std::uint8_t>(type));
write_uint64(strm, static_cast<std::uint64_t>(epoch));
}
/**
 * @brief writes a durable-epoch marker: one entry_type byte (marker_durable) followed by the epoch as a 64-bit value.
 * @param strm the output stream to write to
 * @param epoch the epoch id recorded in the marker
 * @note strm is passed straight to the stdio write helpers with no null check, so callers must pass an open stream.
 * @note NOTE(review): two signatures are shown because this span looks like a diff hunk
 * (ofstream& version replaced by FILE* version) - confirm against the real file.
 */
static void durable_epoch(boost::filesystem::ofstream& strm, epoch_id_type epoch) {
static void durable_epoch(FILE* strm, epoch_id_type epoch) {
entry_type type = entry_type::marker_durable;
write_uint8(strm, static_cast<std::uint8_t>(type));
write_uint64(strm, static_cast<std::uint64_t>(epoch));
}

// for writer (entry)
void write(boost::filesystem::ofstream& strm) {
void write(FILE* strm) {
switch(entry_type_) {
case entry_type::normal_entry:
write(strm, key_sid_, value_etc_);
Expand All @@ -81,7 +83,7 @@ class log_entry {
}
}

static void write(boost::filesystem::ofstream& strm, storage_id_type storage_id, std::string_view key, std::string_view value, write_version_type write_version) {
static void write(FILE* strm, storage_id_type storage_id, std::string_view key, std::string_view value, write_version_type write_version) {
entry_type type = entry_type::normal_entry;
write_uint8(strm, static_cast<std::uint8_t>(type));

Expand All @@ -94,14 +96,14 @@ class log_entry {
write_uint32(strm, static_cast<std::uint32_t>(value_len));

write_uint64(strm, static_cast<std::uint64_t>(storage_id));
strm.write(key.data(), static_cast<std::streamsize>(key_len));
fwrite(key.data(), static_cast<std::streamsize>(key_len), 1, strm); // NOLINT TODO: check error

write_uint64(strm, static_cast<std::uint64_t>(write_version.epoch_number_));
write_uint64(strm, static_cast<std::uint64_t>(write_version.minor_write_version_));
strm.write(value.data(), static_cast<std::streamsize>(value_len));
fwrite(value.data(), static_cast<std::streamsize>(value_len), 1, strm); // NOLINT TODO: check error
}

static void write(boost::filesystem::ofstream& strm, std::string_view key_sid, std::string_view value_etc) {
static void write(FILE* strm, std::string_view key_sid, std::string_view value_etc) {
entry_type type = entry_type::normal_entry;
write_uint8(strm, static_cast<std::uint8_t>(type));

Expand All @@ -113,11 +115,11 @@ class log_entry {
assert(value_len <= UINT32_MAX); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
write_uint32(strm, static_cast<std::uint32_t>(value_len));

strm.write(key_sid.data(), static_cast<std::streamsize>(key_sid.length()));
strm.write(value_etc.data(), static_cast<std::streamsize>(value_etc.length()));
fwrite(key_sid.data(), static_cast<std::streamsize>(key_sid.length()), 1, strm); // NOLINT TODO: check error
fwrite(value_etc.data(), static_cast<std::streamsize>(value_etc.length()), 1, strm); // NOLINT TODO: check error
}

static void write_remove(boost::filesystem::ofstream& strm, storage_id_type storage_id, std::string_view key, write_version_type write_version) {
static void write_remove(FILE* strm, storage_id_type storage_id, std::string_view key, write_version_type write_version) {
entry_type type = entry_type::remove_entry;
write_uint8(strm, static_cast<std::uint8_t>(type));

Expand All @@ -126,22 +128,22 @@ class log_entry {
write_uint32(strm, static_cast<std::uint32_t>(key_len));

write_uint64(strm, static_cast<std::uint64_t>(storage_id));
strm.write(key.data(), static_cast<std::streamsize>(key_len));
fwrite(key.data(), static_cast<std::streamsize>(key_len), 1, strm); // NOLINT TODO: check error

write_uint64(strm, static_cast<std::uint64_t>(write_version.epoch_number_));
write_uint64(strm, static_cast<std::uint64_t>(write_version.minor_write_version_));
}

/**
 * @brief writes a remove entry from pre-serialized parts: the remove_entry type byte, the 32-bit key length,
 * then the bytes of key_sid and value_etc as given.
 * @param strm the output stream to write to
 * @param key_sid serialized key part; the key length written is key_sid.length() minus sizeof(storage_id_type)
 * @param value_etc serialized trailing part; written in full with no length prefix of its own
 * @note NOTE(review): two signatures and duplicated write lines are shown because this span looks like a diff hunk
 * (ofstream version replaced by FILE* version) - confirm against the real file.
 */
static void write_remove(boost::filesystem::ofstream& strm, std::string_view key_sid, std::string_view value_etc) {
static void write_remove(FILE* strm, std::string_view key_sid, std::string_view value_etc) {
entry_type type = entry_type::remove_entry;
write_uint8(strm, static_cast<std::uint8_t>(type));

// NOTE(review): std::size_t subtraction wraps if key_sid is shorter than sizeof(storage_id_type);
// the assert below only guards this in builds where assert is enabled - confirm callers guarantee the prefix
std::size_t key_len = key_sid.length() - sizeof(storage_id_type);
assert(key_len <= UINT32_MAX); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
write_uint32(strm, static_cast<std::uint32_t>(key_len));

strm.write(key_sid.data(), static_cast<std::streamsize>(key_sid.length()));
strm.write(value_etc.data(), static_cast<std::streamsize>(value_etc.length()));
// fwrite is called with size = byte count and nmemb = 1; for a zero byte count fwrite returns 0,
// so a future error check must not treat a 0 return as failure in that case
fwrite(key_sid.data(), static_cast<std::streamsize>(key_sid.length()), 1, strm); // NOLINT TODO: check error
fwrite(value_etc.data(), static_cast<std::streamsize>(value_etc.length()), 1, strm); // NOLINT TODO: check error
}

// for reader
Expand Down Expand Up @@ -233,14 +235,14 @@ class log_entry {
std::string value_etc_{};
char one_char_{};

/**
 * @brief writes a single byte to the output stream.
 * @param out the output stream to write to
 * @param value the byte to write
 * @note the return value of the write call is ignored, so failures are not reported.
 * @note NOTE(review): two signatures are shown because this span looks like a diff hunk
 * (std::ostream& version replaced by FILE* version) - confirm against the real file.
 */
static void write_uint8(std::ostream& out, const std::uint8_t value) {
out.put(static_cast<char>(value));
static void write_uint8(FILE* out, const std::uint8_t value) {
fputc(value, out); // NOLINT TODO: check error
}
/**
 * @brief writes a 32-bit value as 4 bytes, least-significant byte first (little-endian order).
 * @param out the output stream to write to
 * @param value the value to write
 * @note the return values of the write calls are ignored, so failures are not reported.
 * @note NOTE(review): two signatures and two sets of byte writes are shown because this span looks like a diff hunk
 * (std::ostream& version replaced by FILE* version) - confirm against the real file.
 */
static void write_uint32(std::ostream& out, const std::uint32_t value) {
out.put(static_cast<char>((value>>0U)&0xFFU));
out.put(static_cast<char>((value>>8U)&0xFFU));
out.put(static_cast<char>((value>>16U)&0xFFU));
out.put(static_cast<char>((value>>24U)&0xFFU));
static void write_uint32(FILE* out, const std::uint32_t value) {
// each call emits one byte: shift the wanted byte down to bits 0-7 and mask it
fputc(static_cast<int>((value>>0U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>8U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>16U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>24U)&0xFFU), out); // NOLINT TODO: check error
}
static std::uint32_t read_uint32(std::istream& in) {
std::uint32_t value = (static_cast<std::uint8_t>(in.get())&0xFFU);
Expand All @@ -249,15 +251,15 @@ class log_entry {
value |= (static_cast<std::uint8_t>(in.get())&0xFFU)<<24U;
return value;
}
/**
 * @brief writes a 64-bit value as 8 bytes, least-significant byte first (little-endian order).
 * @param out the output stream to write to
 * @param value the value to write
 * @note the return values of the write calls are ignored, so failures are not reported.
 * @note NOTE(review): two signatures and two sets of byte writes are shown because this span looks like a diff hunk
 * (std::ostream& version replaced by FILE* version) - confirm against the real file.
 */
static void write_uint64(std::ostream& out, const std::uint64_t value) {
out.put(static_cast<char>((value>>0U)&0xFFU));
out.put(static_cast<char>((value>>8U)&0xFFU));
out.put(static_cast<char>((value>>16U)&0xFFU));
out.put(static_cast<char>((value>>24U)&0xFFU));
out.put(static_cast<char>((value>>32U)&0xFFU));
out.put(static_cast<char>((value>>40U)&0xFFU));
out.put(static_cast<char>((value>>48U)&0xFFU));
out.put(static_cast<char>((value>>56U)&0xFFU));
static void write_uint64(FILE* out, const std::uint64_t value) {
// each call emits one byte: shift the wanted byte down to bits 0-7 and mask it
fputc(static_cast<int>((value>>0U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>8U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>16U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>24U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>32U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>40U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>48U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>56U)&0xFFU), out); // NOLINT TODO: check error
}
static std::uint64_t read_uint64(std::istream& in) {
std::uint64_t value_l = (static_cast<std::uint8_t>(in.get())&0xFFU);
Expand Down
12 changes: 5 additions & 7 deletions test/limestone/log/log_entry_4_LevelDB_test.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2022 tsurugi project.
* Copyright 2022-2023 tsurugi project.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -58,22 +58,20 @@ class log_entry_4_LevelDB_test : public ::testing::Test {
};

TEST_F(log_entry_4_LevelDB_test, write_and_read_and_write_and_read) {
boost::filesystem::ofstream ostrm;
limestone::api::log_entry log_entry;

ostrm.open(file1_, std::ios_base::out | std::ios_base::app | std::ios_base::binary);
FILE* ostrm = fopen(file1_.c_str(), "a");
limestone::api::log_entry::write(ostrm, storage_id, key, value, write_version);
ostrm.close();
fclose(ostrm);

boost::filesystem::ifstream istrm;
istrm.open(file1_, std::ios_base::in | std::ios_base::binary);
boost::filesystem::ofstream ostrm2;
ostrm2.open(file2_, std::ios_base::out | std::ios_base::app | std::ios_base::binary);
FILE* ostrm2 = fopen(file2_.c_str(), "a");
while(log_entry.read(istrm)) {
limestone::api::log_entry::write(ostrm2, log_entry.key_sid(), log_entry.value_etc());
}
istrm.close();
ostrm2.close();
fclose(ostrm2);

boost::filesystem::ifstream istrm2;
istrm2.open(file2_, std::ios_base::in | std::ios_base::binary);
Expand Down
19 changes: 7 additions & 12 deletions test/limestone/log/log_entry_test.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2022 tsurugi project.
* Copyright 2022-2023 tsurugi project.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,11 +56,9 @@ class log_entry_test : public ::testing::Test {
};

TEST_F(log_entry_test, write_and_read) {
boost::filesystem::ofstream ostrm;

ostrm.open(file1_, std::ios_base::out | std::ios_base::app | std::ios_base::binary);
FILE* ostrm = fopen(file1_.c_str(), "a");
limestone::api::log_entry::write(ostrm, storage_id, key, value, write_version);
ostrm.close();
fclose(ostrm);

boost::filesystem::ifstream istrm;
istrm.open(file1_, std::ios_base::in | std::ios_base::binary);
Expand All @@ -84,21 +82,18 @@ TEST_F(log_entry_test, write_and_read) {
}

TEST_F(log_entry_test, write_and_read_and_write_and_read) {
boost::filesystem::ofstream ostrm;

ostrm.open(file1_, std::ios_base::out | std::ios_base::app | std::ios_base::binary);
FILE* ostrm = fopen(file1_.c_str(), "a");
limestone::api::log_entry::write(ostrm, storage_id, key, value, write_version);
ostrm.close();
fclose(ostrm);

boost::filesystem::ifstream istrm;
istrm.open(file1_, std::ios_base::in | std::ios_base::binary);
log_entry_.read(istrm);
istrm.close();

boost::filesystem::ofstream ostrm2;
ostrm2.open(file2_, std::ios_base::out | std::ios_base::app | std::ios_base::binary);
FILE* ostrm2 = fopen(file2_.c_str(), "a");
limestone::api::log_entry::write(ostrm2, storage_id, key, value, write_version);
ostrm2.close();
fclose(ostrm2);

boost::filesystem::ifstream istrm2;
istrm2.open(file2_, std::ios_base::in | std::ios_base::binary);
Expand Down
Loading

0 comments on commit 6df9223

Please sign in to comment.