Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add punch hole GC #326

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
cmake_minimum_required(VERSION 3.0)
cmake_minimum_required(VERSION 3.27)
project(titan)
enable_language(CXX)
enable_language(C)
Expand Down Expand Up @@ -145,6 +145,7 @@ if (WITH_TITAN_TESTS AND (CMAKE_BUILD_TYPE STREQUAL "Debug"))
blob_gc_job_test
blob_gc_picker_test
gc_stats_test
punch_hole_gc_test
table_builder_test
thread_safety_test
titan_db_test
Expand Down
7 changes: 4 additions & 3 deletions include/titan/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ struct TitanCFOptions : public ColumnFamilyOptions {
// requirement for blob entries and Titan has to distinguish between real
// data's 0s and 0s created by punch holes).
uint64_t block_size{4096};
bool enable_punch_hole_gc{false};
uint64_t punch_hole_threshold{0};

TitanCFOptions() = default;
explicit TitanCFOptions(const ColumnFamilyOptions& options)
Expand Down Expand Up @@ -219,7 +219,6 @@ struct ImmutableTitanCFOptions {
bool skip_value_in_compaction_filter;

uint64_t block_size;
bool enable_punch_hole_gc;
};

struct MutableTitanCFOptions {
Expand All @@ -229,12 +228,14 @@ struct MutableTitanCFOptions {
: blob_run_mode(opts.blob_run_mode),
min_blob_size(opts.min_blob_size),
blob_file_compression(opts.blob_file_compression),
blob_file_discardable_ratio(opts.blob_file_discardable_ratio) {}
blob_file_discardable_ratio(opts.blob_file_discardable_ratio),
punch_hole_threshold(opts.punch_hole_threshold) {}

TitanBlobRunMode blob_run_mode;
uint64_t min_blob_size;
CompressionType blob_file_compression;
double blob_file_discardable_ratio;
uint64_t punch_hole_threshold;
};

struct TitanOptions : public TitanDBOptions, public TitanCFOptions {
Expand Down
8 changes: 6 additions & 2 deletions src/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ BlobFileBuilder::BlobFileBuilder(const TitanDBOptions& db_options,
return;
#endif
}
block_size_ = cf_options.enable_punch_hole_gc ? cf_options.block_size : 0;
block_size_ = cf_options.punch_hole_threshold > 0 ? cf_options.block_size : 0;
WriteHeader();
}

Expand Down Expand Up @@ -164,7 +164,11 @@ void BlobFileBuilder::FlushSampleRecords(OutContexts* out_ctx) {
void BlobFileBuilder::WriteEncoderData(BlobHandle* handle) {
handle->offset = file_->GetFileSize();
handle->size = encoder_.GetEncodedSize();
live_data_size_ += handle->size;
if (block_size_ > 0) {
live_data_size_ += Roundup(handle->size, block_size_);
} else {
live_data_size_ += handle->size;
}

status_ = file_->Append(encoder_.GetHeader());
if (ok()) {
Expand Down
2 changes: 2 additions & 0 deletions src/blob_file_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ class BlobFileBuilder {
const std::string& GetSmallestKey() { return smallest_key_; }
const std::string& GetLargestKey() { return largest_key_; }

uint64_t GetBlockSize() { return block_size_; }

uint64_t live_data_size() const { return live_data_size_; }

private:
Expand Down
95 changes: 50 additions & 45 deletions src/blob_file_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,12 @@ bool BlobFileIterator::Init() {
return true;
}

// Advances iterate_offset_ forward to the next block boundary when block
// alignment is in effect (block_size_ != 0). Returns the number of bytes
// skipped (0 if already aligned or alignment is disabled).
uint64_t BlobFileIterator::AdjustOffsetToNextBlockHead() {
  if (block_size_ == 0) return 0;
  uint64_t remainder = iterate_offset_ % block_size_;
  if (remainder == 0) return 0;
  // Pad up to the start of the following block.
  uint64_t padding = block_size_ - remainder;
  iterate_offset_ += padding;
  return padding;
}

void BlobFileIterator::SeekToFirst() {
if (!init_ && !Init()) return;
status_ = Status::OK();
iterate_offset_ = header_size_;
if (block_size_ != 0) {
AdjustOffsetToNextBlockHead();
iterate_offset_ = Roundup(iterate_offset_, block_size_);
}
PrefetchAndGet();
}
Expand Down Expand Up @@ -124,7 +113,8 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) {

uint64_t total_length = 0;
FixedSlice<kRecordHeaderSize> header_buffer;
iterate_offset_ = header_size_;
iterate_offset_ =
block_size_ > 0 ? Roundup(header_size_, block_size_) : header_size_;
while (iterate_offset_ < offset) {
// With for_compaction=true, rate_limiter is enabled. Since
// BlobFileIterator is only used for GC, we always set for_compaction to
Expand All @@ -136,26 +126,35 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) {
status_ = decoder_.DecodeHeader(&header_buffer);
if (!status_.ok()) return;
total_length = kRecordHeaderSize + decoder_.GetRecordSize();
iterate_offset_ += total_length;
if (block_size_ != 0) {
total_length += AdjustOffsetToNextBlockHead();
total_length = Roundup(total_length, block_size_);
}
iterate_offset_ += total_length;
}

if (iterate_offset_ > offset) iterate_offset_ -= total_length;
valid_ = false;
}

void BlobFileIterator::GetBlobRecord() {
bool BlobFileIterator::GetBlobRecord() {
FixedSlice<kRecordHeaderSize> header_buffer;
// With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
// is only used for GC, we always set for_compaction to true.
status_ = file_->Read(IOOptions(), iterate_offset_, kRecordHeaderSize,
&header_buffer, header_buffer.get(),
nullptr /*aligned_buf*/, true /*for_compaction*/);
if (!status_.ok()) return;
if (!status_.ok()) return false;

// Check if the record is a hole-punch record by checking the size field in
// the header.
uint32_t* size = (uint32_t*)(header_buffer.get() + 4);
if (*size == 0) {
// This is a hole-punch record.
return false;
}

status_ = decoder_.DecodeHeader(&header_buffer);
if (!status_.ok()) return;
if (!status_.ok()) return false;

Slice record_slice;
auto record_size = decoder_.GetRecordSize();
Expand All @@ -170,45 +169,51 @@ void BlobFileIterator::GetBlobRecord() {
decoder_.DecodeRecord(&record_slice, &cur_blob_record_, &uncompressed_,
titan_cf_options_.memory_allocator());
}
if (!status_.ok()) return;
if (!status_.ok()) return false;

cur_record_offset_ = iterate_offset_;
cur_record_size_ = kRecordHeaderSize + record_size;
iterate_offset_ += cur_record_size_;
AdjustOffsetToNextBlockHead();
if (block_size_ != 0) iterate_offset_ = Roundup(iterate_offset_, block_size_);
valid_ = true;
return true;
}

void BlobFileIterator::PrefetchAndGet() {
if (iterate_offset_ >= end_of_blob_record_) {
valid_ = false;
return;
}
while (iterate_offset_ < end_of_blob_record_) {
if (readahead_begin_offset_ > iterate_offset_ ||
readahead_end_offset_ < iterate_offset_) {
// alignment
readahead_begin_offset_ =
iterate_offset_ - (iterate_offset_ & (kDefaultPageSize - 1));
readahead_end_offset_ = readahead_begin_offset_;
readahead_size_ = kMinReadaheadSize;
}
auto min_blob_size =
iterate_offset_ + kRecordHeaderSize + titan_cf_options_.min_blob_size;
if (readahead_end_offset_ <= min_blob_size) {
while (readahead_end_offset_ + readahead_size_ <= min_blob_size &&
readahead_size_ < kMaxReadaheadSize)
readahead_size_ <<= 1;
file_->Prefetch(readahead_end_offset_, readahead_size_);
readahead_end_offset_ += readahead_size_;
readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ << 1);
}

if (readahead_begin_offset_ > iterate_offset_ ||
readahead_end_offset_ < iterate_offset_) {
// alignment
readahead_begin_offset_ =
iterate_offset_ - (iterate_offset_ & (kDefaultPageSize - 1));
readahead_end_offset_ = readahead_begin_offset_;
readahead_size_ = kMinReadaheadSize;
}
auto min_blob_size =
iterate_offset_ + kRecordHeaderSize + titan_cf_options_.min_blob_size;
if (readahead_end_offset_ <= min_blob_size) {
while (readahead_end_offset_ + readahead_size_ <= min_blob_size &&
readahead_size_ < kMaxReadaheadSize)
readahead_size_ <<= 1;
file_->Prefetch(readahead_end_offset_, readahead_size_);
readahead_end_offset_ += readahead_size_;
readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ << 1);
}
bool live = GetBlobRecord();

GetBlobRecord();
if (readahead_end_offset_ < iterate_offset_) {
readahead_end_offset_ = iterate_offset_;
}

if (readahead_end_offset_ < iterate_offset_) {
readahead_end_offset_ = iterate_offset_;
// If the record is a hole-punch record, we should continue to the next
// record by adjusting iterate_offset_, otherwise (not a hole-punch record),
// we should break the loop and return the record, iterate_offset_ is
// already adjusted inside GetBlobRecord() in this case.
if (live || !status().ok()) return;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just trying to learn, in what cases will status() be not ok? Do we need to set valid_ to false in that case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, IO errors, In this case, we should not continue the loop.

As for whether setting valid_ to false, I don't think it is necessary, since valid() (the function not the variable) considers status_ already. So I didn't change the behavior (current code does not set valid_ either when encountering io errors).

iterate_offset_ += block_size_;
}
valid_ = false;
}

BlobFileMergeIterator::BlobFileMergeIterator(
Expand Down
8 changes: 4 additions & 4 deletions src/blob_file_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ class BlobFileIterator {
uint64_t readahead_size_{kMinReadaheadSize};

void PrefetchAndGet();
void GetBlobRecord();
uint64_t AdjustOffsetToNextBlockHead();
// Returns false if the record is invalid (punch-hole).
bool GetBlobRecord();
};

class BlobFileMergeIterator {
Expand Down Expand Up @@ -106,9 +106,9 @@ class BlobFileMergeIterator {
public:
// The default constructor is not supposed to be used.
// It is only to make std::priority_queue can compile.
BlobFileIterComparator() : comparator_(nullptr){};
BlobFileIterComparator() : comparator_(nullptr) {};
explicit BlobFileIterComparator(const Comparator* comparator)
: comparator_(comparator){};
: comparator_(comparator) {};
// Smaller value get Higher priority
bool operator()(const BlobFileIterator* iter1,
const BlobFileIterator* iter2) {
Expand Down
5 changes: 2 additions & 3 deletions src/blob_file_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class BlobFileIteratorTest : public testing::Test {
TitanDBOptions db_options(titan_options_);
TitanCFOptions cf_options(titan_options_);
if (with_blocks) {
cf_options.enable_punch_hole_gc = true;
cf_options.punch_hole_threshold = 4096;
cf_options.block_size = 4096;
}
BlobFileCache cache(db_options, cf_options, {NewLRUCache(128)}, nullptr);
Expand Down Expand Up @@ -194,8 +194,7 @@ TEST_F(BlobFileIteratorTest, IterateForPrev) {
blob_index = blob_file_iterator_->GetBlobIndex();
ASSERT_EQ(blob_handle, blob_index.blob_handle);

while ((idx = Random::GetTLSInstance()->Uniform(n)) == 0)
;
while ((idx = Random::GetTLSInstance()->Uniform(n)) == 0);
blob_handle = contexts[idx]->new_blob_index.blob_handle;
blob_file_iterator_->IterateForPrev(blob_handle.offset - kRecordHeaderSize -
1);
Expand Down
16 changes: 16 additions & 0 deletions src/blob_file_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ class BlobFileSet {

bool IsOpened() { return opened_.load(std::memory_order_acquire); }

// Returns the block size used for punch-hole alignment for the given
// column family, or 0 when punch-hole GC is disabled
// (punch_hole_threshold == 0) or the blob storage is gone.
uint64_t GetBlockSize(uint32_t cf_id) {
  MutexLock l(mutex_);
  auto storage = GetBlobStorage(cf_id).lock();
  if (storage == nullptr) {
    return 0;
  }
  const auto& opts = storage->cf_options();
  return opts.punch_hole_threshold > 0 ? opts.block_size : 0;
}

// Returns a map of blob file number -> block size for the given column
// family. Returns an empty map when the blob storage has been destroyed.
std::unordered_map<uint64_t, uint64_t> GetFileBlockSizes(uint32_t cf_id) {
  MutexLock l(mutex_);
  if (auto storage = GetBlobStorage(cf_id).lock()) {
    return storage->GetFileBlockSizes();
  }
  return {};
}

private:
struct ManifestWriter;

Expand Down
34 changes: 30 additions & 4 deletions src/blob_file_size_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@ namespace titandb {

TablePropertiesCollector*
BlobFileSizeCollectorFactory::CreateTablePropertiesCollector(
rocksdb::TablePropertiesCollectorFactory::Context /* context */) {
return new BlobFileSizeCollector();
rocksdb::TablePropertiesCollectorFactory::Context context) {
if (blob_file_set_ != nullptr) {
return new BlobFileSizeCollector(
blob_file_set_->GetBlockSize(context.column_family_id),
blob_file_set_->GetFileBlockSizes(context.column_family_id));
}
return new BlobFileSizeCollector(0, {});
}

const std::string BlobFileSizeCollector::kPropertiesName =
Expand Down Expand Up @@ -57,11 +62,32 @@ Status BlobFileSizeCollector::AddUserKey(const Slice& /* key */,
return s;
}

auto size = index.blob_handle.size;
if (default_block_size_ > 0 || !file_block_sizes_.empty()) {
// If the blob file cannot be found in the block size map, it must be a
// newly created file that has not yet been added to blob_file_set; in this
// case, we know the block size of the file is default_block_size_.
// If the blob file can be found in the block size map, it implies we are
// moving the reference only, while keeping the blob at the original file,
// in this case, we should use the block size of the original file.
uint64_t block_size = default_block_size_;
if (!file_block_sizes_.empty()) {
auto iter = file_block_sizes_.find(index.file_number);
if (iter != file_block_sizes_.end()) {
block_size = iter->second;
}
}
if (block_size > 0) {
// Align blob size with block size.
size = Roundup(size, block_size);
}
}

auto iter = blob_files_size_.find(index.file_number);
if (iter == blob_files_size_.end()) {
blob_files_size_[index.file_number] = index.blob_handle.size;
blob_files_size_[index.file_number] = size;
} else {
iter->second += index.blob_handle.size;
iter->second += size;
}

return Status::OK();
Expand Down
17 changes: 17 additions & 0 deletions src/blob_file_size_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,31 @@ namespace titandb {
// Factory that creates BlobFileSizeCollector table-properties collectors.
// Holds a non-owning pointer to the BlobFileSet so that each created
// collector can query per-CF block sizes for punch-hole alignment.
class BlobFileSizeCollectorFactory final
    : public TablePropertiesCollectorFactory {
 public:
  // If punch-hole GC is enabled, then blob_file_set must be provided.
  // If blob_file_set is not provided, then punch-hole GC will be considered
  // disabled, and blob sizes will not be aligned with the block size.
  // NOTE(review): blob_file_set is borrowed, not owned — the caller must
  // keep it alive for the lifetime of this factory.
  BlobFileSizeCollectorFactory(BlobFileSet* blob_file_set = nullptr)
      : blob_file_set_(blob_file_set) {}
  // Non-copyable: the factory identity (and its BlobFileSet pointer) is
  // shared via registration, not duplication.
  BlobFileSizeCollectorFactory(const BlobFileSizeCollectorFactory&) = delete;
  void operator=(const BlobFileSizeCollectorFactory&) = delete;
  // Defined out of line; uses blob_file_set_ (when set) to look up the
  // column family's block size and per-file block sizes.
  TablePropertiesCollector* CreateTablePropertiesCollector(
      TablePropertiesCollectorFactory::Context context) override;

  const char* Name() const override { return "BlobFileSizeCollector"; }

 private:
  BlobFileSet* blob_file_set_;
};

class BlobFileSizeCollector final : public TablePropertiesCollector {
public:
const static std::string kPropertiesName;

BlobFileSizeCollector(uint64_t default_block_size,
std::unordered_map<uint64_t, uint64_t> file_block_sizes)
: default_block_size_(default_block_size),
file_block_sizes_(file_block_sizes) {}

static bool Encode(const std::map<uint64_t, uint64_t>& blob_files_size,
std::string* result);
static bool Decode(Slice* slice,
Expand All @@ -38,6 +53,8 @@ class BlobFileSizeCollector final : public TablePropertiesCollector {

private:
std::map<uint64_t, uint64_t> blob_files_size_;
uint64_t default_block_size_;
std::unordered_map<uint64_t, uint64_t> file_block_sizes_;
};

} // namespace titandb
Expand Down
Loading