diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index 4f4d5df8510..4b58b60f577 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -647,6 +647,10 @@ func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string) c.wg.Add(1) go c.runScheduler(s) c.schedulers[s.GetName()] = s + if err := schedule.SaveSchedulerConfig(c.cluster.storage, scheduler); err != nil { + log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err)) + return err + } c.cluster.opt.AddSchedulerCfg(s.GetType(), args) return nil } diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go index b21a7910a98..b7cb70f3682 100644 --- a/server/cluster/coordinator_test.go +++ b/server/cluster/coordinator_test.go @@ -826,8 +826,9 @@ func TestPersistScheduler(t *testing.T) { // whether the schedulers added or removed in dynamic way are recorded in opt _, newOpt, err := newTestScheduleConfig() re.NoError(err) - _, err = schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null"))) + shuffle, err := schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null"))) re.NoError(err) + re.NoError(co.addScheduler(shuffle)) // suppose we add a new default enable scheduler config.DefaultSchedulers = append(config.DefaultSchedulers, config.SchedulerConfig{Type: "shuffle-region"}) defer func() { diff --git a/server/schedule/scheduler.go b/server/schedule/scheduler.go index 61c3d15fef1..e41fa4b8f4f 100644 --- a/server/schedule/scheduler.go +++ b/server/schedule/scheduler.go @@ -118,17 +118,16 @@ func CreateScheduler(typ string, opController *OperatorController, storage endpo if !ok { return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ) } + return fn(opController, storage, dec) +} - s, err := fn(opController, storage, dec) - if err != nil { - return nil, err - } +// SaveSchedulerConfig saves the config of the specified scheduler. +func SaveSchedulerConfig(storage endpoint.ConfigStorage, s Scheduler) error { data, err := s.EncodeConfig() if err != nil { - return nil, err + return err } - err = storage.SaveScheduleConfig(s.GetName(), data) - return s, err + return storage.SaveScheduleConfig(s.GetName(), data) } // FindSchedulerTypeByName finds the type of the specified name. diff --git a/server/schedulers/evict_leader.go b/server/schedulers/evict_leader.go index c13c4eb4c70..716b848b402 100644 --- a/server/schedulers/evict_leader.go +++ b/server/schedulers/evict_leader.go @@ -205,7 +205,7 @@ func newEvictLeaderScheduler(opController *schedule.OperatorController, conf *ev } } -// EvictStores returns the IDs of the evict-stores. +// EvictStoreIDs returns the IDs of the evict-stores. func (s *evictLeaderScheduler) EvictStoreIDs() []uint64 { return s.conf.getStores() } diff --git a/tests/server/api/testutil.go b/tests/server/api/testutil.go new file mode 100644 index 00000000000..9e19c401c2f --- /dev/null +++ b/tests/server/api/testutil.go @@ -0,0 +1,58 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/stretchr/testify/require" +) + +const schedulersPrefix = "/pd/api/v1/schedulers" + +// dialClient used to dial http request. +var dialClient = &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + }, +} + +// MustAddScheduler adds a scheduler with HTTP API. +func MustAddScheduler( + re *require.Assertions, serverAddr string, + schedulerName string, args map[string]interface{}, +) { + request := map[string]interface{}{ + "name": schedulerName, + } + for arg, val := range args { + request[arg] = val + } + data, err := json.Marshal(request) + re.NoError(err) + httpReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s%s", serverAddr, schedulersPrefix), bytes.NewBuffer(data)) + re.NoError(err) + // Send request. + resp, err := dialClient.Do(httpReq) + re.NoError(err) + defer resp.Body.Close() + data, err = io.ReadAll(resp.Body) + re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode, string(data)) +} diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index c4614e27b51..73bf2fb32d3 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -43,9 +43,11 @@ import ( "github.com/tikv/pd/server/id" syncer "github.com/tikv/pd/server/region_syncer" "github.com/tikv/pd/server/schedule/operator" + "github.com/tikv/pd/server/schedulers" "github.com/tikv/pd/server/storage" "github.com/tikv/pd/server/tso" "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/server/api" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -1280,6 +1282,126 @@ func TestStaleTermHeartbeat(t *testing.T) { re.NoError(err) } +func TestTransferLeaderForScheduler(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`)) + tc, err := tests.NewTestCluster(ctx, 2) + defer tc.Destroy() + re.NoError(err) + err = tc.RunInitialServers() + re.NoError(err) + tc.WaitLeader() + // start + leaderServer := tc.GetServer(tc.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) + rc := leaderServer.GetServer().GetRaftCluster() + re.NotNil(rc) + + storesNum := 2 + grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) + for i := 1; i <= storesNum; i++ { + store := &metapb.Store{ + Id: uint64(i), + Address: "127.0.0.1:" + strconv.Itoa(i), + } + resp, err := putStore(grpcPDClient, leaderServer.GetClusterID(), store) + re.NoError(err) + re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType()) + } + // region heartbeat + id := leaderServer.GetAllocator() + putRegionWithLeader(re, rc, id, 1) + + time.Sleep(time.Second) + re.True(leaderServer.GetRaftCluster().IsPrepared()) + // Add evict leader scheduler + api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 1, + }) + api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 2, + }) + // Check scheduler updated. + re.Len(rc.GetSchedulers(), 5) + checkEvictLeaderSchedulerExist(re, rc, true) + checkEvictLeaderStoreIDs(re, rc, []uint64{1, 2}) + + // transfer PD leader to another PD + tc.ResignLeader() + rc.Stop() + tc.WaitLeader() + leaderServer = tc.GetServer(tc.GetLeader()) + rc1 := leaderServer.GetServer().GetRaftCluster() + rc1.Start(leaderServer.GetServer()) + re.NoError(err) + re.NotNil(rc1) + // region heartbeat + id = leaderServer.GetAllocator() + putRegionWithLeader(re, rc1, id, 1) + time.Sleep(time.Second) + re.True(leaderServer.GetRaftCluster().IsPrepared()) + // Check scheduler updated. + re.Len(rc1.GetSchedulers(), 5) + checkEvictLeaderSchedulerExist(re, rc, true) + checkEvictLeaderStoreIDs(re, rc, []uint64{1, 2}) + + // transfer PD leader back to the previous PD + tc.ResignLeader() + rc1.Stop() + tc.WaitLeader() + leaderServer = tc.GetServer(tc.GetLeader()) + rc = leaderServer.GetServer().GetRaftCluster() + rc.Start(leaderServer.GetServer()) + re.NotNil(rc) + // region heartbeat + id = leaderServer.GetAllocator() + putRegionWithLeader(re, rc, id, 1) + time.Sleep(time.Second) + re.True(leaderServer.GetRaftCluster().IsPrepared()) + // Check scheduler updated + re.Len(rc.GetSchedulers(), 5) + checkEvictLeaderSchedulerExist(re, rc, true) + checkEvictLeaderStoreIDs(re, rc, []uint64{1, 2}) + + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker")) +} + +func checkEvictLeaderSchedulerExist(re *require.Assertions, rc *cluster.RaftCluster, exist bool) { + isExistScheduler := func(rc *cluster.RaftCluster, name string) bool { + s := rc.GetSchedulers() + for _, scheduler := range s { + if scheduler == name { + return true + } + } + return false + } + + testutil.Eventually(re, func() bool { + if !exist { + return !isExistScheduler(rc, schedulers.EvictLeaderName) + } + return isExistScheduler(rc, schedulers.EvictLeaderName) + }) +} + +func checkEvictLeaderStoreIDs(re *require.Assertions, rc *cluster.RaftCluster, expected []uint64) { + handler, ok := rc.GetSchedulerHandlers()[schedulers.EvictLeaderName] + re.True(ok) + h, ok := handler.(interface { + EvictStoreIDs() []uint64 + }) + re.True(ok) + var evictStoreIDs []uint64 + testutil.Eventually(re, func() bool { + evictStoreIDs = h.EvictStoreIDs() + return len(evictStoreIDs) == len(expected) + }) + re.ElementsMatch(evictStoreIDs, expected) +} + func putRegionWithLeader(re *require.Assertions, rc *cluster.RaftCluster, id id.Allocator, storeID uint64) { for i := 0; i < 3; i++ { regionID, err := id.Alloc()