Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix repeated migration of colocate tablets #53099

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,10 @@ private List<TabletSchedCtx> balanceClusterDisk(ClusterLoadStatistic clusterStat
continue;
}

if (!olapTable.needSchedule(false)) {
continue;
}

if (isDestBackendLocationMismatch(olapTable, hBackend.getId(), lBackend.getId(),
physicalPartition.getParentId(), tabletId)) {
continue;
Expand Down Expand Up @@ -799,6 +803,11 @@ private void balanceBackendDisk(TStorageMedium medium, double avgUsedPercent,
if (olapTable == null) {
continue;
}

if (!olapTable.needSchedule(true)) {
continue;
}

// check tablet healthy
if (isTabletUnhealthy(tabletMeta.getDbId(), olapTable, tabletId, tabletMeta, aliveBeIds)) {
continue;
Expand Down Expand Up @@ -1646,6 +1655,10 @@ private Map<Pair<Long, Long>, PartitionStat> getPartitionStats(TStorageMedium me
continue;
}

if (!olapTbl.needSchedule(isLocalBalance)) {
continue;
}

for (Partition partition : globalStateMgr.getLocalMetastore().getAllPartitionsIncludeRecycleBin(olapTbl)) {
partitionChecked++;
if (partitionChecked % partitionBatchNum == 0) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The riskiest aspect of this change is:
The added `olapTable.needSchedule()` / `olapTbl.needSchedule()` guards cause iterations to be skipped when scheduling is deemed unnecessary. If the `needSchedule` logic is incorrect, or if skipping these tablets is not handled properly elsewhere, the change could leave the cluster improperly balanced or cause scheduling opportunities to be missed.

You can modify the code like this:

// Assess if the conditions under which 'needSchedule' returns false might lead to critical operations being skipped inadvertently,
// and ensure that either strategic logging or alternative operations handle such cases to preserve cluster health.
private List<TabletSchedCtx> balanceClusterDisk(ClusterLoadStatistic clusterStat) {
    for (/* iteration variables */) {
        if (!olapTable.needSchedule(false)) {
            // Add logging or checks to confirm skipping doesn't cause issues
            continue;
        }
        if (isDestBackendLocationMismatch(olapTable, hBackend.getId(), lBackend.getId(),
                physicalPartition.getParentId(), tabletId)) {
            continue;
        }
        // rest of the function...
    }
}

private void balanceBackendDisk(TStorageMedium medium, double avgUsedPercent) {
    for (/* iteration variables */) {
        if (olapTable == null) {
            continue;
        }
        if (!olapTable.needSchedule(true)) {
            // Add logging or checks to assess impact
            continue;
        }
        if (isTabletUnhealthy(tabletMeta.getDbId(), olapTable, tabletId, tabletMeta, aliveBeIds)) {
            continue;
        }
        // rest of the function...
    }
}

private Map<Pair<Long, Long>, PartitionStat> getPartitionStats(TStorageMedium medium, boolean isLocalBalance) {
    for (/* iteration variables */) {
        if (!olapTbl.needSchedule(isLocalBalance)) {
            // Consider implementing fallback checks here
            continue;
        }
        for (Partition partition : globalStateMgr.getLocalMetastore().getAllPartitionsIncludeRecycleBin(olapTbl)) {
            partitionChecked++;
            if (partitionChecked % partitionBatchNum == 0) {
                // additional logic...
            }
        }
        // rest of the function...
    }
}

Verify whether any bypassed operations should still partially execute, or should at least emit logs/alerts, so that skipped tablets remain observable.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.catalog.ColocateTableIndex;
import com.starrocks.catalog.DataProperty;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.DiskInfo;
Expand All @@ -37,6 +38,7 @@
import com.starrocks.catalog.TabletMeta;
import com.starrocks.clone.DiskAndTabletLoadReBalancer.BackendBalanceState;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.Backend;
import com.starrocks.system.SystemInfoService;
Expand Down Expand Up @@ -207,6 +209,132 @@ public void testBalance(@Mocked GlobalStateMgr globalStateMgr) {
Assert.assertEquals(0, rebalancer.selectAlternativeTablets().size());
}

@Test
public void testBalanceOfColocateTable(@Mocked GlobalStateMgr globalStateMgr) throws DdlException {
    // Cluster-level disk balance with a colocate table.
    // Layout: be1 and be2 each hold 3 tablets of the table, be3 holds none,
    // so the disk balancer would normally want to move tablets onto be3.
    // Because the table is registered in a colocate group, the balancer is
    // expected to skip it entirely (colocate tablets are migrated by the
    // colocate-specific balancer), hence 0 alternative tablets are selected.
    long dbId = 10001L;
    long tableId = 10002L;
    long partitionId = 10003L;
    long indexId = 10004L;
    long physicalPartitionId = 10005L;
    long tabletDataSize = 200 * 1024 * 1024L;
    TStorageMedium medium = TStorageMedium.HDD;
    long beId1 = 1L;
    long beId2 = 2L;
    long beId3 = 3L;
    long pathHash1 = 1111L;
    long pathHash2 = 2222L;
    long pathHash3 = 3333L;

    // System info: be1/be2 are partly used, be3 is completely empty.
    SystemInfoService infoService = new SystemInfoService();

    infoService.addBackend(genBackend(beId1, "host1", 2 * tabletDataSize,
            3 * tabletDataSize, 5 * tabletDataSize, pathHash1));

    infoService.addBackend(genBackend(beId2, "host2", 2 * tabletDataSize,
            3 * tabletDataSize, 5 * tabletDataSize, pathHash2));

    infoService.addBackend(genBackend(beId3, "host3", 5 * tabletDataSize,
            0, 5 * tabletDataSize, pathHash3));

    // Tablet inverted index: 3 tablets on be1, 3 on be2, none on be3.
    TabletInvertedIndex invertedIndex = new TabletInvertedIndex();
    MaterializedIndex materializedIndex = new MaterializedIndex(indexId, IndexState.NORMAL);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.HDD, dbId, tableId, partitionId, indexId, 20001L,
            30001L, beId1,
            tabletDataSize, pathHash1);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.HDD, dbId, tableId, partitionId, indexId, 20002L,
            30002L, beId1,
            tabletDataSize, pathHash1);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.HDD, dbId, tableId, partitionId, indexId, 20003L,
            30003L, beId1,
            tabletDataSize, pathHash1);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.HDD, dbId, tableId, partitionId, indexId, 20004L,
            30004L, beId2,
            tabletDataSize, pathHash2);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.HDD, dbId, tableId, partitionId, indexId, 20005L,
            30005L, beId2,
            tabletDataSize, pathHash2);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.HDD, dbId, tableId, partitionId, indexId, 20006L,
            30006L, beId2,
            tabletDataSize, pathHash2);

    ClusterLoadStatistic clusterLoadStatistic = new ClusterLoadStatistic(infoService, invertedIndex);
    clusterLoadStatistic.init();

    // Catalog objects: single-replica HDD partition on a hash-distributed table.
    PartitionInfo partitionInfo = new PartitionInfo();
    DataProperty dataProperty = new DataProperty(medium);
    partitionInfo.addPartition(partitionId, dataProperty, (short) 1, false);
    DistributionInfo distributionInfo = new HashDistributionInfo(6, Lists.newArrayList());
    Partition partition = new Partition(partitionId, partitionId, "partition", materializedIndex, distributionInfo);
    OlapTable table = new OlapTable(tableId, "table", Lists.newArrayList(), KeysType.AGG_KEYS, partitionInfo,
            distributionInfo);
    table.addPartition(partition);
    Database database = new Database(dbId, "database");
    database.registerTableUnlocked(table);

    // Put the table into a colocate group so the balancer must skip it.
    ColocateTableIndex colocateTableIndex = new ColocateTableIndex();
    colocateTableIndex.addTableToGroup(database, table, "test_group", false);

    PhysicalPartition physicalPartition = new PhysicalPartition(physicalPartitionId, "partition", partitionId,
            materializedIndex);

    new Expectations() {
        {
            GlobalStateMgr.getCurrentState();
            result = globalStateMgr;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore().getDbIdsIncludeRecycleBin();
            result = Lists.newArrayList(dbId);
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore().getDbIncludeRecycleBin(dbId);
            result = database;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore().getTableIncludeRecycleBin((Database) any, anyLong);
            result = table;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore().getTablesIncludeRecycleBin((Database) any);
            result = Lists.newArrayList(table);
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore()
                    .getPhysicalPartitionIncludeRecycleBin((OlapTable) any, physicalPartitionId);
            result = physicalPartition;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore().getAllPartitionsIncludeRecycleBin((OlapTable) any);
            result = Lists.newArrayList(partition);
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore()
                    .getReplicationNumIncludeRecycleBin((PartitionInfo) any, anyLong);
            result = (short) 1;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore()
                    .getDataPropertyIncludeRecycleBin((PartitionInfo) any, anyLong);
            result = dataProperty;
            minTimes = 0;

            // NOTE(review): without this stub the locally built colocateTableIndex
            // was never reachable through the mocked GlobalStateMgr, so the
            // balancer could not observe that the table is colocated and the
            // colocate-skip path under test was likely never exercised —
            // confirm the balancer reads GlobalStateMgr.getColocateTableIndex().
            GlobalStateMgr.getCurrentState().getColocateTableIndex();
            result = colocateTableIndex;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
            result = infoService;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getTabletInvertedIndex();
            result = invertedIndex;
            minTimes = 0;
        }
    };

    Rebalancer rebalancer = new DiskAndTabletLoadReBalancer();
    rebalancer.updateLoadStatistic(clusterLoadStatistic);
    // Colocate tablets must not be picked up by the disk balancer.
    Assert.assertEquals(0, rebalancer.selectAlternativeTablets().size());
}

