Add compaction stats for filtered files #13136

Closed · wants to merge 2 commits
4 changes: 3 additions & 1 deletion db/compaction/compaction.cc
@@ -368,9 +368,10 @@ Compaction::Compaction(
   }
 #endif
 
-  // setup input_levels_
+  // setup input_levels_ and filtered_input_levels_
   {
     input_levels_.resize(num_input_levels());
+    filtered_input_levels_.resize(num_input_levels());
     if (earliest_snapshot_.has_value()) {
       FilterInputsForCompactionIterator();
     } else {
@@ -1085,6 +1086,7 @@ void Compaction::FilterInputsForCompactionIterator() {
           ucmp->CompareWithoutTimestamp(rangedel_end_ukey,
                                         file->largest.user_key()) > 0) {
         non_start_level_input_files_filtered_.back().back() = true;
+        filtered_input_levels_[level].push_back(file);
       } else {
         non_start_level_input_files.back().push_back(file);
       }
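For orientation, here is a minimal sketch of the bookkeeping invariant these two hunks maintain (illustrative only, not RocksDB source; the function and parameter names are hypothetical): every file whose marker in `non_start_level_input_files_filtered_` is set to true is also appended to `filtered_input_levels_`, so the two structures stay in sync for the stats added below.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct FileMetaData {};  // stand-in for the real type in db/version_edit.h

// For one non-start input level: `markers` mirrors an inner vector of
// non_start_level_input_files_filtered_, and `filtered_files` mirrors the
// matching entry of filtered_input_levels_.
void CheckFilterBookkeeping(const std::vector<bool>& markers,
                            const std::vector<FileMetaData*>& filtered_files) {
  std::size_t marked = 0;
  for (bool is_filtered : markers) {
    marked += is_filtered ? 1 : 0;
  }
  // Each true marker corresponds to exactly one tracked file pointer.
  assert(marked == filtered_files.size());
}
```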
17 changes: 15 additions & 2 deletions db/compaction/compaction.h
@@ -184,6 +184,16 @@ class Compaction {
     return &input_levels_[compaction_input_level];
   }
 
+  // Returns the filtered input files of the specified compaction input level.
+  // For now, only non-start levels are filtered.
+  const std::vector<FileMetaData*>& filtered_input_levels(
+      size_t compaction_input_level) const {
+    const std::vector<FileMetaData*>& filtered_input_level =
+        filtered_input_levels_[compaction_input_level];
+    assert(compaction_input_level != 0 || filtered_input_level.size() == 0);
+    return filtered_input_level;
+  }
+
   // Maximum size of files to build during this compaction.
   uint64_t max_output_file_size() const { return max_output_file_size_; }
 
@@ -545,10 +555,13 @@
   // Markers for which non start level input files are filtered out if
   // applicable. Only applicable if earliest_snapshot_ is provided and input
-  // start level has a standalone range deletion file.
+  // start level has a standalone range deletion file. Filtered files are
+  // tracked in `filtered_input_levels_`.
   std::vector<std::vector<bool>> non_start_level_input_files_filtered_;
 
-  // bool standalone_range_tombstones_used_for_filtering_inputs_;
+  // All files from inputs_ that are filtered.
+  std::vector<std::vector<FileMetaData*>> filtered_input_levels_;
 
   const double score_;  // score that was used to pick this compaction.
 
   // Is this compaction creating a file in the bottom most level?
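A hedged usage sketch of the new accessor (the helper below is illustrative and not part of the PR; it assumes the internal `Compaction` and `FileMetaData` headers are available). The assert in the accessor encodes the design choice that the start level is never filtered, so callers only ever see filtered files on non-start levels.

```cpp
// Illustrative only: total bytes of input this compaction will skip,
// using the accessor added above together with the existing
// num_input_levels() and FileMetaData::fd.GetFileSize() APIs.
uint64_t TotalFilteredBytes(const Compaction& compaction) {
  uint64_t total = 0;
  for (size_t i = 0; i < compaction.num_input_levels(); ++i) {
    for (const FileMetaData* file : compaction.filtered_input_levels(i)) {
      total += file->fd.GetFileSize();
    }
  }
  return total;
}
```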
39 changes: 33 additions & 6 deletions db/compaction/compaction_job.cc
@@ -911,19 +911,23 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
   ROCKS_LOG_BUFFER(
       log_buffer_,
       "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
-      "files in(%d, %d) out(%d +%d blob) "
-      "MB in(%.1f, %.1f +%.1f blob) out(%.1f +%.1f blob), "
+      "files in(%d, %d) filtered(%d, %d) out(%d +%d blob) "
+      "MB in(%.1f, %.1f +%.1f blob) filtered(%.1f, %.1f) out(%.1f +%.1f blob), "
       "read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64
       ", records dropped: %" PRIu64 " output_compression: %s\n",
       column_family_name.c_str(), vstorage->LevelSummary(&tmp),
       bytes_read_per_sec, bytes_written_per_sec,
       compact_->compaction->output_level(),
       stats.num_input_files_in_non_output_levels,
-      stats.num_input_files_in_output_level, stats.num_output_files,
+      stats.num_input_files_in_output_level,
+      stats.num_filtered_input_files_in_non_output_levels,
+      stats.num_filtered_input_files_in_output_level, stats.num_output_files,
       stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB,
       stats.bytes_read_output_level / kMB, stats.bytes_read_blob / kMB,
-      stats.bytes_written / kMB, stats.bytes_written_blob / kMB, read_write_amp,
-      write_amp, status.ToString().c_str(), stats.num_input_records,
+      stats.bytes_skipped_non_output_levels / kMB,
+      stats.bytes_skipped_output_level / kMB, stats.bytes_written / kMB,
+      stats.bytes_written_blob / kMB, read_write_amp, write_amp,
+      status.ToString().c_str(), stats.num_input_records,
       stats.num_dropped_records,
       CompressionTypeToString(compact_->compaction->output_compression())
           .c_str());
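With the extended format string, the compaction summary line would look roughly like this (values invented for illustration; the two new `filtered(...)` groups report file counts and skipped MB for non-output and output levels, in that order):

```text
[default] compacted to: files[0 2 4 13 0 0 0] max score 0.94, MB/sec: 45.2 rd, 44.1 wr, level 3, files in(4, 11) filtered(0, 3) out(12 +0 blob) MB in(128.0, 352.7 +0.0 blob) filtered(0.0, 96.3) out(384.2 +0.0 blob), read-write-amplify(6.8) write-amplify(3.0) OK, records in: 1234567, records dropped: 8901 output_compression: Snappy
```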
@@ -2007,7 +2011,6 @@ bool CompactionJob::UpdateCompactionStats(uint64_t* num_input_range_del) {
   bool has_error = false;
   const ReadOptions read_options(Env::IOActivity::kCompaction);
   const auto& input_table_properties = compaction->GetInputTableProperties();
-  // TODO(yuzhangyu): add dedicated stats for filtered files.
   for (int input_level = 0;
        input_level < static_cast<int>(compaction->num_input_levels());
        ++input_level) {
@@ -2047,6 +2050,23 @@
         *num_input_range_del += file_num_range_del;
       }
     }
+
+    const std::vector<FileMetaData*>& filtered_flevel =
+        compaction->filtered_input_levels(input_level);
+    size_t num_filtered_input_files = filtered_flevel.size();
+    uint64_t* bytes_skipped;
+    if (compaction->level(input_level) != compaction->output_level()) {
+      compaction_stats_.stats.num_filtered_input_files_in_non_output_levels +=
+          static_cast<int>(num_filtered_input_files);
+      bytes_skipped = &compaction_stats_.stats.bytes_skipped_non_output_levels;
+    } else {
+      compaction_stats_.stats.num_filtered_input_files_in_output_level +=
+          static_cast<int>(num_filtered_input_files);
+      bytes_skipped = &compaction_stats_.stats.bytes_skipped_output_level;
+    }
+    for (const FileMetaData* filtered_file_meta : filtered_flevel) {
+      *bytes_skipped += filtered_file_meta->fd.GetFileSize();
+    }
   }
 
   assert(compaction_job_stats_);
@@ -2071,6 +2091,13 @@ void CompactionJob::UpdateCompactionJobStats(
       stats.num_input_files_in_output_level;
   compaction_job_stats_->num_input_files_at_output_level =
       stats.num_input_files_in_output_level;
+  compaction_job_stats_->num_filtered_input_files =
+      stats.num_filtered_input_files_in_non_output_levels +
+      stats.num_filtered_input_files_in_output_level;
+  compaction_job_stats_->num_filtered_input_files_at_output_level =
+      stats.num_filtered_input_files_in_output_level;
+  compaction_job_stats_->total_skipped_input_bytes =
+      stats.bytes_skipped_non_output_levels + stats.bytes_skipped_output_level;
 
   // output information
   compaction_job_stats_->total_output_bytes = stats.bytes_written;
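Downstream consumers get these totals through `CompactionJobStats`. A minimal listener sketch, assuming the PR's companion change to the public `CompactionJobStats` struct (not shown in this diff) adds the three fields assigned above:

```cpp
#include <cinttypes>
#include <cstdio>

#include "rocksdb/listener.h"

// Illustrative listener (not from the PR): log filtered-input stats after
// each compaction finishes.
class FilteredFileStatsListener : public rocksdb::EventListener {
 public:
  void OnCompactionCompleted(rocksdb::DB* /*db*/,
                             const rocksdb::CompactionJobInfo& info) override {
    // The three fields below are the ones UpdateCompactionJobStats populates.
    std::fprintf(stderr,
                 "filtered %" PRIu64 " input files (%" PRIu64
                 " at output level), skipped %" PRIu64 " bytes\n",
                 info.stats.num_filtered_input_files,
                 info.stats.num_filtered_input_files_at_output_level,
                 info.stats.total_skipped_input_bytes);
  }
};
```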
3 changes: 1 addition & 2 deletions db/compaction/compaction_job.h
@@ -215,8 +215,7 @@ class CompactionJob {
   virtual void RecordCompactionIOStats();
   void CleanupCompaction();
 
-  // Call compaction filter. Then iterate through input and compact the
-  // kv-pairs
+  // Iterate through input and compact the kv-pairs.
   void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
 
   CompactionState* compact_;