Skip to content

Commit

Permalink
Add hole-punch support
Browse files Browse the repository at this point in the history
Signed-off-by: v01dstar <[email protected]>
  • Loading branch information
v01dstar committed Mar 7, 2024
1 parent 9dd0342 commit 05aa72b
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 110 deletions.
21 changes: 21 additions & 0 deletions src/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,27 @@ void BlobFileBuilder::WriteEncoderData(BlobHandle* handle) {
}
}

void BlobFileBuilder::FillFSBlockWithPadding() {
if (alignment_size_ == 0) {
return;
}
size_t padding = 0;
if (file_->GetFileSize() % alignment_size_ != 0) {
padding = alignment_size_ - file_->GetFileSize() % alignment_size_;
}
if (padding > 0) {
char buf[4096] = {0};
while (padding > sizeof(buf)) {
status_ = file_->Append(Slice(buf, sizeof(buf)));
if (!ok()) {
return;
}
padding -= sizeof(buf);
}
status_ = file_->Append(Slice(buf, padding));
}
}

void BlobFileBuilder::WriteRawBlock(const Slice& block, BlockHandle* handle) {
handle->set_offset(file_->GetFileSize());
handle->set_size(block.size());
Expand Down
3 changes: 3 additions & 0 deletions src/blob_file_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class BlobFileBuilder {
void WriteCompressionDictBlock(MetaIndexBuilder* meta_index_builder);
void FlushSampleRecords(OutContexts* out_ctx);
void WriteEncoderData(BlobHandle* handle);
void FillFSBlockWithPadding();

TitanCFOptions cf_options_;
WritableFileWriter* file_;
Expand All @@ -142,6 +143,8 @@ class BlobFileBuilder {
std::string smallest_key_;
std::string largest_key_;
uint64_t live_data_size_ = 0;

uint64_t alignment_size_ = 0;
};

} // namespace titandb
Expand Down
87 changes: 59 additions & 28 deletions src/blob_file_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ bool BlobFileIterator::Init() {
BlockBasedTable::kBlockTrailerSize);
}

alignment_size_ = blob_file_footer.alignment_size;

