Add compaction stats for filtered files (#13136)
Summary:
As titled. This PR adds compaction job stats, internal stats, and log output for input files that are filtered out of a compaction by a standalone range deletion file.

Example logging:
[default] compacted to: files[0 0 0 0 2 0 0] max score 0.25, estimated pending compaction bytes 0, MB/sec: 0.3 rd, 0.2 wr, level 6, files in(1, 0) filtered(0, 2) out(1 +0 blob) MB in(0.0, 0.0 +0.0 blob) filtered(0.0, 0.0) out(0.0 +0.0 blob), read-write-amplify(2.0) write-amplify(1.0) OK, records in: 1, records dropped: 1 output_compression: Snappy
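
The new counters are also propagated to `CompactionJobStats` (see the `UpdateCompactionJobStats` hunk below), so they can be observed from an event listener. A minimal sketch, assuming the three fields added by this PR (`num_filtered_input_files`, `num_filtered_input_files_at_output_level`, `total_skipped_input_bytes`) are public members of `CompactionJobStats`; the listener class name and printed format are illustrative only:

```cpp
#include <iostream>

#include "rocksdb/db.h"
#include "rocksdb/listener.h"

// Hypothetical listener that surfaces the filtered-file counters after each
// compaction finishes.
class FilteredFileStatsListener : public rocksdb::EventListener {
 public:
  void OnCompactionCompleted(rocksdb::DB* /*db*/,
                             const rocksdb::CompactionJobInfo& info) override {
    const rocksdb::CompactionJobStats& s = info.stats;
    std::cout << "filtered input files: " << s.num_filtered_input_files
              << ", filtered at output level: "
              << s.num_filtered_input_files_at_output_level
              << ", skipped input bytes: " << s.total_skipped_input_bytes
              << std::endl;
  }
};

// Usage sketch: register the listener before opening the DB, e.g.
//   options.listeners.emplace_back(std::make_shared<FilteredFileStatsListener>());
```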

Pull Request resolved: #13136

Test Plan: Added unit tests

Reviewed By: cbi42

Differential Revision: D65855380

Pulled By: jowlyzhang

fbshipit-source-id: a4d8eef66f8d999ca5c3d9472aeeae98d7bb03ab
jowlyzhang authored and facebook-github-bot committed Nov 14, 2024
1 parent 9a136e1 commit ef119c9
Showing 8 changed files with 294 additions and 13 deletions.
4 changes: 3 additions & 1 deletion db/compaction/compaction.cc
@@ -368,9 +368,10 @@ Compaction::Compaction(
}
#endif

// setup input_levels_
// setup input_levels_ and filtered_input_levels_
{
input_levels_.resize(num_input_levels());
filtered_input_levels_.resize(num_input_levels());
if (earliest_snapshot_.has_value()) {
FilterInputsForCompactionIterator();
} else {
@@ -1085,6 +1086,7 @@ void Compaction::FilterInputsForCompactionIterator() {
ucmp->CompareWithoutTimestamp(rangedel_end_ukey,
file->largest.user_key()) > 0) {
non_start_level_input_files_filtered_.back().back() = true;
filtered_input_levels_[level].push_back(file);
} else {
non_start_level_input_files.back().push_back(file);
}
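
For context on the hunk above: the check shown compares the standalone range deletion's end key against the candidate file's largest user key; when the end key sorts strictly after the largest key (together with the other conditions around it, which the diff truncates), the file is fully covered by the tombstone, marked filtered, and now also recorded in `filtered_input_levels_`. A small standalone sketch of that comparison using the public comparator API; the key values and the program itself are illustrative, not part of this change:

```cpp
#include <iostream>

#include "rocksdb/comparator.h"
#include "rocksdb/slice.h"

int main() {
  const rocksdb::Comparator* ucmp = rocksdb::BytewiseComparator();

  // Hypothetical keys: the standalone range deletion ends at "z" (exclusive)
  // and the candidate input file's largest user key is "m".
  rocksdb::Slice rangedel_end_ukey("z");
  rocksdb::Slice file_largest_ukey("m");

  // Mirrors the end-key comparison from FilterInputsForCompactionIterator():
  // a result > 0 means the tombstone's end key is past the file's largest
  // key, one requirement for the file to be filtered and counted in the new
  // filtered-file stats.
  bool filtered =
      ucmp->CompareWithoutTimestamp(rangedel_end_ukey, file_largest_ukey) > 0;
  std::cout << (filtered ? "filtered" : "kept") << std::endl;
  return 0;
}
```
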
17 changes: 15 additions & 2 deletions db/compaction/compaction.h
@@ -184,6 +184,16 @@ class Compaction {
return &input_levels_[compaction_input_level];
}

// Returns the filtered input files of the specified compaction input level.
// For now, only non start level is filtered.
const std::vector<FileMetaData*>& filtered_input_levels(
size_t compaction_input_level) const {
const std::vector<FileMetaData*>& filtered_input_level =
filtered_input_levels_[compaction_input_level];
assert(compaction_input_level != 0 || filtered_input_level.size() == 0);
return filtered_input_level;
}

// Maximum size of files to build during this compaction.
uint64_t max_output_file_size() const { return max_output_file_size_; }

@@ -545,10 +555,13 @@ class Compaction {

// Markers for which non start level input files are filtered out if
// applicable. Only applicable if earliest_snapshot_ is provided and input
// start level has a standalone range deletion file.
// start level has a standalone range deletion file. Filtered files are
// tracked in `filtered_input_levels_`.
std::vector<std::vector<bool>> non_start_level_input_files_filtered_;

// bool standalone_range_tombstones_used_for_filtering_inputs_;
// All files from inputs_ that are filtered.
std::vector<std::vector<FileMetaData*>> filtered_input_levels_;

const double score_; // score that was used to pick this compaction.

// Is this compaction creating a file in the bottom most level?
39 changes: 33 additions & 6 deletions db/compaction/compaction_job.cc
@@ -911,19 +911,23 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
ROCKS_LOG_BUFFER(
log_buffer_,
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
"files in(%d, %d) out(%d +%d blob) "
"MB in(%.1f, %.1f +%.1f blob) out(%.1f +%.1f blob), "
"files in(%d, %d) filtered(%d, %d) out(%d +%d blob) "
"MB in(%.1f, %.1f +%.1f blob) filtered(%.1f, %.1f) out(%.1f +%.1f blob), "
"read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64
", records dropped: %" PRIu64 " output_compression: %s\n",
column_family_name.c_str(), vstorage->LevelSummary(&tmp),
bytes_read_per_sec, bytes_written_per_sec,
compact_->compaction->output_level(),
stats.num_input_files_in_non_output_levels,
stats.num_input_files_in_output_level, stats.num_output_files,
stats.num_input_files_in_output_level,
stats.num_filtered_input_files_in_non_output_levels,
stats.num_filtered_input_files_in_output_level, stats.num_output_files,
stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB,
stats.bytes_read_output_level / kMB, stats.bytes_read_blob / kMB,
stats.bytes_written / kMB, stats.bytes_written_blob / kMB, read_write_amp,
write_amp, status.ToString().c_str(), stats.num_input_records,
stats.bytes_skipped_non_output_levels / kMB,
stats.bytes_skipped_output_level / kMB, stats.bytes_written / kMB,
stats.bytes_written_blob / kMB, read_write_amp, write_amp,
status.ToString().c_str(), stats.num_input_records,
stats.num_dropped_records,
CompressionTypeToString(compact_->compaction->output_compression())
.c_str());
@@ -2007,7 +2011,6 @@ bool CompactionJob::UpdateCompactionStats(uint64_t* num_input_range_del) {
bool has_error = false;
const ReadOptions read_options(Env::IOActivity::kCompaction);
const auto& input_table_properties = compaction->GetInputTableProperties();
// TODO(yuzhangyu): add dedicated stats for filtered files.
for (int input_level = 0;
input_level < static_cast<int>(compaction->num_input_levels());
++input_level) {
@@ -2047,6 +2050,23 @@ bool CompactionJob::UpdateCompactionStats(uint64_t* num_input_range_del) {
*num_input_range_del += file_num_range_del;
}
}

const std::vector<FileMetaData*>& filtered_flevel =
compaction->filtered_input_levels(input_level);
size_t num_filtered_input_files = filtered_flevel.size();
uint64_t* bytes_skipped;
if (compaction->level(input_level) != compaction->output_level()) {
compaction_stats_.stats.num_filtered_input_files_in_non_output_levels +=
static_cast<int>(num_filtered_input_files);
bytes_skipped = &compaction_stats_.stats.bytes_skipped_non_output_levels;
} else {
compaction_stats_.stats.num_filtered_input_files_in_output_level +=
static_cast<int>(num_filtered_input_files);
bytes_skipped = &compaction_stats_.stats.bytes_skipped_output_level;
}
for (const FileMetaData* filtered_file_meta : filtered_flevel) {
*bytes_skipped += filtered_file_meta->fd.GetFileSize();
}
}

assert(compaction_job_stats_);
@@ -2071,6 +2091,13 @@ void CompactionJob::UpdateCompactionJobStats(
stats.num_input_files_in_output_level;
compaction_job_stats_->num_input_files_at_output_level =
stats.num_input_files_in_output_level;
compaction_job_stats_->num_filtered_input_files =
stats.num_filtered_input_files_in_non_output_levels +
stats.num_filtered_input_files_in_output_level;
compaction_job_stats_->num_filtered_input_files_at_output_level =
stats.num_filtered_input_files_in_output_level;
compaction_job_stats_->total_skipped_input_bytes =
stats.bytes_skipped_non_output_levels + stats.bytes_skipped_output_level;

// output information
compaction_job_stats_->total_output_bytes = stats.bytes_written;
3 changes: 1 addition & 2 deletions db/compaction/compaction_job.h
@@ -215,8 +215,7 @@ class CompactionJob {
virtual void RecordCompactionIOStats();
void CleanupCompaction();

// Call compaction filter. Then iterate through input and compact the
// kv-pairs
// Iterate through input and compact the kv-pairs.
void ProcessKeyValueCompaction(SubcompactionState* sub_compact);

CompactionState* compact_;