Commit 25469b5

This is an automated cherry-pick of #7852
close #7848

Signed-off-by: ti-chi-bot <[email protected]>
lhy1024 authored and ti-chi-bot committed Apr 12, 2024
1 parent 87bed04 commit 25469b5
Showing 4 changed files with 265 additions and 37 deletions.
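
In substance, #7852 replaces the separate createReadOperator/createWriteOperator helpers with a single createOperator that derives operator names from the scheduler's read/write dimension, so the write path gains a move-hot-write-leader operator; the Grafana change adds the matching move-leader series to the hot-region direction panel. A minimal sketch of the naming scheme the diff below makes visible (illustrative only; the real types live in the PD scheduler packages):

package main

import "fmt"

// rwType stands in for the scheduler's read/write dimension
// (utils.Read / utils.Write in the real code).
type rwType int

const (
	read rwType = iota
	write
)

func (t rwType) String() string {
	if t == read {
		return "read"
	}
	return "write"
}

// operatorName mirrors the concatenation visible in the diff:
// "transfer-hot-"+bs.rwTy.String()+"-leader", and so on.
func operatorName(action string, t rwType, target string) string {
	return action + "-hot-" + t.String() + "-" + target
}

func main() {
	fmt.Println(operatorName("transfer", read, "leader")) // transfer-hot-read-leader
	fmt.Println(operatorName("move", write, "leader"))    // move-hot-write-leader
	fmt.Println(operatorName("move", write, "peer"))      // move-hot-write-peer
}
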
15 changes: 15 additions & 0 deletions metrics/grafana/pd.json
@@ -5595,6 +5595,21 @@
"intervalFactor": 1,
"legendFormat": "store-{{store}}-in",
"refId": "B"
},
{
"expr": "- sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"out\",rw=\"write\"}[1m]))by (store)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}-out",
"refId": "C",
"step": 4
},
{
"expr": "sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"in\",rw=\"write\"}[1m]))by (store)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "store-{{store}}-in",
"refId": "D"
}
],
"thresholds": [],
17 changes: 12 additions & 5 deletions pkg/schedule/schedulers/hot_region.go
@@ -1457,6 +1457,7 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
targetLabel := strconv.FormatUint(dstStoreID, 10)
dim := bs.rankToDimString()

<<<<<<< HEAD

Check failure on line 1460 in pkg/schedule/schedulers/hot_region.go (GitHub Actions / tso-function-test): syntax error: unexpected <<, expected }
(This and the other check failures annotated in this diff are all fallout from the unresolved merge-conflict markers that the automated cherry-pick committed.)
var createOperator func(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error)
switch bs.rwTy {

Check failure on line 1462 in pkg/schedule/schedulers/hot_region.go (GitHub Actions / tso-function-test): syntax error: non-declaration statement outside function body
case statistics.Read:
@@ -1466,11 +1467,14 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
}

currentOp, typ, err := createOperator(bs.cur.region, srcStoreID, dstStoreID)
=======
currentOp, typ, err := bs.createOperator(bs.cur.region, srcStoreID, dstStoreID)
>>>>>>> 33ae3b614 (scheduler: use move-hot-write-leader operator (#7852))

Check failure on line 1472 in pkg/schedule/schedulers/hot_region.go (GitHub Actions / tso-function-test): invalid character U+0023 '#'
if err == nil {
bs.decorateOperator(currentOp, false, sourceLabel, targetLabel, typ, dim)
ops = []*operator.Operator{currentOp}
if bs.cur.revertRegion != nil {
currentOp, typ, err = createOperator(bs.cur.revertRegion, dstStoreID, srcStoreID)
currentOp, typ, err = bs.createOperator(bs.cur.revertRegion, dstStoreID, srcStoreID)
if err == nil {
bs.decorateOperator(currentOp, true, targetLabel, sourceLabel, typ, dim)
ops = append(ops, currentOp)
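
Note that the <<<<<<< HEAD / ======= / >>>>>>> markers above are part of the committed file: the automated cherry-pick left this conflict unresolved, which is what every CI annotation in this diff is complaining about. Resolving in favor of the incoming #7852 side (consistent with the createOperator rename further down, and with deleting the now-duplicated createWriteOperator under HEAD) would presumably reduce the block to a fragment like this:

	// Sketch of the resolved buildOperators tail (assumption: the resolution
	// keeps the unified bs.createOperator from #7852):
	currentOp, typ, err := bs.createOperator(bs.cur.region, srcStoreID, dstStoreID)
	if err == nil {
		bs.decorateOperator(currentOp, false, sourceLabel, targetLabel, typ, dim)
		ops = []*operator.Operator{currentOp}
		if bs.cur.revertRegion != nil {
			currentOp, typ, err = bs.createOperator(bs.cur.revertRegion, dstStoreID, srcStoreID)
			if err == nil {
				bs.decorateOperator(currentOp, true, targetLabel, sourceLabel, typ, dim)
				ops = append(ops, currentOp)
			}
		}
	}
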
@@ -1545,11 +1549,11 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper
return operators
}

func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
func (bs *balanceSolver) createOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
if region.GetStorePeer(dstStoreID) != nil {
typ = "transfer-leader"
op, err = operator.CreateTransferLeaderOperator(
"transfer-hot-read-leader",
"transfer-hot-"+bs.rwTy.String()+"-leader",
bs,
region,
srcStoreID,
@@ -1562,7 +1566,7 @@ func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID,
if region.GetLeader().GetStoreId() == srcStoreID {
typ = "move-leader"
op, err = operator.CreateMoveLeaderOperator(
"move-hot-read-leader",
"move-hot-"+bs.rwTy.String()+"-leader",
bs,
region,
operator.OpHotRegion,
@@ -1571,7 +1575,7 @@
} else {
typ = "move-peer"
op, err = operator.CreateMovePeerOperator(
"move-hot-read-peer",
"move-hot-"+bs.rwTy.String()+"-peer",
bs,
region,
operator.OpHotRegion,
@@ -1582,6 +1586,7 @@ func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID,
return
}

<<<<<<< HEAD

Check failure on line 1589 in pkg/schedule/schedulers/hot_region.go (GitHub Actions / tso-function-test): syntax error: non-declaration statement outside function body
func (bs *balanceSolver) createWriteOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
if region.GetStorePeer(dstStoreID) != nil {
typ = "transfer-leader"
Expand All @@ -1608,6 +1613,8 @@ func (bs *balanceSolver) createWriteOperator(region *core.RegionInfo, srcStoreID
return
}

=======

Check failure on line 1616 in pkg/schedule/schedulers/hot_region.go (GitHub Actions / tso-function-test): syntax error: non-declaration statement outside function body
>>>>>>> 33ae3b614 (scheduler: use move-hot-write-leader operator (#7852))

Check failure on line 1617 in pkg/schedule/schedulers/hot_region.go (GitHub Actions / tso-function-test): invalid character U+0023 '#'
func (bs *balanceSolver) decorateOperator(op *operator.Operator, isRevert bool, sourceLabel, targetLabel, typ, dim string) {
op.SetPriorityLevel(constant.High)
op.FinishedCounters = append(op.FinishedCounters,
204 changes: 195 additions & 9 deletions pkg/schedule/schedulers/hot_region_test.go
@@ -67,6 +67,11 @@ func clearPendingInfluence(h *hotScheduler) {
h.regionPendings = make(map[uint64]*pendingInfluence)
}

func newTestRegion(id uint64) *core.RegionInfo {
peers := []*metapb.Peer{{Id: id*100 + 1, StoreId: 1}, {Id: id*100 + 2, StoreId: 2}, {Id: id*100 + 3, StoreId: 3}}
return core.NewRegionInfo(&metapb.Region{Id: id, Peers: peers}, peers[0])
}
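
The helper's peer IDs are deterministic (id*100 + 1..3 on stores 1..3), and peers[0], the store-1 peer, is passed to core.NewRegionInfo as the leader; an illustrative usage:

	r := newTestRegion(2) // peers 201/202/203 on stores 1/2/3, leader on store 1
	_ = r
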

func TestUpgrade(t *testing.T) {
re := require.New(t)
cancel, _, _, oc := prepareSchedulersTest()
@@ -189,6 +194,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
}
}

<<<<<<< HEAD

Check failure on line 197 in pkg/schedule/schedulers/hot_region_test.go (GitHub Actions / chunks (3)): expected declaration, found '<<'
func newTestRegion(id uint64) *core.RegionInfo {
peers := []*metapb.Peer{{Id: id*100 + 1, StoreId: 1}, {Id: id*100 + 2, StoreId: 2}, {Id: id*100 + 3, StoreId: 3}}
return core.NewRegionInfo(&metapb.Region{Id: id, Peers: peers}, peers[0])
@@ -204,6 +210,9 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
}

func TestSplitBuckets(t *testing.T) {
=======
func TestSplitIfRegionTooHot(t *testing.T) {
>>>>>>> 33ae3b614 (scheduler: use move-hot-write-leader operator (#7852))
re := require.New(t)
statistics.Denoising = false
cancel, _, tc, oc := prepareSchedulersTest()
@@ -239,15 +248,184 @@ func TestSplitBuckets(t *testing.T) {
expectKeys := [][]byte{[]byte("a"), []byte("c"), []byte("d"), []byte("f")}
expectOp, err := operator.CreateSplitRegionOperator(splitBucket, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, expectKeys)
re.NoError(err)
<<<<<<< HEAD
expectOp.GetCreateTime()
re.Equal(expectOp.Brief(), op.Brief())
re.Equal(expectOp.GetAdditionalInfo(), op.GetAdditionalInfo())
=======
solve := newBalanceSolver(hb.(*hotScheduler), tc, utils.Read, transferLeader)
solve.cur = &solution{}
region := core.NewTestRegionInfo(1, 1, []byte("a"), []byte("f"))

testdata := []struct {
hotBuckets [][]byte
splitKeys [][]byte
}{
{
[][]byte{[]byte("a"), []byte("b"), []byte("f")},
[][]byte{[]byte("b")},
},
{
[][]byte{[]byte(""), []byte("a"), []byte("")},
nil,
},
{
[][]byte{},
nil,
},
}

for _, data := range testdata {
b := &metapb.Buckets{
RegionId: 1,
PeriodInMs: 1000,
Keys: data.hotBuckets,
}
region.UpdateBuckets(b, region.GetBuckets())
ops := solve.createSplitOperator([]*core.RegionInfo{region}, bySize)
if data.splitKeys == nil {
re.Empty(ops)
continue
}
re.Len(ops, 1)
op := ops[0]
re.Equal(splitHotReadBuckets, op.Desc())

expectOp, err := operator.CreateSplitRegionOperator(splitHotReadBuckets, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, data.splitKeys)
re.NoError(err)
re.Equal(expectOp.Brief(), op.Brief())
}
}

func TestSplitBucketsByLoad(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
cancel, _, tc, oc := prepareSchedulersTest()
tc.SetHotRegionCacheHitsThreshold(1)
tc.SetRegionBucketEnabled(true)
defer cancel()
hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
solve := newBalanceSolver(hb.(*hotScheduler), tc, utils.Read, transferLeader)
solve.cur = &solution{}
region := core.NewTestRegionInfo(1, 1, []byte("a"), []byte("f"))
testdata := []struct {
hotBuckets [][]byte
splitKeys [][]byte
}{
{
[][]byte{[]byte(""), []byte("b"), []byte("")},
[][]byte{[]byte("b")},
},
{
[][]byte{[]byte(""), []byte("a"), []byte("")},
nil,
},
{
[][]byte{[]byte("b"), []byte("c"), []byte("")},
[][]byte{[]byte("c")},
},
}
for _, data := range testdata {
b := &metapb.Buckets{
RegionId: 1,
PeriodInMs: 1000,
Keys: data.hotBuckets,
Stats: &metapb.BucketStats{
ReadBytes: []uint64{10 * units.KiB, 10 * units.MiB},
ReadKeys: []uint64{256, 256},
ReadQps: []uint64{0, 0},
WriteBytes: []uint64{0, 0},
WriteQps: []uint64{0, 0},
WriteKeys: []uint64{0, 0},
},
}
task := buckets.NewCheckPeerTask(b)
re.True(tc.HotBucketCache.CheckAsync(task))
time.Sleep(time.Millisecond * 10)
ops := solve.createSplitOperator([]*core.RegionInfo{region}, byLoad)
if data.splitKeys == nil {
re.Empty(ops)
continue
}
re.Len(ops, 1)
op := ops[0]
re.Equal(splitHotReadBuckets, op.Desc())

expectOp, err := operator.CreateSplitRegionOperator(splitHotReadBuckets, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, data.splitKeys)
re.NoError(err)
re.Equal(expectOp.Brief(), op.Brief())
}
}

func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
statisticsInterval = 0
checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */)
checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
checkHotWriteRegionPlacement(re, true)
}

func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules bool) {
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
tc.SetEnableUseJointConsensus(true)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2))
tc.SetEnablePlacementRules(enablePlacementRules)
labels := []string{"zone", "host"}
tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...)
hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
tc.SetHotRegionCacheHitsThreshold(0)

tc.AddLabelsStore(1, 2, map[string]string{"zone": "z1", "host": "h1"})
tc.AddLabelsStore(2, 2, map[string]string{"zone": "z1", "host": "h2"})
tc.AddLabelsStore(3, 2, map[string]string{"zone": "z2", "host": "h3"})
tc.AddLabelsStore(4, 2, map[string]string{"zone": "z2", "host": "h4"})
tc.AddLabelsStore(5, 2, map[string]string{"zone": "z2", "host": "h5"})
tc.AddLabelsStore(6, 2, map[string]string{"zone": "z2", "host": "h6"})
tc.RuleManager.SetRule(&placement.Rule{
GroupID: "pd", ID: "leader", Role: placement.Leader, Count: 1, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z1"}}},
})
tc.RuleManager.SetRule(&placement.Rule{
GroupID: "pd", ID: "voter", Role: placement.Follower, Count: 2, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z2"}}},
})
tc.RuleManager.DeleteRule("pd", "default")

tc.UpdateStorageWrittenBytes(1, 10*units.MiB*utils.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(2, 0)
tc.UpdateStorageWrittenBytes(3, 6*units.MiB*utils.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(4, 3*units.MiB*utils.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(5, 3*units.MiB*utils.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(6, 6*units.MiB*utils.StoreHeartBeatReportInterval)

// Region 1, 2 and 3 are hot regions.
addRegionInfo(tc, utils.Write, []testRegionInfo{
{1, []uint64{1, 3, 5}, 512 * units.KiB, 0, 0},
{2, []uint64{1, 4, 6}, 512 * units.KiB, 0, 0},
{3, []uint64{1, 3, 6}, 512 * units.KiB, 0, 0},
})
ops, _ := hb.Schedule(tc, false)
re.NotEmpty(ops)
re.NotContains(ops[0].Step(1).String(), "transfer leader")
clearPendingInfluence(hb.(*hotScheduler))

tc.RuleManager.SetRule(&placement.Rule{
GroupID: "pd", ID: "voter", Role: placement.Voter, Count: 2, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z2"}}},
})
tc.RuleManager.DeleteRule("pd", "follower")
ops, _ = hb.Schedule(tc, false)
re.NotEmpty(ops)
re.NotContains(ops[0].Step(1).String(), "transfer leader")
>>>>>>> 33ae3b614 (scheduler: use move-hot-write-leader operator (#7852))
}
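
Two properties drive the expectations in the split tests above: a bucket boundary qualifies as a split key only when it lies strictly inside the region's key range (hence the nil cases for ["", "a", ""] on region [a, f)), and under byLoad the chosen boundary is the one that opens the hottest bucket. A self-contained sketch of both rules, an illustration under those assumptions rather than the PD implementation:

package main

import (
	"bytes"
	"fmt"
)

// interiorKeys keeps only bucket boundaries strictly inside [start, end);
// this is why hot buckets ["", "a", ""] on region [a, f) split nothing.
func interiorKeys(start, end []byte, bucketKeys [][]byte) [][]byte {
	var keys [][]byte
	for _, k := range bucketKeys {
		if bytes.Compare(k, start) > 0 && bytes.Compare(k, end) < 0 {
			keys = append(keys, k)
		}
	}
	return keys
}

// splitKeyByLoad picks the boundary that opens the hottest bucket, subject
// to the same interior check (bucket i spans [keys[i], keys[i+1])).
func splitKeyByLoad(start, end []byte, keys [][]byte, loads []uint64) []byte {
	hot := 0
	for i, l := range loads {
		if l > loads[hot] {
			hot = i
		}
	}
	if k := keys[hot]; bytes.Compare(k, start) > 0 && bytes.Compare(k, end) < 0 {
		return k
	}
	return nil
}

func main() {
	start, end := []byte("a"), []byte("f")
	fmt.Printf("%q\n", interiorKeys(start, end, [][]byte{[]byte("a"), []byte("b"), []byte("f")})) // ["b"]
	fmt.Printf("%q\n", interiorKeys(start, end, [][]byte{[]byte(""), []byte("a"), []byte("")}))   // []

	loads := []uint64{10 * 1024, 10 * 1024 * 1024} // 10 KiB cold bucket, 10 MiB hot bucket, as in the stats above
	fmt.Printf("%q\n", splitKeyByLoad(start, end, [][]byte{[]byte(""), []byte("b"), []byte("")}, loads))  // "b"
	fmt.Printf("%q\n", splitKeyByLoad(start, end, [][]byte{[]byte("b"), []byte("c"), []byte("")}, loads)) // "c"
}
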

func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlacementRules bool) {
cancel, opt, tc, oc := prepareSchedulersTest()
defer cancel()
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2))
tc.SetEnablePlacementRules(enablePlacementRules)
labels := []string{"zone", "host"}
tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...)
@@ -304,12 +482,14 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 4:
case 5:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
if op.RegionID() == 2 {
// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
} else {
// peer in store 1 of the region 1,3 can only transfer to store 6
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
@@ -329,10 +509,10 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace
ops, _ := hb.Schedule(tc, false)
op := ops[0]
clearPendingInfluence(hb.(*hotScheduler))
re.Equal(4, op.Len())
re.Equal(5, op.Len())
if op.RegionID() == 2 {
// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
} else {
// peer in store 1 of the region 1,3 can only transfer to store 6
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
@@ -429,7 +609,7 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
statisticsInterval = 0
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2))
tc.SetHotRegionCacheHitsThreshold(0)
re.NoError(tc.RuleManager.SetRules([]*placement.Rule{
{
@@ -522,9 +702,11 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 2:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
operatorutil.CheckTransferLearner(re, op, operator.OpHotRegion, 8, 10)
default:
re.FailNow("wrong op: " + op.String())
@@ -615,12 +797,14 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 4:
case 5:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
if op.RegionID() == 2 {
// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
} else {
// peer in store 1 of the region 1,3 can only transfer to store 6
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
@@ -952,9 +1136,11 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 4:
case 5:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 4)
cnt++
if cnt == 3 {
(The diff of the fourth changed file did not load in this view.)