if (blob_file_header.flags & BlobFileHeader::kHasUncompressionDictionary) {
status_ = InitUncompressionDict(blob_file_footer, file_.get(),
&uncompression_dict_,
Expand Down Expand Up @@ -126,16 +128,38 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) {
valid_ = false;
}

void BlobFileIterator::GetBlobRecord() {
void BlobFileIterator::AdjustOffsetToNextAlignment() {
if (alignment_size_ == 0) return;
uint64_t remainder = iterate_offset_ % alignment_size_;
if (remainder != 0) {
iterate_offset_ += alignment_size_ - remainder;
}
}

bool BlobFileIterator::GetBlobRecord() {
FixedSlice<kRecordHeaderSize> header_buffer;
// With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
// is only used for GC, we always set for_compaction to true.
// With for_compaction=true, rate_limiter is enabled. Since
// BlobFileIterator is only used for GC, we always set for_compaction to
// true.
status_ = file_->Read(IOOptions(), iterate_offset_, kRecordHeaderSize,
&header_buffer, header_buffer.get(),
nullptr /*aligned_buf*/, true /*for_compaction*/);
if (!status_.ok()) return;
status_ = decoder_.DecodeHeader(&header_buffer);
if (!status_.ok()) return;
// If the header buffer is all zero, it means the record is deleted (punch
// hole).
bool deleted = true;
for (size_t i = 0; i < kRecordHeaderSize; i++) {
if (header_buffer[i] != 0) {
deleted = false;
break;
}
}
if (deleted) {
AdjustOffsetToNextAlignment();
return false;
}

Slice record_slice;
auto record_size = decoder_.GetRecordSize();
Expand All @@ -155,39 +179,46 @@ void BlobFileIterator::GetBlobRecord() {
cur_record_offset_ = iterate_offset_;
cur_record_size_ = kRecordHeaderSize + record_size;
iterate_offset_ += cur_record_size_;
// align to next record
AdjustOffsetToNextAlignment();
valid_ = true;
return true;
}

void BlobFileIterator::PrefetchAndGet() {
if (iterate_offset_ >= end_of_blob_record_) {
valid_ = false;
return;
}
while (iterate_offset_ < end_of_blob_record_) {
// TODO: maybe reduce read ahead when encountering punch holes. e.g. just
// read header.
if (readahead_begin_offset_ > iterate_offset_ ||
readahead_end_offset_ < iterate_offset_) {
// alignment
readahead_begin_offset_ =
iterate_offset_ - (iterate_offset_ & (kDefaultPageSize - 1));
readahead_end_offset_ = readahead_begin_offset_;
readahead_size_ = kMinReadaheadSize;
}
auto min_blob_size =
iterate_offset_ + kRecordHeaderSize + titan_cf_options_.min_blob_size;
if (readahead_end_offset_ <= min_blob_size) {
while (readahead_end_offset_ + readahead_size_ <= min_blob_size &&
readahead_size_ < kMaxReadaheadSize)
readahead_size_ <<= 1;
file_->Prefetch(readahead_end_offset_, readahead_size_);
readahead_end_offset_ += readahead_size_;
readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ << 1);
}

if (readahead_begin_offset_ > iterate_offset_ ||
readahead_end_offset_ < iterate_offset_) {
// alignment
readahead_begin_offset_ =
iterate_offset_ - (iterate_offset_ & (kDefaultPageSize - 1));
readahead_end_offset_ = readahead_begin_offset_;
readahead_size_ = kMinReadaheadSize;
}
auto min_blob_size =
iterate_offset_ + kRecordHeaderSize + titan_cf_options_.min_blob_size;
if (readahead_end_offset_ <= min_blob_size) {
while (readahead_end_offset_ + readahead_size_ <= min_blob_size &&
readahead_size_ < kMaxReadaheadSize)
readahead_size_ <<= 1;
file_->Prefetch(readahead_end_offset_, readahead_size_);
readahead_end_offset_ += readahead_size_;
readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ << 1);
}
bool live = GetBlobRecord();

GetBlobRecord();
if (readahead_end_offset_ < iterate_offset_) {
readahead_end_offset_ = iterate_offset_;
}

if (readahead_end_offset_ < iterate_offset_) {
readahead_end_offset_ = iterate_offset_;
// If the record is valid (not punch-holed), we can return. Otherwise,
// continue iterating until we find a valid record.
if (live) return;
}
valid_ = false;
}

BlobFileMergeIterator::BlobFileMergeIterator(
Expand Down
10 changes: 9 additions & 1 deletion src/blob_file_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class BlobFileIterator {
Slice value() const;
Status status() const { return status_; }
uint64_t header_size() const { return header_size_; }
uint64_t file_number() const { return file_number_; }
uint64_t alginment_size() const { return alignment_size_; }

void IterateForPrev(uint64_t);

Expand All @@ -61,6 +63,8 @@ class BlobFileIterator {
bool valid_{false};

std::unique_ptr<UncompressionDict> uncompression_dict_;
uint64_t alignment_size_{0};

BlobDecoder decoder_;

uint64_t iterate_offset_{0};
Expand All @@ -76,7 +80,11 @@ class BlobFileIterator {
uint64_t readahead_size_{kMinReadaheadSize};

void PrefetchAndGet();
void GetBlobRecord();
// Return whether the record at the current offset is valid or not (punch
// hole), if it is deleted, callers needs to move the offset to the next
// block.
bool GetBlobRecord();
void AdjustOffsetToNextAlignment();
};

class BlobFileMergeIterator {
Expand Down
14 changes: 13 additions & 1 deletion src/blob_format.cc
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ Status BlobFileHeader::DecodeFrom(Slice* src) {
"Blob file header magic number missing or mismatched.");
}
if (!GetFixed32(src, &version) ||
(version != kVersion1 && version != kVersion2)) {
(version != kVersion1 && version != kVersion2 && version != kVersion3)) {
return Status::Corruption("Blob file header version missing or invalid.");
}
if (version == BlobFileHeader::kVersion2) {
Expand All @@ -305,6 +305,7 @@ Status BlobFileHeader::DecodeFrom(Slice* src) {

void BlobFileFooter::EncodeTo(std::string* dst) const {
auto size = dst->size();
PutFixed64(dst, alignment_size);
meta_index_handle.EncodeTo(dst);
// Add padding to make a fixed size footer.
dst->resize(size + kEncodedLength - 12);
Expand All @@ -315,6 +316,17 @@ void BlobFileFooter::EncodeTo(std::string* dst) const {

Status BlobFileFooter::DecodeFrom(Slice* src) {
auto data = src->data();
if (version == BlobFileHeader::kVersion3) {
if (!GetFixed64(src, &alignment_size)) {
return Status::Corruption("BlobFileFooter", "alignment size");
}
} else {
// src's size is kEncodedLength regardless of version. If version is not 3,
// the first 8 bytes should be ignored.
src->remove_prefix(8);
// Update the footer's offset.
data = src->data();
}
Status s = meta_index_handle.DecodeFrom(src);
if (!s.ok()) {
return Status::Corruption("BlobFileFooter", s.ToString());
Expand Down
33 changes: 23 additions & 10 deletions src/blob_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ namespace titandb {
//
const uint64_t kBlobMaxHeaderSize = 12;
const uint64_t kRecordHeaderSize = 9;
const uint64_t kBlobFooterSize = BlockHandle::kMaxEncodedLength + 8 + 4;
const uint64_t kBlobFooterSize = 8 + BlockHandle::kMaxEncodedLength + 8 + 4;
const std::string kAlignmentSizeBlockName = "titan.alignment_size";

// Format of blob record (not fixed size):
//
Expand Down Expand Up @@ -327,14 +328,16 @@ struct BlobFileHeader {
static const uint32_t kHeaderMagicNumber = 0x2be0a614ul;
static const uint32_t kVersion1 = 1;
static const uint32_t kVersion2 = 2;
// Introducing alignment size in version 3.
static const uint32_t kVersion3 = 3;

static const uint64_t kMinEncodedLength = 4 + 4;
static const uint64_t kMaxEncodedLength = 4 + 4 + 4;

// Flags:
static const uint32_t kHasUncompressionDictionary = 1 << 0;

uint32_t version = kVersion2;
uint32_t version = kVersion3;
uint32_t flags = 0;

static Status ValidateVersion(uint32_t ver) {
Expand All @@ -355,22 +358,32 @@ struct BlobFileHeader {
Status DecodeFrom(Slice* src);
};

// Format of blob file footer (BlockHandle::kMaxEncodedLength + 12):
// Format of blob file footer V3 (BlockHandle::kMaxEncodedLength + 20):
//
// +---------------------+-------------+--------------+----------+
// | meta index handle | padding | magic number | checksum |
// +---------------------+-------------+--------------+----------+
// | Varint64 + Varint64 | padding_len | Fixed64 | Fixed32 |
// +---------------------+-------------+--------------+----------+
// +------------------+---------------------+-------------+
// | alignment size | meta index handle | padding |
// +------------------+---------------------+-------------+
// | Fixed64 | Varint64 + Varint64 | padding_len |
// +------------------+---------------------+-------------+
//
// To make the blob file footer fixed size,
// the padding_len is `BlockHandle::kMaxEncodedLength - meta_handle_len`
// +--------------+----------+
// | magic number | checksum |
// +--------------+----------+
// | Fixed64 | Fixed32 |
// +--------------+----------+
//
// To make the blob file footer fixed size, the padding_len is calculated as:
// `BlockHandle::kMaxEncodedLength - meta_handle_len - sizeof(uint64_t)`
struct BlobFileFooter {
// The first 64bits from $(echo titandb/blob | sha1sum).
static const uint64_t kFooterMagicNumber{0x2be0a6148e39edc6ull};
static const uint64_t kEncodedLength{kBlobFooterSize};

BlockHandle meta_index_handle{BlockHandle::NullBlockHandle()};
uint64_t alignment_size{0};

// Non-persistent field.
uint32_t version = BlobFileHeader::kVersion3;

void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
Expand Down
Loading

0 comments on commit 05aa72b

Please sign in to comment.