Skip to content

Commit

Permalink
Merge pull request #42 from project-tsurugi/fsync-stdio
Browse files Browse the repository at this point in the history
error check around file write
  • Loading branch information
ban-nobuhiro authored Oct 10, 2023
2 parents 8af53a4 + 4e665e9 commit 84ca803
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 70 deletions.
25 changes: 19 additions & 6 deletions src/limestone/datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ datastore::datastore(configuration const& conf) : location_(conf.data_locations_
LOG_LP(ERROR) << "does not have write permission for the log_location directory, path: " << location_;
throw std::runtime_error("does not have write permission for the log_location directory"); //NOLINT
}
fclose(strm); // NOLINT TODO: error check
if (fclose(strm) != 0) { // NOLINT
LOG_LP(ERROR) << "fclose failed, errno = " << errno;
throw std::runtime_error("I/O error"); //NOLINT
}
add_file(epoch_file_path_);
}

Expand Down Expand Up @@ -115,7 +118,7 @@ void datastore::switch_epoch(epoch_id_type new_epoch_id) noexcept {
update_min_epoch_id(true);
}

void datastore::update_min_epoch_id(bool from_switch_epoch) noexcept {
void datastore::update_min_epoch_id(bool from_switch_epoch) noexcept { // NOLINT(readability-function-cognitive-complexity)
auto upper_limit = epoch_id_switched_.load() - 1;
epoch_id_type max_finished_epoch = 0;

Expand Down Expand Up @@ -143,14 +146,24 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) noexcept {
if (epoch_id_recorded_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
std::lock_guard<std::mutex> lock(mtx_epoch_file_);

FILE* strm = fopen(epoch_file_path_.c_str(), "a"); // NOLINT TODO: error check
FILE* strm = fopen(epoch_file_path_.c_str(), "a"); // NOLINT
if (!strm) {
LOG_LP(ERROR) << "fopen failed, errno = " << errno;
std::abort();
}
log_entry::durable_epoch(strm, static_cast<epoch_id_type>(epoch_id_informed_.load()));
fflush(strm); // NOLINT TODO: error check
if (int rc = fsync(fileno(strm)); rc != 0) {
if (fflush(strm) != 0) {
LOG_LP(ERROR) << "fflush failed, errno = " << errno;
std::abort();
}
if (fsync(fileno(strm)) != 0) {
LOG_LP(ERROR) << "fsync failed, errno = " << errno;
std::abort();
}
fclose(strm); // NOLINT TODO: error check
if (fclose(strm) != 0) { // NOLINT
LOG_LP(ERROR) << "fclose failed, errno = " << errno;
std::abort();
}
break;
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/limestone/datastore_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void datastore::create_snapshot() noexcept { // NOLINT(readability-function-cog
LOG_LP(ERROR) << "cannot create snapshot file (" << snapshot_file << ")";
std::abort();
}
setvbuf(ostrm, nullptr, _IOFBF, 1024L * 1024L); // NOLINT TODO: error check
setvbuf(ostrm, nullptr, _IOFBF, 128L * 1024L); // NOLINT, NB. glibc may ignore size when _IOFBF and buffer=NULL
static_assert(sizeof(log_entry::entry_type) == 1);
#if defined SORT_METHOD_PUT_ONLY
sortdb->each([&ostrm, last_key = std::string{}](std::string_view db_key, std::string_view db_value) mutable {
Expand Down Expand Up @@ -264,7 +264,10 @@ void datastore::create_snapshot() noexcept { // NOLINT(readability-function-cog
}
});
#endif
fclose(ostrm); // NOLINT TODO: error check
if (fclose(ostrm) != 0) { // NOLINT
LOG_LP(ERROR) << "cannot close snapshot file (" << snapshot_file << "), errno = " << errno;
std::abort();
}
}

} // namespace limestone::api
20 changes: 15 additions & 5 deletions src/limestone/log_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@ void log_channel::begin_session() noexcept {
} while (current_epoch_id_.load() != envelope_.epoch_id_switched_.load());

auto log_file = file_path();
strm_ = fopen(log_file.c_str(), "a"); // NOLINT TODO: error check
setvbuf(strm_, nullptr, _IOFBF, 1024L * 1024L); // NOLINT TODO: error check
strm_ = fopen(log_file.c_str(), "a"); // NOLINT
if (!strm_) {
LOG_LP(ERROR) << "I/O error, cannot make file on " << location_ << ", errno = " << errno;
std::abort();
}
setvbuf(strm_, nullptr, _IOFBF, 128L * 1024L); // NOLINT, NB. glibc may ignore size when _IOFBF and buffer=NULL
if (!registered_) {
envelope_.add_file(log_file);
registered_ = true;
Expand All @@ -54,15 +58,21 @@ void log_channel::begin_session() noexcept {
}

void log_channel::end_session() noexcept {
fflush(strm_); // NOLINT TODO: error check
if (int rc = fsync(fileno(strm_)); rc != 0) {
if (fflush(strm_) != 0) {
LOG_LP(ERROR) << "fflush failed, errno = " << errno;
std::abort();
}
if (fsync(fileno(strm_)) != 0) {
LOG_LP(ERROR) << "fsync failed, errno = " << errno;
std::abort();
}
finished_epoch_id_.store(current_epoch_id_.load());
current_epoch_id_.store(UINT64_MAX);
envelope_.update_min_epoch_id();
fclose(strm_); // NOLINT TODO: error check
if (fclose(strm_) != 0) { // NOLINT
LOG_LP(ERROR) << "fclose failed, errno = " << errno;
std::abort();
}
}

void log_channel::abort_session([[maybe_unused]] status status_code, [[maybe_unused]] const std::string& message) noexcept {
Expand Down
114 changes: 57 additions & 57 deletions src/limestone/log_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@
#pragma once

#include <cstdio>
#include <endian.h>
#include <istream>
#include <string>
#include <string_view>
#include <exception>

#include <boost/filesystem/path.hpp>
#include <boost/filesystem/fstream.hpp>
#include <glog/logging.h>

#include <limestone/api/storage_id_type.h>
#include <limestone/api/write_version_type.h>
#include <limestone/logging.h>
#include "logging_helper.h"

namespace limestone::api {

Expand All @@ -47,17 +51,17 @@ class log_entry {
static void begin_session(FILE* strm, epoch_id_type epoch) {
entry_type type = entry_type::marker_begin;
write_uint8(strm, static_cast<std::uint8_t>(type));
write_uint64(strm, static_cast<std::uint64_t>(epoch));
write_uint64le(strm, static_cast<std::uint64_t>(epoch));
}
static void end_session(FILE* strm, epoch_id_type epoch) {
entry_type type = entry_type::marker_end;
write_uint8(strm, static_cast<std::uint8_t>(type));
write_uint64(strm, static_cast<std::uint64_t>(epoch));
write_uint64le(strm, static_cast<std::uint64_t>(epoch));
}
static void durable_epoch(FILE* strm, epoch_id_type epoch) {
entry_type type = entry_type::marker_durable;
write_uint8(strm, static_cast<std::uint8_t>(type));
write_uint64(strm, static_cast<std::uint64_t>(epoch));
write_uint64le(strm, static_cast<std::uint64_t>(epoch));
}

// for writer (entry)
Expand Down Expand Up @@ -89,18 +93,18 @@ class log_entry {

std::size_t key_len = key.length();
assert(key_len <= UINT32_MAX); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
write_uint32(strm, static_cast<std::uint32_t>(key_len));
write_uint32le(strm, static_cast<std::uint32_t>(key_len));

std::size_t value_len = value.length();
assert(value_len <= UINT32_MAX); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
write_uint32(strm, static_cast<std::uint32_t>(value_len));
write_uint32le(strm, static_cast<std::uint32_t>(value_len));

write_uint64(strm, static_cast<std::uint64_t>(storage_id));
fwrite(key.data(), static_cast<std::streamsize>(key_len), 1, strm); // NOLINT TODO: check error
write_uint64le(strm, static_cast<std::uint64_t>(storage_id));
write_bytes(strm, key.data(), key_len);

write_uint64(strm, static_cast<std::uint64_t>(write_version.epoch_number_));
write_uint64(strm, static_cast<std::uint64_t>(write_version.minor_write_version_));
fwrite(value.data(), static_cast<std::streamsize>(value_len), 1, strm); // NOLINT TODO: check error
write_uint64le(strm, static_cast<std::uint64_t>(write_version.epoch_number_));
write_uint64le(strm, static_cast<std::uint64_t>(write_version.minor_write_version_));
write_bytes(strm, value.data(), value_len);
}

static void write(FILE* strm, std::string_view key_sid, std::string_view value_etc) {
Expand All @@ -109,14 +113,14 @@ class log_entry {

std::size_t key_len = key_sid.length() - sizeof(storage_id_type);
assert(key_len <= UINT32_MAX); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
write_uint32(strm, static_cast<std::uint32_t>(key_len));
write_uint32le(strm, static_cast<std::uint32_t>(key_len));

std::size_t value_len = value_etc.length() - (sizeof(epoch_id_type) + sizeof(std::uint64_t));
assert(value_len <= UINT32_MAX); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
write_uint32(strm, static_cast<std::uint32_t>(value_len));
write_uint32le(strm, static_cast<std::uint32_t>(value_len));

fwrite(key_sid.data(), static_cast<std::streamsize>(key_sid.length()), 1, strm); // NOLINT TODO: check error
fwrite(value_etc.data(), static_cast<std::streamsize>(value_etc.length()), 1, strm); // NOLINT TODO: check error
write_bytes(strm, key_sid.data(), key_sid.length());
write_bytes(strm, value_etc.data(), value_etc.length());
}

static void write_remove(FILE* strm, storage_id_type storage_id, std::string_view key, write_version_type write_version) {
Expand All @@ -125,13 +129,13 @@ class log_entry {

std::size_t key_len = key.length();
assert(key_len <= UINT32_MAX); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
write_uint32(strm, static_cast<std::uint32_t>(key_len));
write_uint32le(strm, static_cast<std::uint32_t>(key_len));

write_uint64(strm, static_cast<std::uint64_t>(storage_id));
fwrite(key.data(), static_cast<std::streamsize>(key_len), 1, strm); // NOLINT TODO: check error
write_uint64le(strm, static_cast<std::uint64_t>(storage_id));
write_bytes(strm, key.data(), key_len);

write_uint64(strm, static_cast<std::uint64_t>(write_version.epoch_number_));
write_uint64(strm, static_cast<std::uint64_t>(write_version.minor_write_version_));
write_uint64le(strm, static_cast<std::uint64_t>(write_version.epoch_number_));
write_uint64le(strm, static_cast<std::uint64_t>(write_version.minor_write_version_));
}

static void write_remove(FILE* strm, std::string_view key_sid, std::string_view value_etc) {
Expand All @@ -140,10 +144,10 @@ class log_entry {

std::size_t key_len = key_sid.length() - sizeof(storage_id_type);
assert(key_len <= UINT32_MAX); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
write_uint32(strm, static_cast<std::uint32_t>(key_len));
write_uint32le(strm, static_cast<std::uint32_t>(key_len));

fwrite(key_sid.data(), static_cast<std::streamsize>(key_sid.length()), 1, strm); // NOLINT TODO: check error
fwrite(value_etc.data(), static_cast<std::streamsize>(value_etc.length()), 1, strm); // NOLINT TODO: check error
write_bytes(strm, key_sid.data(), key_sid.length());
write_bytes(strm, value_etc.data(), value_etc.length());
}

// for reader
Expand All @@ -157,8 +161,8 @@ class log_entry {
switch(entry_type_) {
case entry_type::normal_entry:
{
std::size_t key_len = read_uint32(strm);
std::size_t value_len = read_uint32(strm);
std::size_t key_len = read_uint32le(strm);
std::size_t value_len = read_uint32le(strm);

key_sid_.resize(key_len + sizeof(storage_id_type));
strm.read(key_sid_.data(), static_cast<std::streamsize>(key_sid_.length()));
Expand All @@ -168,7 +172,7 @@ class log_entry {
}
case entry_type::remove_entry:
{
std::size_t key_len = read_uint32(strm);
std::size_t key_len = read_uint32le(strm);

key_sid_.resize(key_len + sizeof(storage_id_type));
strm.read(key_sid_.data(), static_cast<std::streamsize>(key_sid_.length()));
Expand All @@ -179,7 +183,7 @@ class log_entry {
case entry_type::marker_begin:
case entry_type::marker_end:
case entry_type::marker_durable:
epoch_id_ = static_cast<epoch_id_type>(read_uint64(strm));
epoch_id_ = static_cast<epoch_id_type>(read_uint64le(strm));
break;

case entry_type::this_id_is_not_used:
Expand Down Expand Up @@ -236,41 +240,37 @@ class log_entry {
char one_char_{};

static void write_uint8(FILE* out, const std::uint8_t value) {
fputc(value, out); // NOLINT TODO: check error
int ret = fputc(value, out);
if (ret == EOF) {
LOG_LP(ERROR) << "fputc failed, errno = " << errno;
std::abort();
}
}
static void write_uint32le(FILE* out, const std::uint32_t value) {
std::uint32_t buf = htole32(value);
write_bytes(out, &buf, sizeof(std::uint32_t));
}
static void write_uint32(FILE* out, const std::uint32_t value) {
fputc(static_cast<int>((value>>0U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>8U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>16U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>24U)&0xFFU), out); // NOLINT TODO: check error
static std::uint32_t read_uint32le(std::istream& in) {
std::uint32_t buf{};
in.read(reinterpret_cast<char*>(&buf), sizeof(std::uint32_t)); // NOLINT
return le32toh(buf);
}
static std::uint32_t read_uint32(std::istream& in) {
std::uint32_t value = (static_cast<std::uint8_t>(in.get())&0xFFU);
value |= (static_cast<std::uint8_t>(in.get())&0xFFU)<<8U;
value |= (static_cast<std::uint8_t>(in.get())&0xFFU)<<16U;
value |= (static_cast<std::uint8_t>(in.get())&0xFFU)<<24U;
return value;
static void write_uint64le(FILE* out, const std::uint64_t value) {
std::uint64_t buf = htole64(value);
write_bytes(out, &buf, sizeof(std::uint64_t));
}
static void write_uint64(FILE* out, const std::uint64_t value) {
fputc(static_cast<int>((value>>0U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>8U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>16U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>24U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>32U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>40U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>48U)&0xFFU), out); // NOLINT TODO: check error
fputc(static_cast<int>((value>>56U)&0xFFU), out); // NOLINT TODO: check error
static std::uint64_t read_uint64le(std::istream& in) {
std::uint64_t buf{};
in.read(reinterpret_cast<char*>(&buf), sizeof(std::uint64_t)); // NOLINT
return le64toh(buf);
}
static std::uint64_t read_uint64(std::istream& in) {
std::uint64_t value_l = (static_cast<std::uint8_t>(in.get())&0xFFU);
value_l |= (static_cast<std::uint8_t>(in.get())&0xFFU)<<8U;
value_l |= (static_cast<std::uint8_t>(in.get())&0xFFU)<<16U;
value_l |= (static_cast<std::uint8_t>(in.get())&0xFFU)<<24U;
std::uint64_t value_u = (static_cast<std::uint8_t>(in.get())&0xFFU);
value_u |= (static_cast<std::uint8_t>(in.get())&0xFFU)<<8U;
value_u |= (static_cast<std::uint8_t>(in.get())&0xFFU)<<16U;
value_u |= (static_cast<std::uint8_t>(in.get())&0xFFU)<<24U;
return ((value_u << 32U) & 0xFFFFFFFF00000000UL) | value_l;
static void write_bytes(FILE* out, const void* buf, std::size_t len) {
if (len == 0) return; // nothing to write
auto ret = fwrite(buf, len, 1, out);
if (ret != 1) {
LOG_LP(ERROR) << "fwrite failed, errno = " << errno;
std::abort();
}
}
};

Expand Down

0 comments on commit 84ca803

Please sign in to comment.