Skip to content

Commit

Permalink
[BugFix] Fix compaction task type has been changed in the execute que…
Browse files Browse the repository at this point in the history
…ue (#53083)

Signed-off-by: meegoo <[email protected]>
(cherry picked from commit b49bb87)
  • Loading branch information
meegoo authored and mergify[bot] committed Nov 21, 2024
1 parent 611f26d commit 4044332
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 0 deletions.
6 changes: 6 additions & 0 deletions be/src/storage/compaction_candidate.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ struct CompactionCandidate {
TabletSharedPtr tablet;
CompactionType type;
double score = 0;
bool force_cumulative = false;

CompactionCandidate() : tablet(nullptr), type(INVALID_COMPACTION) {}

Expand All @@ -40,25 +41,29 @@ struct CompactionCandidate {
tablet = other.tablet;
type = other.type;
score = other.score;
force_cumulative = other.force_cumulative;
}

CompactionCandidate& operator=(const CompactionCandidate& rhs) {
tablet = rhs.tablet;
type = rhs.type;
score = rhs.score;
force_cumulative = rhs.force_cumulative;
return *this;
}

CompactionCandidate(CompactionCandidate&& other) {
tablet = std::move(other.tablet);
type = other.type;
score = other.score;
force_cumulative = other.force_cumulative;
}

CompactionCandidate& operator=(CompactionCandidate&& rhs) {
tablet = std::move(rhs.tablet);
type = rhs.type;
score = rhs.score;
force_cumulative = rhs.force_cumulative;
return *this;
}

Expand All @@ -70,6 +75,7 @@ struct CompactionCandidate {
ss << "nullptr tablet";
}
ss << ", type:" << starrocks::to_string(type);
ss << ", force_cumulative:" << force_cumulative;
ss << ", score:" << score;
return ss.str();
}
Expand Down
16 changes: 16 additions & 0 deletions be/src/storage/compaction_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ void CompactionManager::_schedule() {
auto st = _compaction_pool->submit_func([compaction_candidate, task_id] {
auto compaction_task = compaction_candidate.tablet->create_compaction_task();
if (compaction_task != nullptr) {
if (compaction_task->compaction_type() == CompactionType::BASE_COMPACTION &&
compaction_candidate.force_cumulative) {
LOG(INFO) << "skip base compaction task " << task_id << " for tablet "
<< compaction_candidate.tablet->tablet_id() << " because force_cumulative is true";
return;
}
compaction_task->set_task_id(task_id);
compaction_task->start();
}
Expand Down Expand Up @@ -222,6 +228,15 @@ bool CompactionManager::_check_compaction_disabled(const CompactionCandidate& ca
return false;
}

void CompactionManager::_set_force_cumulative(CompactionCandidate* candidate) {
// In the pick_candidate stage, the task is for cumulative compaction. However, during the execution stage,
// it might turn into base compaction. Therefore, if base compaction is disabled,
// such tasks will be forced to execute as cumulative compaction only.
candidate->force_cumulative = _table_to_disable_deadline_map.find(candidate->tablet->tablet_meta()->table_id()) !=
_table_to_disable_deadline_map.end() &&
candidate->type == CompactionType::CUMULATIVE_COMPACTION;
}

bool CompactionManager::_check_precondition(const CompactionCandidate& candidate) {
if (!candidate.tablet) {
LOG(WARNING) << "candidate with null tablet";
Expand Down Expand Up @@ -306,6 +321,7 @@ bool CompactionManager::pick_candidate(CompactionCandidate* candidate) {
}
if (_check_precondition(*iter)) {
*candidate = *iter;
_set_force_cumulative(candidate);
_compaction_candidates.erase(iter);
_last_score = candidate->score;
if (candidate->type == CompactionType::BASE_COMPACTION) {
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/compaction_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class CompactionManager {
void _dispatch_worker();
bool _check_compaction_disabled(const CompactionCandidate& candidate);
bool _check_precondition(const CompactionCandidate& candidate);
void _set_force_cumulative(CompactionCandidate* candidate);
void _schedule();
void _notify();
// wait until current running tasks are below max_concurrent_num
Expand Down
42 changes: 42 additions & 0 deletions be/test/storage/compaction_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,48 @@ TEST_F(CompactionManagerTest, test_remove_disable_compaction) {
}
}

TEST_F(CompactionManagerTest, test_force_cumulative_compaction) {
std::vector<CompactionCandidate> candidates;
DataDir data_dir("./data_dir");
for (int i = 0; i < 10; i++) {
TabletSharedPtr tablet = std::make_shared<Tablet>();
TabletMetaSharedPtr tablet_meta = std::make_shared<TabletMeta>();
tablet_meta->set_tablet_id(i);
tablet_meta->TEST_set_table_id(3);
tablet->set_tablet_meta(tablet_meta);
tablet->set_data_dir(&data_dir);
tablet->set_tablet_state(TABLET_RUNNING);

CompactionCandidate candidate;
candidate.tablet = tablet;
candidate.score = i;
candidate.type = CUMULATIVE_COMPACTION;
candidates.push_back(candidate);
}

std::random_device rd;
std::mt19937 g(rd());
std::shuffle(candidates.begin(), candidates.end(), g);

_engine->compaction_manager()->update_candidates(candidates);

_engine->compaction_manager()->disable_table_compaction(3, UnixSeconds() + 5);

{
int64_t valid_condidates = 0;
while (true) {
CompactionCandidate candidate;
auto valid = _engine->compaction_manager()->pick_candidate(&candidate);
if (!valid) {
break;
}
++valid_condidates;
ASSERT_EQ(true, candidate.force_cumulative);
}
ASSERT_EQ(10, valid_condidates);
}
}

class MockCompactionTask : public CompactionTask {
public:
MockCompactionTask() : CompactionTask(HORIZONTAL_COMPACTION) {}
Expand Down

0 comments on commit 4044332

Please sign in to comment.