
fix
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Dec 4, 2024
1 parent a53e150 commit 0bc6e36
Showing 6 changed files with 18 additions and 104 deletions.
6 changes: 4 additions & 2 deletions pkg/schedule/checker/checker_controller.go
@@ -31,7 +31,6 @@ import (
 	sche "github.com/tikv/pd/pkg/schedule/core"
 	"github.com/tikv/pd/pkg/schedule/labeler"
 	"github.com/tikv/pd/pkg/schedule/operator"
-	"github.com/tikv/pd/pkg/schedule/placement"
 	"github.com/tikv/pd/pkg/utils/keyutil"
 	"github.com/tikv/pd/pkg/utils/logutil"
 	"go.uber.org/zap"
@@ -93,8 +92,11 @@ type Controller struct {
 }
 
 // NewController create a new Controller.
-func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller, prepareChecker *sche.PrepareChecker) *Controller {
+func NewController(ctx context.Context, cluster sche.CheckerCluster, opController *operator.Controller, prepareChecker *sche.PrepareChecker) *Controller {
 	pendingProcessedRegions := cache.NewIDTTL(ctx, time.Minute, 3*time.Minute)
+	conf := cluster.GetCheckerConfig()
+	ruleManager := cluster.GetRuleManager()
+	labeler := cluster.GetRegionLabeler()
 	c := &Controller{
 		ctx: ctx,
 		cluster: cluster,
7 changes: 1 addition & 6 deletions pkg/schedule/coordinator.go
@@ -82,7 +82,7 @@ func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbS
 	prepareChecker := sche.NewPrepareChecker()
 	opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), hbStreams)
 	schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController, prepareChecker)
-	checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController, prepareChecker)
+	checkers := checker.NewController(ctx, cluster, opController, prepareChecker)
 	return &Coordinator{
 		ctx: ctx,
 		cancel: cancel,
@@ -533,11 +533,6 @@ func ResetHotSpotMetrics() {
 	schedulers.HotPendingSum.Reset()
 }
 
-// ShouldRun returns true if the coordinator should run.
-func (c *Coordinator) ShouldRun() bool {
-	return c.prepareChecker.Check(c.cluster.GetBasicCluster())
-}
-
 // GetSchedulersController returns the schedulers controller.
 func (c *Coordinator) GetSchedulersController() *schedulers.Controller {
 	return c.schedulers
2 changes: 1 addition & 1 deletion pkg/schedule/core/cluster_informer.go
@@ -40,7 +40,6 @@ type SchedulerCluster interface {
 	buckets.BucketStatInformer
 
 	GetSchedulerConfig() sc.SchedulerConfigProvider
-	GetRegionLabeler() *labeler.RegionLabeler
 	GetStoreConfig() sc.StoreConfigProvider
 }
 
@@ -61,6 +60,7 @@ type SharedCluster interface {
 	GetBasicCluster() *core.BasicCluster
 	GetSharedConfig() sc.SharedConfigProvider
 	GetRuleManager() *placement.RuleManager
+	GetRegionLabeler() *labeler.RegionLabeler
 	AllocID() (uint64, error)
 	IsSchedulingHalted() bool
 }
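
Note: moving GetRegionLabeler from SchedulerCluster down to SharedCluster is what lets the checker controller above resolve the labeler (along with the rule manager and checker config) through its cluster handle instead of taking them as constructor arguments. The sketch below is illustrative only, using toy types that stand in for PD's real interfaces, and shows that pattern of a constructor pulling its own dependencies from accessor methods on a shared interface:

package main

import "fmt"

// Toy stand-ins for placement.RuleManager and labeler.RegionLabeler.
type RuleManager struct{}
type RegionLabeler struct{}

// sharedCluster plays the role of sche.SharedCluster after this commit:
// accessors that both the scheduler and checker paths can rely on.
type sharedCluster interface {
	GetRuleManager() *RuleManager
	GetRegionLabeler() *RegionLabeler
}

// checkerController resolves its dependencies from the cluster handle,
// mirroring the simplified NewController signature above.
type checkerController struct {
	ruleManager *RuleManager
	labeler     *RegionLabeler
}

func newCheckerController(cluster sharedCluster) *checkerController {
	return &checkerController{
		ruleManager: cluster.GetRuleManager(),
		labeler:     cluster.GetRegionLabeler(),
	}
}

// mockCluster is a minimal implementation used only for this example.
type mockCluster struct{}

func (mockCluster) GetRuleManager() *RuleManager     { return &RuleManager{} }
func (mockCluster) GetRegionLabeler() *RegionLabeler { return &RegionLabeler{} }

func main() {
	c := newCheckerController(mockCluster{})
	fmt.Println(c.ruleManager != nil, c.labeler != nil) // true true
}
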
93 changes: 0 additions & 93 deletions server/cluster/cluster_test.go
@@ -3067,99 +3067,6 @@ func TestPeerState(t *testing.T) {
 	waitNoResponse(re, stream)
 }
 
-func TestShouldRun(t *testing.T) {
-	re := require.New(t)
-
-	tc, co, cleanup := prepare(nil, nil, nil, re)
-	tc.RaftCluster.coordinator = co
-	defer cleanup()
-
-	re.NoError(tc.addLeaderStore(1, 5))
-	re.NoError(tc.addLeaderStore(2, 2))
-	re.NoError(tc.addLeaderStore(3, 0))
-	re.NoError(tc.addLeaderStore(4, 0))
-	re.NoError(tc.LoadRegion(1, 1, 2, 3))
-	re.NoError(tc.LoadRegion(2, 1, 2, 3))
-	re.NoError(tc.LoadRegion(3, 1, 2, 3))
-	re.NoError(tc.LoadRegion(4, 1, 2, 3))
-	re.NoError(tc.LoadRegion(5, 1, 2, 3))
-	re.NoError(tc.LoadRegion(6, 2, 1, 4))
-	re.NoError(tc.LoadRegion(7, 2, 1, 4))
-	re.False(co.ShouldRun())
-	re.Equal(2, tc.GetStoreRegionCount(4))
-
-	testCases := []struct {
-		regionID uint64
-		ShouldRun bool
-	}{
-		{1, false},
-		{2, false},
-		{3, false},
-		{4, false},
-		{5, false},
-		// store4 needs Collect two region
-		{6, false},
-		{7, true},
-	}
-
-	for _, testCase := range testCases {
-		r := tc.GetRegion(testCase.regionID)
-		nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
-		re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), nr))
-		re.Equal(testCase.ShouldRun, co.ShouldRun())
-	}
-	nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}}
-	newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
-	re.Error(tc.processRegionHeartbeat(core.ContextTODO(), newRegion))
-	re.Equal(7, tc.GetClusterNotFromStorageRegionsCnt())
-}
-
-func TestShouldRunWithNonLeaderRegions(t *testing.T) {
-	re := require.New(t)
-
-	tc, co, cleanup := prepare(nil, nil, nil, re)
-	tc.RaftCluster.coordinator = co
-	defer cleanup()
-
-	re.NoError(tc.addLeaderStore(1, 10))
-	re.NoError(tc.addLeaderStore(2, 0))
-	re.NoError(tc.addLeaderStore(3, 0))
-	for i := range 10 {
-		re.NoError(tc.LoadRegion(uint64(i+1), 1, 2, 3))
-	}
-	re.False(co.ShouldRun())
-	re.Equal(10, tc.GetStoreRegionCount(1))
-
-	testCases := []struct {
-		regionID uint64
-		ShouldRun bool
-	}{
-		{1, false},
-		{2, false},
-		{3, false},
-		{4, false},
-		{5, false},
-		{6, false},
-		{7, false},
-		{8, false},
-		{9, true},
-	}
-
-	for _, testCase := range testCases {
-		r := tc.GetRegion(testCase.regionID)
-		nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
-		re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), nr))
-		re.Equal(testCase.ShouldRun, co.ShouldRun())
-	}
-	nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}}
-	newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
-	re.Error(tc.processRegionHeartbeat(core.ContextTODO(), newRegion))
-	re.Equal(9, tc.GetClusterNotFromStorageRegionsCnt())
-
-	// Now, after server is prepared, there exist some regions with no leader.
-	re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId())
-}
-
 func TestAddScheduler(t *testing.T) {
 	re := require.New(t)
 
3 changes: 3 additions & 0 deletions tests/integrations/mcs/scheduling/config_test.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/log"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 	"github.com/tikv/pd/pkg/cache"
@@ -36,6 +37,7 @@ import (
 	"github.com/tikv/pd/pkg/versioninfo"
 	"github.com/tikv/pd/tests"
 	"github.com/tikv/pd/tests/server/api"
+	"go.uber.org/zap"
 )
 
 type configTestSuite struct {
@@ -133,6 +135,7 @@ func (suite *configTestSuite) TestConfigWatch() {
 // Manually trigger the config persistence in the PD API server side.
 func persistConfig(re *require.Assertions, pdLeaderServer *tests.TestServer) {
 	err := pdLeaderServer.GetPersistOptions().Persist(pdLeaderServer.GetServer().GetStorage())
+	log.Info("persistConfig", zap.Reflect("opts", pdLeaderServer.GetPersistOptions()))
 	re.NoError(err)
 }

Expand Down
11 changes: 9 additions & 2 deletions tools/pd-ctl/tests/store/store_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/tikv/pd/pkg/response"
 	"github.com/tikv/pd/pkg/statistics/utils"
 	"github.com/tikv/pd/pkg/utils/grpcutil"
+	"github.com/tikv/pd/pkg/utils/testutil"
 	"github.com/tikv/pd/server/config"
 	pdTests "github.com/tikv/pd/tests"
 	ctl "github.com/tikv/pd/tools/pd-ctl/pdctl"
@@ -54,13 +55,19 @@ func TestStoreLimitV2(t *testing.T) {
 	re.NoError(leaderServer.BootstrapCluster())
 	defer cluster.Destroy()
 
+	// TODO: fix https://github.com/tikv/pd/issues/7464
+	testutil.Eventually(re, func() bool {
+		return leaderServer.GetRaftCluster().GetCoordinator().AreSchedulersInitialized()
+	})
+
 	// store command
 	args := []string{"-u", pdAddr, "config", "set", "store-limit-version", "v2"}
-	_, err = tests.ExecuteCommand(cmd, args...)
+	output, err := tests.ExecuteCommand(cmd, args...)
 	re.NoError(err)
+	re.Contains(string(output), "Success")
 
 	args = []string{"-u", pdAddr, "store", "limit"}
-	output, err := tests.ExecuteCommand(cmd, args...)
+	output, err = tests.ExecuteCommand(cmd, args...)
 	re.NoError(err)
 	re.Contains(string(output), "not support get limit")
 
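
Aside: the testutil.Eventually call added above makes the test wait until the coordinator reports its schedulers as initialized before issuing pd-ctl commands, which is the workaround referenced by the TODO for issue 7464. Below is a minimal, self-contained sketch of that poll-until-true pattern; it is not PD's testutil implementation (the real helper takes *require.Assertions and has its own timeout and tick options):

package main

import (
	"errors"
	"fmt"
	"time"
)

// eventually polls cond until it returns true or the timeout elapses,
// mirroring the role testutil.Eventually plays in the test above.
func eventually(cond func() bool, timeout, tick time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return nil
		}
		time.Sleep(tick)
	}
	return errors.New("condition not met before timeout")
}

func main() {
	start := time.Now()
	// In the test, the condition is
	// leaderServer.GetRaftCluster().GetCoordinator().AreSchedulersInitialized().
	err := eventually(func() bool {
		return time.Since(start) > 50*time.Millisecond
	}, time.Second, 10*time.Millisecond)
	fmt.Println(err) // <nil> once the condition becomes true
}
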
