Update FilePrefetchBuffer::Read to reuse file system buffer when possible #13118

Closed · wants to merge 31 commits

Commits
2890d0b
Update FilePrefetchBuffer::Read to use FS buffer
archang19 Nov 5, 2024
5b9542d
Add initial draft unit test
archang19 Nov 6, 2024
927b5d2
Update logic for determining start offset
archang19 Nov 7, 2024
1578f89
Check buffer offsets and sizes
archang19 Nov 7, 2024
7b91d32
Nit: technically more accurate initial_end_offset
archang19 Nov 8, 2024
00896e2
Very rough draft implementation for staging buffer
archang19 Nov 8, 2024
6102466
Use req_len to determine staging buffer size
archang19 Nov 9, 2024
de127d5
Minor touch ups
archang19 Nov 11, 2024
62ae064
Test staging buffer offset and sizes
archang19 Nov 11, 2024
d3b9990
Add GetRequiredBufferAlignment helper
archang19 Nov 11, 2024
3033665
Can use the 'aligned' offsets and lengths
archang19 Nov 11, 2024
f00f17c
Minor refactoring
archang19 Nov 11, 2024
b3784bf
Make staging buffer logic more like overlap buffer logic
archang19 Nov 11, 2024
9ac8c28
Unify methods for copying to staging_buf_ and overlap_buf_
archang19 Nov 11, 2024
ceb7ebd
Same logic for copying remaining data can be used for staging_buf_
archang19 Nov 12, 2024
06a4317
Remove unused part of test class
archang19 Nov 12, 2024
51d5e73
Only enable fs buffer reuse when num_buffers is 1
archang19 Nov 12, 2024
9c41306
Add another test case
archang19 Nov 12, 2024
34fd43d
Minor comments, refactors, test clarification
archang19 Nov 13, 2024
f9c88bf
Reorder in/out parameters for PrepareBufferForRead and ReadAheadSizeT…
archang19 Nov 14, 2024
2d0b808
Replace unique_ptr<char[]> with FSAllocationPtr
archang19 Nov 14, 2024
bece01d
Remove unnecessary reset()s
archang19 Nov 15, 2024
c4e3914
Unify staging_buf_ and overlap_buf_
archang19 Nov 15, 2024
b5fe272
Replace references of staging buffer in tests
archang19 Nov 15, 2024
0e64b6a
Minor refactor
archang19 Nov 15, 2024
fe96e86
Remove code that should not get called in HandleOverlappingSyncData
archang19 Nov 15, 2024
794a7cd
Make test parameterized to cover async prefetch too
archang19 Nov 19, 2024
303fabe
Track count of calls to copy data to overlap buffer
archang19 Nov 19, 2024
1072926
Fix comment explaining why overlap buffer was not used with async pre…
archang19 Nov 19, 2024
b883c21
Add SyncPoint EnableProcessing to hopefully fix mac os tests
archang19 Nov 19, 2024
a6d1afb
Update unreleased_history
archang19 Nov 20, 2024
166 changes: 121 additions & 45 deletions file/file_prefetch_buffer.cc

Large diffs are not rendered by default.

111 changes: 90 additions & 21 deletions file/file_prefetch_buffer.h
@@ -15,7 +15,9 @@
#include <sstream>
#include <string>