/**
* init state:
* 1 partition with 3 tablet, 3 replica number
Expand Down Expand Up @@ -586,6 +714,186 @@ public void testBalanceBackendTablet(@Mocked GlobalStateMgr globalStateMgr) {
Assert.assertEquals(0, rebalancer.selectAlternativeTablets().size());
}

@Test
public void testBalanceBackendTabletOfColocate(@Mocked GlobalStateMgr globalStateMgr) throws DdlException {
    // Per-backend (local) disk balance with a colocate table.
    // be1's tablets are skewed across its own HDD and SSD paths, so once the
    // safe threshold is lowered the balancer should propose moves between
    // disks on the same backend. Exactly 2 alternative tablets are expected.
    long dbId = 10001L;
    long tableId = 10002L;
    long partitionId1 = 10010L;
    long partitionId2 = 10011L;
    long indexId = 10003L;
    long physicalPartitionId1 = 10004L;
    long physicalPartitionId2 = 10005L;
    long tabletDataSize = 200 * 1024 * 1024L;
    long beId1 = 1L;
    long beId2 = 2L;
    long pathHash10 = 10L;
    long pathHash11 = 11L;
    long pathHash12 = 12L;
    long pathHash13 = 13L;
    long pathHash14 = 14L;
    long pathHash20 = 20L;
    long pathHash21 = 21L;

    // be1: three HDD paths with uneven usage plus two SSD paths.
    Backend be1 = genBackend(beId1, "host1", 2 * tabletDataSize,
            2 * tabletDataSize, 4 * tabletDataSize, pathHash10);
    DiskInfo disk10 = genDiskInfo(2 * tabletDataSize, 2 * tabletDataSize,
            4 * tabletDataSize, "/data10", pathHash10, TStorageMedium.HDD);
    DiskInfo disk11 = genDiskInfo(3 * tabletDataSize, 1 * tabletDataSize,
            4 * tabletDataSize, "/data11", pathHash11, TStorageMedium.HDD);
    DiskInfo disk12 = genDiskInfo(4 * tabletDataSize, 0 * tabletDataSize,
            4 * tabletDataSize, "/data12", pathHash12, TStorageMedium.HDD);
    DiskInfo disk13 = genDiskInfo(2 * tabletDataSize, 2 * tabletDataSize,
            4 * tabletDataSize, "/data13", pathHash13, TStorageMedium.SSD);
    DiskInfo disk14 = genDiskInfo(4 * tabletDataSize, 0 * tabletDataSize,
            4 * tabletDataSize, "/data14", pathHash14, TStorageMedium.SSD);
    Map<String, DiskInfo> diskInfoMap1 = Maps.newHashMap();
    diskInfoMap1.put(disk10.getRootPath(), disk10);
    diskInfoMap1.put(disk11.getRootPath(), disk11);
    diskInfoMap1.put(disk12.getRootPath(), disk12);
    diskInfoMap1.put(disk13.getRootPath(), disk13);
    diskInfoMap1.put(disk14.getRootPath(), disk14);
    be1.setDisks(ImmutableMap.copyOf(diskInfoMap1));

    // be2: one HDD path and one SSD path; no intra-backend skew.
    Backend be2 = genBackend(beId2, "host2", 6 * tabletDataSize,
            2 * tabletDataSize, 8 * tabletDataSize, pathHash20);
    DiskInfo disk20 = genDiskInfo(6 * tabletDataSize, 2 * tabletDataSize,
            8 * tabletDataSize, "/data20", pathHash20, TStorageMedium.HDD);
    DiskInfo disk21 = genDiskInfo(9 * tabletDataSize, 3 * tabletDataSize,
            12 * tabletDataSize, "/data21", pathHash21, TStorageMedium.SSD);
    Map<String, DiskInfo> diskInfoMap2 = Maps.newHashMap();
    diskInfoMap2.put(disk20.getRootPath(), disk20);
    diskInfoMap2.put(disk21.getRootPath(), disk21);
    be2.setDisks(ImmutableMap.copyOf(diskInfoMap2));

    SystemInfoService infoService = new SystemInfoService();
    infoService.addBackend(be1);
    infoService.addBackend(be2);

    // Tablet inverted index: HDD tablets in physical partition 1,
    // SSD tablets in physical partition 2.
    TabletInvertedIndex invertedIndex = new TabletInvertedIndex();
    MaterializedIndex materializedIndex = new MaterializedIndex(indexId, IndexState.NORMAL);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.HDD, dbId, tableId, physicalPartitionId1, indexId,
            20001L, 30001L, beId1, tabletDataSize, pathHash10);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.HDD, dbId, tableId, physicalPartitionId1, indexId,
            20002L, 30002L, beId1, tabletDataSize, pathHash10);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.HDD, dbId, tableId, physicalPartitionId1, indexId,
            20003L, 30003L, beId1, tabletDataSize, pathHash11);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.HDD, dbId, tableId, physicalPartitionId1, indexId,
            20004L, 30004L, beId2, tabletDataSize, pathHash20);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.HDD, dbId, tableId, physicalPartitionId1, indexId,
            20005L, 30005L, beId2, tabletDataSize, pathHash20);

    addTablet(invertedIndex, materializedIndex, TStorageMedium.SSD, dbId, tableId, physicalPartitionId2, indexId,
            20006L, 30006L, beId1, tabletDataSize, pathHash13);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.SSD, dbId, tableId, physicalPartitionId2, indexId,
            20007L, 30007L, beId1, tabletDataSize, pathHash13);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.SSD, dbId, tableId, physicalPartitionId2, indexId,
            20008L, 30008L, beId2, tabletDataSize, pathHash21);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.SSD, dbId, tableId, physicalPartitionId2, indexId,
            20009L, 30009L, beId2, tabletDataSize, pathHash21);
    addTablet(invertedIndex, materializedIndex, TStorageMedium.SSD, dbId, tableId, physicalPartitionId2, indexId,
            20010L, 30010L, beId2, tabletDataSize, pathHash21);

    ClusterLoadStatistic clusterLoadStatistic = new ClusterLoadStatistic(infoService, invertedIndex);
    clusterLoadStatistic.init();

    // Catalog objects: one HDD partition and one SSD partition, replication 1.
    PartitionInfo partitionInfo = new PartitionInfo();
    DataProperty dataProperty1 = new DataProperty(TStorageMedium.HDD);
    partitionInfo.addPartition(partitionId1, dataProperty1, (short) 1, false);
    DataProperty dataProperty2 = new DataProperty(TStorageMedium.SSD);
    partitionInfo.addPartition(partitionId2, dataProperty2, (short) 1, false);
    DistributionInfo distributionInfo = new HashDistributionInfo(6, Lists.newArrayList());
    Partition partition1 = new Partition(partitionId1, physicalPartitionId1, "partition1", materializedIndex,
            distributionInfo);
    Partition partition2 = new Partition(partitionId2, physicalPartitionId2, "partition2", materializedIndex,
            distributionInfo);
    OlapTable table = new OlapTable(tableId, "table", Lists.newArrayList(), KeysType.AGG_KEYS, partitionInfo,
            distributionInfo);
    table.addPartition(partition1);
    table.addPartition(partition2);
    Database database = new Database(dbId, "database");
    database.registerTableUnlocked(table);

    // Put the table into a colocate group; local (per-backend) disk balance
    // is still expected to schedule its tablets.
    ColocateTableIndex colocateTableIndex = new ColocateTableIndex();
    colocateTableIndex.addTableToGroup(database, table, "test_group", false);

    PhysicalPartition physicalPartition1 = new PhysicalPartition(physicalPartitionId1, "partition", partitionId1,
            materializedIndex);
    PhysicalPartition physicalPartition2 = new PhysicalPartition(physicalPartitionId2, "partition", partitionId2,
            materializedIndex);

    new Expectations() {
        {
            GlobalStateMgr.getCurrentState();
            result = globalStateMgr;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore().getDbIdsIncludeRecycleBin();
            result = Lists.newArrayList(dbId);
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore().getDbIncludeRecycleBin(dbId);
            result = database;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore().getTableIncludeRecycleBin((Database) any, anyLong);
            result = table;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore().getTablesIncludeRecycleBin((Database) any);
            result = Lists.newArrayList(table);
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore()
                    .getPhysicalPartitionIncludeRecycleBin((OlapTable) any, physicalPartitionId1);
            result = physicalPartition1;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore()
                    .getPhysicalPartitionIncludeRecycleBin((OlapTable) any, physicalPartitionId2);
            result = physicalPartition2;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore().getAllPartitionsIncludeRecycleBin((OlapTable) any);
            result = Lists.newArrayList(partition1, partition2);
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore()
                    .getReplicationNumIncludeRecycleBin((PartitionInfo) any, anyLong);
            result = (short) 1;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore()
                    .getDataPropertyIncludeRecycleBin((PartitionInfo) any, partitionId1);
            result = dataProperty1;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getLocalMetastore()
                    .getDataPropertyIncludeRecycleBin((PartitionInfo) any, partitionId2);
            result = dataProperty2;
            minTimes = 0;

            // NOTE(review): without this stub the locally built colocateTableIndex
            // was unreachable through the mocked GlobalStateMgr, so the balancer
            // could not tell the table is colocated — confirm the balancer reads
            // GlobalStateMgr.getColocateTableIndex().
            GlobalStateMgr.getCurrentState().getColocateTableIndex();
            result = colocateTableIndex;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
            result = infoService;
            minTimes = 0;

            GlobalStateMgr.getCurrentState().getTabletInvertedIndex();
            result = invertedIndex;
            minTimes = 0;
        }
    };

    Rebalancer rebalancer = new DiskAndTabletLoadReBalancer();
    rebalancer.updateLoadStatistic(clusterLoadStatistic);

    // Lower the safe threshold to 0.4 to trigger per-backend disk balance.
    Config.tablet_sched_balance_load_disk_safe_threshold = 0.4;
    Config.storage_usage_soft_limit_reserve_bytes = 1;
    List<TabletSchedCtx> tablets = rebalancer.selectAlternativeTablets();
    Assert.assertEquals(2, tablets.size());
}

private Backend genBackend(long beId, String host, long availableCapB, long dataUsedCapB, long totalCapB,
long pathHash) {
Backend backend = new Backend(beId, host, 0);
Expand Down
Loading