#include "file/random_access_file_reader.h"
#include "file/readahead_file_info.h"
#include "file_util.h"
#include "monitoring/statistics_impl.h"
#include "port/port.h"
#include "rocksdb/env.h"
@@ -149,6 +151,9 @@ enum class FilePrefetchBufferUsage {
//
// If num_buffers_ == 1, it's a sequential read flow. Read API will be called on
// that one buffer whenever the data is requested and is not in the buffer.
// When reusing the file system allocated buffer, overlap_buf_ is used if the
// main buffer only contains part of the requested data. It is returned to
// the caller after the remaining data is fetched.
// If num_buffers_ > 1, then the data is prefetched asynchronously in the
// buffers whenever the data is consumed from the buffers and that buffer is
// freed.
@@ -206,10 +211,15 @@ class FilePrefetchBuffer {
assert((num_file_reads_ >= num_file_reads_for_auto_readahead_ + 1) ||
(num_file_reads_ == 0));

// If num_buffers_ > 1, data is asynchronously filled in the
// queue. As a result, data can overlap across two buffers. It copies the
// data to overlap_buf_ in order to return a contiguous buffer.
if (num_buffers_ > 1) {
// overlap_buf_ is used whenever the main buffer only has part of the
// requested data. The relevant data is copied into overlap_buf_ and the
// remaining data is copied in later to satisfy the user's request. This is
// used in both the synchronous (num_buffers_ = 1) and asynchronous
// (num_buffers_ > 1) cases. In the asynchronous case, the requested data
// may be spread out over 2 buffers.
if (num_buffers_ > 1 ||
(fs_ != nullptr &&
CheckFSFeatureSupport(fs_, FSSupportedOps::kFSBuffer))) {
overlap_buf_ = new BufferInfo();
}

@@ -379,12 +389,21 @@
void PrefetchAsyncCallback(FSReadRequest& req, void* cb_arg);

void TEST_GetBufferOffsetandSize(
std::vector<std::pair<uint64_t, size_t>>& buffer_info) {
std::vector<std::tuple<uint64_t, size_t, bool>>& buffer_info) {
Comment from the PR author:
I added this third field to validate async prefetching, since otherwise I cannot distinguish between async_req_len and CurrentSize()

for (size_t i = 0; i < bufs_.size(); i++) {
buffer_info[i].first = bufs_[i]->offset_;
buffer_info[i].second = bufs_[i]->async_read_in_progress_
? bufs_[i]->async_req_len_
: bufs_[i]->CurrentSize();
std::get<0>(buffer_info[i]) = bufs_[i]->offset_;
std::get<1>(buffer_info[i]) = bufs_[i]->async_read_in_progress_
? bufs_[i]->async_req_len_
: bufs_[i]->CurrentSize();
std::get<2>(buffer_info[i]) = bufs_[i]->async_read_in_progress_;
}
}

void TEST_GetOverlapBufferOffsetandSize(
std::pair<uint64_t, size_t>& buffer_info) {
if (overlap_buf_ != nullptr) {
buffer_info.first = overlap_buf_->offset_;
buffer_info.second = overlap_buf_->CurrentSize();
}
}

@@ -394,7 +413,7 @@
// required.
void PrepareBufferForRead(BufferInfo* buf, size_t alignment, uint64_t offset,
size_t roundup_len, bool refit_tail,
uint64_t& aligned_useful_len);
bool use_fs_buffer, uint64_t& aligned_useful_len);

void AbortOutdatedIO(uint64_t offset);

Expand All @@ -418,7 +437,8 @@ class FilePrefetchBuffer {
uint64_t start_offset);

// Copy the data from src to overlap_buf_.
void CopyDataToBuffer(BufferInfo* src, uint64_t& offset, size_t& length);
void CopyDataToOverlapBuffer(BufferInfo* src, uint64_t& offset,
size_t& length);

bool IsBlockSequential(const size_t& offset) {
return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset));
@@ -465,6 +485,50 @@
return true;
}

// Whether we reuse the file system provided buffer.
// Until we also handle the async read case, we only enable this
// optimization for the synchronous case, when num_buffers_ = 1.
bool UseFSBuffer(RandomAccessFileReader* reader) {
return reader->file() != nullptr && !reader->use_direct_io() &&
fs_ != nullptr &&
CheckFSFeatureSupport(fs_, FSSupportedOps::kFSBuffer) &&
num_buffers_ == 1;
}

// When we are reusing the file system provided buffer, we are not concerned
// with alignment. However, quite a bit of prefetch code incorporates
// alignment, so we pass an alignment of 1 to keep that code unchanged.
size_t GetRequiredBufferAlignment(RandomAccessFileReader* reader) {
if (UseFSBuffer(reader)) {
return 1;
}
return reader->file()->GetRequiredBufferAlignment();
}

// Reuses the file system allocated buffer to avoid an extra copy
IOStatus FSBufferDirectRead(RandomAccessFileReader* reader, BufferInfo* buf,
const IOOptions& opts, uint64_t offset, size_t n,
Slice& result) {
FSReadRequest read_req;
read_req.offset = offset;
read_req.len = n;
read_req.scratch = nullptr;
IOStatus s = reader->MultiRead(opts, &read_req, 1, nullptr);
if (!s.ok()) {
return s;
}
s = read_req.status;
if (!s.ok()) {
return s;
}
buf->buffer_.SetBuffer(read_req.result.size(),
std::move(read_req.fs_scratch));
buf->offset_ = offset;
buf->initial_end_offset_ = offset + read_req.result.size();
result = read_req.result;
return s;
}

void DestroyAndClearIOHandle(BufferInfo* buf) {
if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) {
buf->del_fn_(buf->io_handle_);
@@ -474,11 +538,16 @@
buf->async_read_in_progress_ = false;
}

Status HandleOverlappingData(const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t offset,
size_t length, size_t readahead_size,
bool& copy_to_third_buffer, uint64_t& tmp_offset,
size_t& tmp_length);
void HandleOverlappingSyncData(uint64_t offset, size_t length,
uint64_t& tmp_offset, size_t& tmp_length,
bool& use_overlap_buffer);

Status HandleOverlappingAsyncData(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t length,
size_t readahead_size,
bool& copy_to_third_buffer,
uint64_t& tmp_offset, size_t& tmp_length);

bool TryReadFromCacheUntracked(const IOOptions& opts,
RandomAccessFileReader* reader,
@@ -487,11 +556,11 @@
bool for_compaction = false);

void ReadAheadSizeTuning(BufferInfo* buf, bool read_curr_block,
bool refit_tail, uint64_t prev_buf_end_offset,
size_t alignment, size_t length,
size_t readahead_size, uint64_t& offset,
uint64_t& end_offset, size_t& read_len,
uint64_t& aligned_useful_len);
bool refit_tail, bool use_fs_buffer,
uint64_t prev_buf_end_offset, size_t alignment,
size_t length, size_t readahead_size,
uint64_t& offset, uint64_t& end_offset,
size_t& read_len, uint64_t& aligned_useful_len);

void UpdateStats(bool found_in_buffer, size_t length_found) {
if (found_in_buffer) {